This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.tang.formats;
020
021import org.apache.avro.file.DataFileReader;
022import org.apache.avro.file.DataFileWriter;
023import org.apache.avro.io.*;
024import org.apache.avro.specific.SpecificDatumReader;
025import org.apache.avro.specific.SpecificDatumWriter;
026import org.apache.commons.lang.NotImplementedException;
027import org.apache.commons.lang.StringUtils;
028import org.apache.reef.tang.ClassHierarchy;
029import org.apache.reef.tang.Configuration;
030import org.apache.reef.tang.ConfigurationBuilder;
031import org.apache.reef.tang.Tang;
032import org.apache.reef.tang.exceptions.BindException;
033import org.apache.reef.tang.exceptions.ClassHierarchyException;
034import org.apache.reef.tang.formats.avro.AvroConfiguration;
035import org.apache.reef.tang.formats.avro.ConfigurationEntry;
036import org.apache.reef.tang.implementation.ConfigurationBuilderImpl;
037import org.apache.reef.tang.types.ClassNode;
038import org.apache.reef.tang.types.NamedParameterNode;
039import org.apache.reef.tang.types.Node;
040import org.apache.reef.tang.util.ReflectionUtilities;
041
042import javax.inject.Inject;
043
044import java.io.*;
045import java.util.*;
046
047/**
048 * (De-)Serializing Configuration to and from AvroConfiguration.
049 * <p>
050 * This class is stateless and is therefore safe to reuse.
051 */
052public final class AvroConfigurationSerializer implements ConfigurationSerializer {
053
054  /**
055   * The Charset used for the JSON encoding.
056   * <p>
057   * Copied from <code>org.apache.avro.io.JsonDecoder.CHARSET</code>
058   */
059  private static final String JSON_CHARSET = "ISO-8859-1";
060  public static final String JAVA = "Java";
061  public static final String CS = "Cs";
062
063  @Inject
064  public AvroConfigurationSerializer() {
065  }
066
067  private static void fromAvro(final AvroConfiguration avroConfiguration,
068                               final ConfigurationBuilder configurationBuilder) throws BindException {
069    // TODO[JIRA REEF-402]: This method should implement list deserialization. Implement it when C# side is ready.
070    final Map<String, String> importedNames = new HashMap<>();
071
072    for (final ConfigurationEntry entry : avroConfiguration.getBindings()) {
073
074      final String longName = importedNames.get(entry.getKey().toString());
075      final String key;
076      if (null == longName) {
077        key = entry.getKey().toString();
078      } else {
079        key = longName;
080      }
081
082      // entry.getValue()'s type can be either string or array of string
083      final Object rawValue = entry.getValue();
084
085      try {
086        // TODO[JIRA REEF-402]: Implement list deserialization
087        // rawValue is String.
088        final String value = rawValue.toString();
089        if (key.equals(ConfigurationBuilderImpl.IMPORT)) {
090          configurationBuilder.getClassHierarchy().getNode(value);
091          final String[] tok = value.split(ReflectionUtilities.REGEXP);
092          final String lastTok = tok[tok.length - 1];
093          try {
094            configurationBuilder.getClassHierarchy().getNode(lastTok);
095            throw new IllegalArgumentException("Conflict on short name: " + lastTok);
096          } catch (final BindException e) {
097            final String oldValue = importedNames.put(lastTok, value);
098            if (oldValue != null) {
099              throw new IllegalArgumentException("Name conflict: "
100                  + lastTok + " maps to " + oldValue + " and " + value, e);
101            }
102          }
103        } else if (value.startsWith(ConfigurationBuilderImpl.INIT)) {
104          final String[] classes = value.substring(ConfigurationBuilderImpl.INIT.length(), value.length())
105              .replaceAll("^[\\s\\(]+", "")
106              .replaceAll("[\\s\\)]+$", "")
107              .split("[\\s\\-]+");
108          configurationBuilder.registerLegacyConstructor(key, classes);
109        } else {
110          configurationBuilder.bind(key, value);
111        }
112      } catch (final BindException | ClassHierarchyException e) {
113        throw new BindException("Failed to process configuration tuple: [" + key + "=" + rawValue + "]", e);
114      }
115    }
116  }
117
118  private static AvroConfiguration avroFromFile(final File file) throws IOException {
119    final AvroConfiguration avroConfiguration;
120    try (final DataFileReader<AvroConfiguration> dataFileReader =
121             new DataFileReader<>(file, new SpecificDatumReader<>(AvroConfiguration.class))) {
122      avroConfiguration = dataFileReader.next();
123    }
124    return avroConfiguration;
125  }
126
127  private static AvroConfiguration avroFromBytes(final byte[] theBytes) throws IOException {
128    final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(theBytes, null);
129    final SpecificDatumReader<AvroConfiguration> reader = new SpecificDatumReader<>(AvroConfiguration.class);
130    return reader.read(null, decoder);
131  }
132
133  private static AvroConfiguration avroFromString(final String theString) throws IOException {
134    final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(AvroConfiguration.getClassSchema(), theString);
135    final SpecificDatumReader<AvroConfiguration> reader = new SpecificDatumReader<>(AvroConfiguration.class);
136    return reader.read(null, decoder);
137  }
138
139  public AvroConfiguration toAvro(final Configuration configuration) {
140    // TODO[JIRA REEF-402]: This method should implement list serialization. Implement it when C# side is ready.
141
142    final List<ConfigurationEntry> configurationEntries = new ArrayList<>();
143
144    for (final ClassNode<?> opt : configuration.getBoundImplementations()) {
145      configurationEntries.add(ConfigurationEntry.newBuilder()
146          .setKey(opt.getFullName())
147          .setValue(configuration.getBoundImplementation(opt).getFullName())
148          .build());
149    }
150
151    for (final ClassNode<?> opt : configuration.getBoundConstructors()) {
152      configurationEntries.add(ConfigurationEntry.newBuilder()
153          .setKey(opt.getFullName())
154          .setValue(configuration.getBoundConstructor(opt).getFullName())
155          .build());
156    }
157    for (final NamedParameterNode<?> opt : configuration.getNamedParameters()) {
158      configurationEntries.add(ConfigurationEntry.newBuilder()
159          .setKey(opt.getFullName())
160          .setValue(configuration.getNamedParameter(opt))
161          .build());
162    }
163    for (final ClassNode<?> cn : configuration.getLegacyConstructors()) {
164      final String legacyConstructors = StringUtils.join(configuration.getLegacyConstructor(cn).getArgs(), "-");
165      configurationEntries.add(ConfigurationEntry.newBuilder()
166          .setKey(cn.getFullName())
167          .setValue("" + ConfigurationBuilderImpl.INIT + "(" + legacyConstructors + ")")
168          .build());
169    }
170    for (final NamedParameterNode<Set<?>> key : configuration.getBoundSets()) {
171      for (final Object value : configuration.getBoundSet(key)) {
172        final String val;
173        if (value instanceof String) {
174          val = (String) value;
175        } else if (value instanceof Node) {
176          val = ((Node) value).getFullName();
177        } else {
178          throw new IllegalStateException("The value bound to a given NamedParameterNode "
179                  + key + " is neither the set of class hierarchy nodes nor strings.");
180        }
181        configurationEntries.add(ConfigurationEntry.newBuilder()
182            .setKey(key.getFullName())
183            .setValue(val)
184            .build());
185      }
186    }
187    // TODO[JIRA REEF-402]: Implement list serialization
188    if (configuration.getBoundLists() != null && !configuration.getBoundLists().isEmpty()) {
189      throw new NotImplementedException("List serialization/deserialization is not supported");
190    }
191
192    return AvroConfiguration.newBuilder().setLanguage(JAVA).setBindings(configurationEntries).build();
193  }
194
195  @Override
196  public void toFile(final Configuration conf, final File file) throws IOException {
197    final AvroConfiguration avroConfiguration = toAvro(conf);
198    final DatumWriter<AvroConfiguration> configurationWriter = new SpecificDatumWriter<>(AvroConfiguration.class);
199    try (DataFileWriter<AvroConfiguration> dataFileWriter = new DataFileWriter<>(configurationWriter)) {
200      dataFileWriter.create(avroConfiguration.getSchema(), file);
201      dataFileWriter.append(avroConfiguration);
202    }
203  }
204
205  @Override
206  public void toTextFile(final Configuration conf, final File file) throws IOException {
207    try (final Writer w = new OutputStreamWriter(new FileOutputStream(file), JSON_CHARSET)) {
208      w.write(this.toString(conf));
209    }
210  }
211
212  @Override
213  public byte[] toByteArray(final Configuration conf) throws IOException {
214    final DatumWriter<AvroConfiguration> configurationWriter = new SpecificDatumWriter<>(AvroConfiguration.class);
215    final byte[] theBytes;
216    try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
217      final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
218      configurationWriter.write(toAvro(conf), encoder);
219      encoder.flush();
220      out.flush();
221      theBytes = out.toByteArray();
222    }
223    return theBytes;
224  }
225
226  /**
227   * Produce a JSON string that represents given configuration.
228   * @param configuration Tang configuration to convert into a JSON string.
229   * @return A JSON string that corresponds to the given Tang configuration.
230   */
231  @Override
232  public String toString(final Configuration configuration) {
233    return toString(configuration, false);
234  }
235
236  /**
237   * Produce a JSON string that represents given configuration.
238   * @param configuration Tang configuration to convert into a JSON string.
239   * @param prettyPrint If true, use new lines and spaces to pretty print the JSON string.
240   * If false (by default), output JSON as a single line.
241   * @return A JSON string that corresponds to the given Tang configuration.
242   */
243  public String toString(final Configuration configuration, final boolean prettyPrint) {
244    final DatumWriter<AvroConfiguration> configurationWriter = new SpecificDatumWriter<>(AvroConfiguration.class);
245    final String result;
246    try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
247      // TODO [REEF-1536] Re-enable pretty printing when Avro 1.7.5 available on all environments:
248      final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(AvroConfiguration.SCHEMA$, out); //, prettyPrint);
249      configurationWriter.write(toAvro(configuration), encoder);
250      encoder.flush();
251      out.flush();
252      result = out.toString(JSON_CHARSET);
253    } catch (final IOException e) {
254      throw new RuntimeException(e);
255    }
256    return result;
257  }
258
259  /**
260   * Converts a given AvroConfiguration to Configuration.
261   *
262   * @param avroConfiguration a Avro configuration
263   * @return a Configuration version of the given AvroConfiguration
264   */
265  public Configuration fromAvro(final AvroConfiguration avroConfiguration) throws BindException {
266    final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder();
267    fromAvro(avroConfiguration, configurationBuilder);
268    return configurationBuilder.build();
269  }
270
271  /**
272   * Converts a given AvroConfiguration to Configuration.
273   *
274   * @param avroConfiguration a Avro configuration
275   * @param classHierarchy    the class hierarchy used for validation.
276   * @return a Configuration version of the given AvroConfiguration
277   */
278  public Configuration fromAvro(final AvroConfiguration avroConfiguration, final ClassHierarchy classHierarchy)
279      throws BindException {
280    final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder(classHierarchy);
281    fromAvro(avroConfiguration, configurationBuilder);
282    return configurationBuilder.build();
283  }
284
285  @Override
286  public Configuration fromFile(final File file) throws IOException, BindException {
287    return fromAvro(avroFromFile(file));
288  }
289
290  @Override
291  public Configuration fromFile(final File file, final ClassHierarchy classHierarchy)
292      throws IOException, BindException {
293    return fromAvro(avroFromFile(file), classHierarchy);
294  }
295
296  @Override
297  public Configuration fromTextFile(final File file) throws IOException, BindException {
298    final StringBuilder result = readFromTextFile(file);
299    return this.fromString(result.toString());
300  }
301
302  @Override
303  public Configuration fromTextFile(final File file, final ClassHierarchy classHierarchy)
304      throws IOException, BindException {
305    final StringBuilder result = readFromTextFile(file);
306    return this.fromString(result.toString(), classHierarchy);
307  }
308
309  private StringBuilder readFromTextFile(final File file) throws IOException {
310    final StringBuilder result = new StringBuilder();
311    try (final BufferedReader reader =
312             new BufferedReader(new InputStreamReader(new FileInputStream(file), JSON_CHARSET))) {
313      String line = reader.readLine();
314      while (line != null) {
315        result.append(line);
316        line = reader.readLine();
317      }
318    }
319    return result;
320  }
321
322  @Override
323  public Configuration fromByteArray(final byte[] theBytes) throws IOException, BindException {
324    return fromAvro(avroFromBytes(theBytes));
325  }
326
327  @Override
328  public Configuration fromByteArray(final byte[] theBytes, final ClassHierarchy classHierarchy)
329      throws IOException, BindException {
330    return fromAvro(avroFromBytes(theBytes), classHierarchy);
331  }
332
333  @Override
334  public Configuration fromString(final String theString) throws IOException, BindException {
335    return fromAvro(avroFromString(theString));
336  }
337
338  @Override
339  public Configuration fromString(final String theString, final ClassHierarchy classHierarchy)
340      throws IOException, BindException {
341    return fromAvro(avroFromString(theString), classHierarchy);
342  }
343
344  /**
345   * Converts a given serialized string to ConfigurationBuilder from which Configuration can be produced.
346   *
347   * @param theString the String containing configuration
348   * @param configBuilder a configuration builder
349   * @throws IOException if the string is not Avro format
350   * @throws BindException if the content of configuration string is invalid to bind
351   */
352  public void configurationBuilderFromString(final String theString, final ConfigurationBuilder configBuilder)
353      throws IOException, BindException {
354    fromAvro(avroFromString(theString), configBuilder);
355  }
356}