This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.tang.formats;
020
021import org.apache.avro.file.DataFileReader;
022import org.apache.avro.file.DataFileWriter;
023import org.apache.avro.io.*;
024import org.apache.avro.specific.SpecificDatumReader;
025import org.apache.avro.specific.SpecificDatumWriter;
026import org.apache.commons.lang.StringUtils;
027import org.apache.reef.tang.ClassHierarchy;
028import org.apache.reef.tang.Configuration;
029import org.apache.reef.tang.ConfigurationBuilder;
030import org.apache.reef.tang.Tang;
031import org.apache.reef.tang.exceptions.BindException;
032import org.apache.reef.tang.exceptions.ClassHierarchyException;
033import org.apache.reef.tang.formats.avro.AvroConfiguration;
034import org.apache.reef.tang.formats.avro.ConfigurationEntry;
035import org.apache.reef.tang.implementation.ConfigurationBuilderImpl;
036import org.apache.reef.tang.types.ClassNode;
037import org.apache.reef.tang.types.NamedParameterNode;
038import org.apache.reef.tang.types.Node;
039import org.apache.reef.tang.util.ReflectionUtilities;
040
041import javax.inject.Inject;
042import java.io.*;
043import java.util.*;
044
045/**
046 * (De-)Serializing Configuration to and from AvroConfiguration.
047 * <p/>
048 * This class is stateless and is therefore safe to reuse.
049 */
050public final class AvroConfigurationSerializer implements ConfigurationSerializer {
051
052  /**
053   * The Charset used for the JSON encoding.
054   * <p/>
055   * Copied from <code>org.apache.avro.io.JsonDecoder.CHARSET</code>
056   */
057  private static final String JSON_CHARSET = "ISO-8859-1";
058
059  @Inject
060  public AvroConfigurationSerializer() {
061  }
062
063  private static void fromAvro(final AvroConfiguration avroConfiguration, final ConfigurationBuilder configurationBuilder) throws BindException {
064    // Note: This code is an adapted version of ConfigurationFile.processConfigFile();
065    // TODO: This method should implement list deserialization. Implement it when C# side is ready.
066    final Map<String, String> importedNames = new HashMap<>();
067
068    for (final ConfigurationEntry entry : avroConfiguration.getBindings()) {
069
070      final String longName = importedNames.get(entry.getKey().toString());
071      final String key;
072      if (null == longName) {
073        key = entry.getKey().toString();
074      } else {
075        key = longName;
076      }
077
078      // entry.getValue()'s type can be either string or array of string
079      final Object rawValue = entry.getValue();
080
081      try {
082        // TODO: Implement list deserialization
083        // rawValue is String.
084        String value = rawValue.toString();
085        if (key.equals(ConfigurationBuilderImpl.IMPORT)) {
086          configurationBuilder.getClassHierarchy().getNode(value);
087          final String[] tok = value.split(ReflectionUtilities.regexp);
088          final String lastTok = tok[tok.length - 1];
089          try {
090            configurationBuilder.getClassHierarchy().getNode(lastTok);
091            throw new IllegalArgumentException("Conflict on short name: " + lastTok);
092          } catch (final BindException e) {
093            final String oldValue = importedNames.put(lastTok, value);
094            if (oldValue != null) {
095              throw new IllegalArgumentException("Name conflict: "
096                  + lastTok + " maps to " + oldValue + " and " + value);
097            }
098          }
099        } else if (value.startsWith(ConfigurationBuilderImpl.INIT)) {
100          final String[] classes = value.substring(ConfigurationBuilderImpl.INIT.length(), value.length())
101              .replaceAll("^[\\s\\(]+", "")
102              .replaceAll("[\\s\\)]+$", "")
103              .split("[\\s\\-]+");
104          configurationBuilder.registerLegacyConstructor(key, classes);
105        } else {
106          configurationBuilder.bind(key, value);
107        }
108      } catch (final BindException | ClassHierarchyException e) {
109        throw new BindException("Failed to process configuration tuple: [" + key + "=" + rawValue + "]", e);
110      }
111    }
112  }
113
114  private static AvroConfiguration avroFromFile(final File file) throws IOException {
115    final AvroConfiguration avroConfiguration;
116    try (final DataFileReader<AvroConfiguration> dataFileReader =
117             new DataFileReader<>(file, new SpecificDatumReader<>(AvroConfiguration.class))) {
118      avroConfiguration = dataFileReader.next();
119    }
120    return avroConfiguration;
121  }
122
123  private static AvroConfiguration avroFromBytes(final byte[] theBytes) throws IOException {
124    final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(theBytes, null);
125    final SpecificDatumReader<AvroConfiguration> reader = new SpecificDatumReader<>(AvroConfiguration.class);
126    return reader.read(null, decoder);
127  }
128
129  private static AvroConfiguration avroFromString(final String theString) throws IOException {
130    final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(AvroConfiguration.getClassSchema(), theString);
131    final SpecificDatumReader<AvroConfiguration> reader = new SpecificDatumReader<>(AvroConfiguration.class);
132    return reader.read(null, decoder);
133  }
134
135  public AvroConfiguration toAvro(final Configuration configuration) {
136    // Note: This code is an adapted version of ConfiurationFile.toConfigurationStringList();
137    // TODO: This method should implement list serialization. Implement it when C# side is ready.
138
139    final List<ConfigurationEntry> configurationEntries = new ArrayList<>();
140
141    for (final ClassNode<?> opt : configuration.getBoundImplementations()) {
142      configurationEntries.add(new ConfigurationEntry().newBuilder()
143          .setKey(opt.getFullName())
144          .setValue(configuration.getBoundImplementation(opt).getFullName())
145          .build());
146    }
147
148    for (final ClassNode<?> opt : configuration.getBoundConstructors()) {
149      configurationEntries.add(new ConfigurationEntry().newBuilder()
150          .setKey(opt.getFullName())
151          .setValue(configuration.getBoundConstructor(opt).getFullName())
152          .build());
153    }
154    for (final NamedParameterNode<?> opt : configuration.getNamedParameters()) {
155      configurationEntries.add(new ConfigurationEntry().newBuilder()
156          .setKey(opt.getFullName())
157          .setValue(configuration.getNamedParameter(opt))
158          .build());
159    }
160    for (final ClassNode<?> cn : configuration.getLegacyConstructors()) {
161      final String legacyConstructors = StringUtils.join(configuration.getLegacyConstructor(cn).getArgs(), "-");
162      configurationEntries.add(new ConfigurationEntry().newBuilder()
163          .setKey(cn.getFullName())
164          .setValue("" + ConfigurationBuilderImpl.INIT + "(" + legacyConstructors + ")")
165          .build());
166    }
167    for (final Map.Entry<NamedParameterNode<Set<?>>, Object> e : configuration.getBoundSets()) {
168      final String val;
169      if (e.getValue() instanceof String) {
170        val = (String) e.getValue();
171      } else if (e.getValue() instanceof Node) {
172        val = ((Node) e.getValue()).getFullName();
173      } else {
174        throw new IllegalStateException();
175      }
176      configurationEntries.add(new ConfigurationEntry().newBuilder()
177          .setKey(e.getKey().getFullName())
178          .setValue(val)
179          .build());
180    }
181    // TODO: Implement list serialization
182
183    return AvroConfiguration.newBuilder().setBindings(configurationEntries).build();
184  }
185
186  @Override
187  public void toFile(final Configuration conf, final File file) throws IOException {
188    final AvroConfiguration avroConfiguration = toAvro(conf);
189    final DatumWriter<AvroConfiguration> configurationWriter = new SpecificDatumWriter<>(AvroConfiguration.class);
190    try (DataFileWriter<AvroConfiguration> dataFileWriter = new DataFileWriter<>(configurationWriter)) {
191      dataFileWriter.create(avroConfiguration.getSchema(), file);
192      dataFileWriter.append(avroConfiguration);
193    }
194  }
195
196  @Override
197  public void toTextFile(final Configuration conf, final File file) throws IOException {
198    try (final Writer w = new FileWriter(file)) {
199      w.write(this.toString(conf));
200    }
201  }
202
203  @Override
204  public byte[] toByteArray(final Configuration conf) throws IOException {
205    final DatumWriter<AvroConfiguration> configurationWriter = new SpecificDatumWriter<>(AvroConfiguration.class);
206    final byte[] theBytes;
207    try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
208      final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
209      configurationWriter.write(toAvro(conf), encoder);
210      encoder.flush();
211      out.flush();
212      theBytes = out.toByteArray();
213    }
214    return theBytes;
215  }
216
217  @Override
218  public String toString(final Configuration configuration) {
219    final DatumWriter<AvroConfiguration> configurationWriter = new SpecificDatumWriter<>(AvroConfiguration.class);
220    final String result;
221    try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
222      final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(AvroConfiguration.SCHEMA$, out);
223      configurationWriter.write(toAvro(configuration), encoder);
224      encoder.flush();
225      out.flush();
226      result = out.toString(JSON_CHARSET);
227    } catch (final IOException e) {
228      throw new RuntimeException(e);
229    }
230    return result;
231  }
232
233  /**
234   * Converts a given AvroConfiguration to Configuration
235   *
236   * @param avroConfiguration
237   * @return a Configuration version of the given AvroConfiguration
238   */
239  public Configuration fromAvro(final AvroConfiguration avroConfiguration) throws BindException {
240    final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder();
241    fromAvro(avroConfiguration, configurationBuilder);
242    return configurationBuilder.build();
243  }
244
245  /**
246   * Converts a given AvroConfiguration to Configuration
247   *
248   * @param avroConfiguration
249   * @param classHierarchy    the class hierarchy used for validation.
250   * @return a Configuration version of the given AvroConfiguration
251   */
252  public Configuration fromAvro(final AvroConfiguration avroConfiguration, final ClassHierarchy classHierarchy)
253      throws BindException {
254    final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder(classHierarchy);
255    fromAvro(avroConfiguration, configurationBuilder);
256    return configurationBuilder.build();
257  }
258
259  @Override
260  public Configuration fromFile(final File file) throws IOException, BindException {
261    return fromAvro(avroFromFile(file));
262  }
263
264  @Override
265  public Configuration fromFile(final File file, final ClassHierarchy classHierarchy) throws IOException, BindException {
266    return fromAvro(avroFromFile(file), classHierarchy);
267  }
268
269  @Override
270  public Configuration fromTextFile(final File file) throws IOException, BindException {
271    final StringBuilder result = readFromTextFile(file);
272    return this.fromString(result.toString());
273  }
274
275  @Override
276  public Configuration fromTextFile(final File file, final ClassHierarchy classHierarchy) throws IOException, BindException {
277    final StringBuilder result = readFromTextFile(file);
278    return this.fromString(result.toString(), classHierarchy);
279  }
280
281  private StringBuilder readFromTextFile(final File file) throws IOException {
282    final StringBuilder result = new StringBuilder();
283    try (final BufferedReader reader = new BufferedReader(new FileReader(file))) {
284      String line = reader.readLine();
285      while (line != null) {
286        result.append(line);
287        line = reader.readLine();
288      }
289    }
290    return result;
291  }
292
293  @Override
294  public Configuration fromByteArray(final byte[] theBytes) throws IOException, BindException {
295    return fromAvro(avroFromBytes(theBytes));
296  }
297
298  @Override
299  public Configuration fromByteArray(final byte[] theBytes, final ClassHierarchy classHierarchy) throws IOException, BindException {
300    return fromAvro(avroFromBytes(theBytes), classHierarchy);
301  }
302
303  @Override
304  public Configuration fromString(final String theString) throws IOException, BindException {
305    return fromAvro(avroFromString(theString));
306  }
307
308  @Override
309  public Configuration fromString(final String theString, final ClassHierarchy classHierarchy) throws IOException, BindException {
310    return fromAvro(avroFromString(theString), classHierarchy);
311  }
312
313}