blob: a60717a442d965104b260a3a9b988bed042c5b9e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.tang.formats;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.*;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang.StringUtils;
import org.apache.reef.tang.ClassHierarchy;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.ConfigurationBuilder;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.exceptions.ClassHierarchyException;
import org.apache.reef.tang.formats.avro.AvroConfiguration;
import org.apache.reef.tang.formats.avro.ConfigurationEntry;
import org.apache.reef.tang.implementation.ConfigurationBuilderImpl;
import org.apache.reef.tang.types.ClassNode;
import org.apache.reef.tang.types.NamedParameterNode;
import org.apache.reef.tang.types.Node;
import org.apache.reef.tang.util.ReflectionUtilities;
import javax.inject.Inject;
import java.io.*;
import java.util.*;
/**
* (De-)Serializing Configuration to and from AvroConfiguration.
* <p>
* This class is stateless and is therefore safe to reuse.
*/
public final class AvroConfigurationSerializer implements ConfigurationSerializer {
/**
* The Charset used for the JSON encoding.
* <p>
* Copied from <code>org.apache.avro.io.JsonDecoder.CHARSET</code>
*/
private static final String JSON_CHARSET = "ISO-8859-1";
public static final String JAVA = "Java";
public static final String CS = "Cs";
@Inject
public AvroConfigurationSerializer() {
}
private static void fromAvro(final AvroConfiguration avroConfiguration,
final ConfigurationBuilder configurationBuilder) throws BindException {
// TODO[JIRA REEF-402]: This method should implement list deserialization. Implement it when C# side is ready.
final Map<String, String> importedNames = new HashMap<>();
for (final ConfigurationEntry entry : avroConfiguration.getBindings()) {
final String longName = importedNames.get(entry.getKey().toString());
final String key;
if (null == longName) {
key = entry.getKey().toString();
} else {
key = longName;
}
// entry.getValue()'s type can be either string or array of string
final Object rawValue = entry.getValue();
try {
// TODO[JIRA REEF-402]: Implement list deserialization
// rawValue is String.
final String value = rawValue.toString();
if (key.equals(ConfigurationBuilderImpl.IMPORT)) {
configurationBuilder.getClassHierarchy().getNode(value);
final String[] tok = value.split(ReflectionUtilities.REGEXP);
final String lastTok = tok[tok.length - 1];
try {
configurationBuilder.getClassHierarchy().getNode(lastTok);
throw new IllegalArgumentException("Conflict on short name: " + lastTok);
} catch (final BindException e) {
final String oldValue = importedNames.put(lastTok, value);
if (oldValue != null) {
throw new IllegalArgumentException("Name conflict: "
+ lastTok + " maps to " + oldValue + " and " + value, e);
}
}
} else if (value.startsWith(ConfigurationBuilderImpl.INIT)) {
final String[] classes = value.substring(ConfigurationBuilderImpl.INIT.length(), value.length())
.replaceAll("^[\\s\\(]+", "")
.replaceAll("[\\s\\)]+$", "")
.split("[\\s\\-]+");
configurationBuilder.registerLegacyConstructor(key, classes);
} else {
configurationBuilder.bind(key, value);
}
} catch (final BindException | ClassHierarchyException e) {
throw new BindException("Failed to process configuration tuple: [" + key + "=" + rawValue + "]", e);
}
}
}
private static AvroConfiguration avroFromFile(final File file) throws IOException {
final AvroConfiguration avroConfiguration;
try (DataFileReader<AvroConfiguration> dataFileReader =
new DataFileReader<>(file, new SpecificDatumReader<>(AvroConfiguration.class))) {
avroConfiguration = dataFileReader.next();
}
return avroConfiguration;
}
private static AvroConfiguration avroFromBytes(final byte[] theBytes) throws IOException {
final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(theBytes, null);
final SpecificDatumReader<AvroConfiguration> reader = new SpecificDatumReader<>(AvroConfiguration.class);
return reader.read(null, decoder);
}
private static AvroConfiguration avroFromString(final String theString) throws IOException {
final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(AvroConfiguration.getClassSchema(), theString);
final SpecificDatumReader<AvroConfiguration> reader = new SpecificDatumReader<>(AvroConfiguration.class);
return reader.read(null, decoder);
}
public AvroConfiguration toAvro(final Configuration configuration) {
// TODO[JIRA REEF-402]: This method should implement list serialization. Implement it when C# side is ready.
final List<ConfigurationEntry> configurationEntries = new ArrayList<>();
for (final ClassNode<?> opt : configuration.getBoundImplementations()) {
configurationEntries.add(ConfigurationEntry.newBuilder()
.setKey(opt.getFullName())
.setValue(configuration.getBoundImplementation(opt).getFullName())
.build());
}
for (final ClassNode<?> opt : configuration.getBoundConstructors()) {
configurationEntries.add(ConfigurationEntry.newBuilder()
.setKey(opt.getFullName())
.setValue(configuration.getBoundConstructor(opt).getFullName())
.build());
}
for (final NamedParameterNode<?> opt : configuration.getNamedParameters()) {
configurationEntries.add(ConfigurationEntry.newBuilder()
.setKey(opt.getFullName())
.setValue(configuration.getNamedParameter(opt))
.build());
}
for (final ClassNode<?> cn : configuration.getLegacyConstructors()) {
final String legacyConstructors = StringUtils.join(configuration.getLegacyConstructor(cn).getArgs(), "-");
configurationEntries.add(ConfigurationEntry.newBuilder()
.setKey(cn.getFullName())
.setValue("" + ConfigurationBuilderImpl.INIT + "(" + legacyConstructors + ")")
.build());
}
for (final NamedParameterNode<Set<?>> key : configuration.getBoundSets()) {
for (final Object value : configuration.getBoundSet(key)) {
final String val;
if (value instanceof String) {
val = (String) value;
} else if (value instanceof Node) {
val = ((Node) value).getFullName();
} else {
throw new IllegalStateException("The value bound to a given NamedParameterNode "
+ key + " is neither the set of class hierarchy nodes nor strings.");
}
configurationEntries.add(ConfigurationEntry.newBuilder()
.setKey(key.getFullName())
.setValue(val)
.build());
}
}
// TODO[JIRA REEF-402]: Implement list serialization
if (configuration.getBoundLists() != null && !configuration.getBoundLists().isEmpty()) {
throw new NotImplementedException("List serialization/deserialization is not supported");
}
return AvroConfiguration.newBuilder().setLanguage(JAVA).setBindings(configurationEntries).build();
}
@Override
public void toFile(final Configuration conf, final File file) throws IOException {
final AvroConfiguration avroConfiguration = toAvro(conf);
final DatumWriter<AvroConfiguration> configurationWriter = new SpecificDatumWriter<>(AvroConfiguration.class);
try (DataFileWriter<AvroConfiguration> dataFileWriter = new DataFileWriter<>(configurationWriter)) {
dataFileWriter.create(avroConfiguration.getSchema(), file);
dataFileWriter.append(avroConfiguration);
}
}
@Override
public void toTextFile(final Configuration conf, final File file) throws IOException {
try (Writer w = new OutputStreamWriter(new FileOutputStream(file), JSON_CHARSET)) {
w.write(this.toString(conf));
}
}
@Override
public byte[] toByteArray(final Configuration conf) throws IOException {
final DatumWriter<AvroConfiguration> configurationWriter = new SpecificDatumWriter<>(AvroConfiguration.class);
final byte[] theBytes;
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
configurationWriter.write(toAvro(conf), encoder);
encoder.flush();
out.flush();
theBytes = out.toByteArray();
}
return theBytes;
}
/**
* Produce a JSON string that represents given configuration.
* @param configuration Tang configuration to convert into a JSON string.
* @return A JSON string that corresponds to the given Tang configuration.
*/
@Override
public String toString(final Configuration configuration) {
return toString(configuration, false);
}
/**
* Produce a JSON string that represents given configuration.
* @param configuration Tang configuration to convert into a JSON string.
* @param prettyPrint If true, use new lines and spaces to pretty print the JSON string.
* If false (by default), output JSON as a single line.
* @return A JSON string that corresponds to the given Tang configuration.
*/
public String toString(final Configuration configuration, final boolean prettyPrint) {
final DatumWriter<AvroConfiguration> configurationWriter = new SpecificDatumWriter<>(AvroConfiguration.class);
final String result;
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
// TODO [REEF-1536] Re-enable pretty printing when Avro 1.7.5 available on all environments:
final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(AvroConfiguration.SCHEMA$, out); //, prettyPrint);
configurationWriter.write(toAvro(configuration), encoder);
encoder.flush();
out.flush();
result = out.toString(JSON_CHARSET);
} catch (final IOException e) {
throw new RuntimeException(e);
}
return result;
}
/**
* Converts a given AvroConfiguration to Configuration.
*
* @param avroConfiguration a Avro configuration
* @return a Configuration version of the given AvroConfiguration
*/
public Configuration fromAvro(final AvroConfiguration avroConfiguration) throws BindException {
final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder();
fromAvro(avroConfiguration, configurationBuilder);
return configurationBuilder.build();
}
/**
* Converts a given AvroConfiguration to Configuration.
*
* @param avroConfiguration a Avro configuration
* @param classHierarchy the class hierarchy used for validation.
* @return a Configuration version of the given AvroConfiguration
*/
public Configuration fromAvro(final AvroConfiguration avroConfiguration, final ClassHierarchy classHierarchy)
throws BindException {
final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder(classHierarchy);
fromAvro(avroConfiguration, configurationBuilder);
return configurationBuilder.build();
}
@Override
public Configuration fromFile(final File file) throws IOException, BindException {
return fromAvro(avroFromFile(file));
}
@Override
public Configuration fromFile(final File file, final ClassHierarchy classHierarchy)
throws IOException, BindException {
return fromAvro(avroFromFile(file), classHierarchy);
}
@Override
public Configuration fromTextFile(final File file) throws IOException, BindException {
final StringBuilder result = readFromTextFile(file);
return this.fromString(result.toString());
}
@Override
public Configuration fromTextFile(final File file, final ClassHierarchy classHierarchy)
throws IOException, BindException {
final StringBuilder result = readFromTextFile(file);
return this.fromString(result.toString(), classHierarchy);
}
private StringBuilder readFromTextFile(final File file) throws IOException {
final StringBuilder result = new StringBuilder();
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(new FileInputStream(file), JSON_CHARSET))) {
String line = reader.readLine();
while (line != null) {
result.append(line);
line = reader.readLine();
}
}
return result;
}
@Override
public Configuration fromByteArray(final byte[] theBytes) throws IOException, BindException {
return fromAvro(avroFromBytes(theBytes));
}
@Override
public Configuration fromByteArray(final byte[] theBytes, final ClassHierarchy classHierarchy)
throws IOException, BindException {
return fromAvro(avroFromBytes(theBytes), classHierarchy);
}
@Override
public Configuration fromString(final String theString) throws IOException, BindException {
return fromAvro(avroFromString(theString));
}
@Override
public Configuration fromString(final String theString, final ClassHierarchy classHierarchy)
throws IOException, BindException {
return fromAvro(avroFromString(theString), classHierarchy);
}
/**
* Converts a given serialized string to ConfigurationBuilder from which Configuration can be produced.
*
* @param theString the String containing configuration
* @param configBuilder a configuration builder
* @throws IOException if the string is not Avro format
* @throws BindException if the content of configuration string is invalid to bind
*/
public void configurationBuilderFromString(final String theString, final ConfigurationBuilder configBuilder)
throws IOException, BindException {
fromAvro(avroFromString(theString), configBuilder);
}
}