001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.tang.formats; 020 021import org.apache.avro.file.DataFileReader; 022import org.apache.avro.file.DataFileWriter; 023import org.apache.avro.io.*; 024import org.apache.avro.specific.SpecificDatumReader; 025import org.apache.avro.specific.SpecificDatumWriter; 026import org.apache.commons.lang.StringUtils; 027import org.apache.reef.tang.ClassHierarchy; 028import org.apache.reef.tang.Configuration; 029import org.apache.reef.tang.ConfigurationBuilder; 030import org.apache.reef.tang.Tang; 031import org.apache.reef.tang.exceptions.BindException; 032import org.apache.reef.tang.exceptions.ClassHierarchyException; 033import org.apache.reef.tang.formats.avro.AvroConfiguration; 034import org.apache.reef.tang.formats.avro.ConfigurationEntry; 035import org.apache.reef.tang.implementation.ConfigurationBuilderImpl; 036import org.apache.reef.tang.types.ClassNode; 037import org.apache.reef.tang.types.NamedParameterNode; 038import org.apache.reef.tang.types.Node; 039import org.apache.reef.tang.util.ReflectionUtilities; 040 041import javax.inject.Inject; 042import java.io.*; 043import java.util.*; 044 045/** 046 * (De-)Serializing Configuration to and from AvroConfiguration. 047 * <p/> 048 * This class is stateless and is therefore safe to reuse. 049 */ 050public final class AvroConfigurationSerializer implements ConfigurationSerializer { 051 052 /** 053 * The Charset used for the JSON encoding. 054 * <p/> 055 * Copied from <code>org.apache.avro.io.JsonDecoder.CHARSET</code> 056 */ 057 private static final String JSON_CHARSET = "ISO-8859-1"; 058 059 @Inject 060 public AvroConfigurationSerializer() { 061 } 062 063 private static void fromAvro(final AvroConfiguration avroConfiguration, final ConfigurationBuilder configurationBuilder) throws BindException { 064 // Note: This code is an adapted version of ConfigurationFile.processConfigFile(); 065 // TODO: This method should implement list deserialization. Implement it when C# side is ready. 066 final Map<String, String> importedNames = new HashMap<>(); 067 068 for (final ConfigurationEntry entry : avroConfiguration.getBindings()) { 069 070 final String longName = importedNames.get(entry.getKey().toString()); 071 final String key; 072 if (null == longName) { 073 key = entry.getKey().toString(); 074 } else { 075 key = longName; 076 } 077 078 // entry.getValue()'s type can be either string or array of string 079 final Object rawValue = entry.getValue(); 080 081 try { 082 // TODO: Implement list deserialization 083 // rawValue is String. 084 String value = rawValue.toString(); 085 if (key.equals(ConfigurationBuilderImpl.IMPORT)) { 086 configurationBuilder.getClassHierarchy().getNode(value); 087 final String[] tok = value.split(ReflectionUtilities.regexp); 088 final String lastTok = tok[tok.length - 1]; 089 try { 090 configurationBuilder.getClassHierarchy().getNode(lastTok); 091 throw new IllegalArgumentException("Conflict on short name: " + lastTok); 092 } catch (final BindException e) { 093 final String oldValue = importedNames.put(lastTok, value); 094 if (oldValue != null) { 095 throw new IllegalArgumentException("Name conflict: " 096 + lastTok + " maps to " + oldValue + " and " + value); 097 } 098 } 099 } else if (value.startsWith(ConfigurationBuilderImpl.INIT)) { 100 final String[] classes = value.substring(ConfigurationBuilderImpl.INIT.length(), value.length()) 101 .replaceAll("^[\\s\\(]+", "") 102 .replaceAll("[\\s\\)]+$", "") 103 .split("[\\s\\-]+"); 104 configurationBuilder.registerLegacyConstructor(key, classes); 105 } else { 106 configurationBuilder.bind(key, value); 107 } 108 } catch (final BindException | ClassHierarchyException e) { 109 throw new BindException("Failed to process configuration tuple: [" + key + "=" + rawValue + "]", e); 110 } 111 } 112 } 113 114 private static AvroConfiguration avroFromFile(final File file) throws IOException { 115 final AvroConfiguration avroConfiguration; 116 try (final DataFileReader<AvroConfiguration> dataFileReader = 117 new DataFileReader<>(file, new SpecificDatumReader<>(AvroConfiguration.class))) { 118 avroConfiguration = dataFileReader.next(); 119 } 120 return avroConfiguration; 121 } 122 123 private static AvroConfiguration avroFromBytes(final byte[] theBytes) throws IOException { 124 final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(theBytes, null); 125 final SpecificDatumReader<AvroConfiguration> reader = new SpecificDatumReader<>(AvroConfiguration.class); 126 return reader.read(null, decoder); 127 } 128 129 private static AvroConfiguration avroFromString(final String theString) throws IOException { 130 final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(AvroConfiguration.getClassSchema(), theString); 131 final SpecificDatumReader<AvroConfiguration> reader = new SpecificDatumReader<>(AvroConfiguration.class); 132 return reader.read(null, decoder); 133 } 134 135 public AvroConfiguration toAvro(final Configuration configuration) { 136 // Note: This code is an adapted version of ConfiurationFile.toConfigurationStringList(); 137 // TODO: This method should implement list serialization. Implement it when C# side is ready. 138 139 final List<ConfigurationEntry> configurationEntries = new ArrayList<>(); 140 141 for (final ClassNode<?> opt : configuration.getBoundImplementations()) { 142 configurationEntries.add(new ConfigurationEntry().newBuilder() 143 .setKey(opt.getFullName()) 144 .setValue(configuration.getBoundImplementation(opt).getFullName()) 145 .build()); 146 } 147 148 for (final ClassNode<?> opt : configuration.getBoundConstructors()) { 149 configurationEntries.add(new ConfigurationEntry().newBuilder() 150 .setKey(opt.getFullName()) 151 .setValue(configuration.getBoundConstructor(opt).getFullName()) 152 .build()); 153 } 154 for (final NamedParameterNode<?> opt : configuration.getNamedParameters()) { 155 configurationEntries.add(new ConfigurationEntry().newBuilder() 156 .setKey(opt.getFullName()) 157 .setValue(configuration.getNamedParameter(opt)) 158 .build()); 159 } 160 for (final ClassNode<?> cn : configuration.getLegacyConstructors()) { 161 final String legacyConstructors = StringUtils.join(configuration.getLegacyConstructor(cn).getArgs(), "-"); 162 configurationEntries.add(new ConfigurationEntry().newBuilder() 163 .setKey(cn.getFullName()) 164 .setValue("" + ConfigurationBuilderImpl.INIT + "(" + legacyConstructors + ")") 165 .build()); 166 } 167 for (final Map.Entry<NamedParameterNode<Set<?>>, Object> e : configuration.getBoundSets()) { 168 final String val; 169 if (e.getValue() instanceof String) { 170 val = (String) e.getValue(); 171 } else if (e.getValue() instanceof Node) { 172 val = ((Node) e.getValue()).getFullName(); 173 } else { 174 throw new IllegalStateException(); 175 } 176 configurationEntries.add(new ConfigurationEntry().newBuilder() 177 .setKey(e.getKey().getFullName()) 178 .setValue(val) 179 .build()); 180 } 181 // TODO: Implement list serialization 182 183 return AvroConfiguration.newBuilder().setBindings(configurationEntries).build(); 184 } 185 186 @Override 187 public void toFile(final Configuration conf, final File file) throws IOException { 188 final AvroConfiguration avroConfiguration = toAvro(conf); 189 final DatumWriter<AvroConfiguration> configurationWriter = new SpecificDatumWriter<>(AvroConfiguration.class); 190 try (DataFileWriter<AvroConfiguration> dataFileWriter = new DataFileWriter<>(configurationWriter)) { 191 dataFileWriter.create(avroConfiguration.getSchema(), file); 192 dataFileWriter.append(avroConfiguration); 193 } 194 } 195 196 @Override 197 public void toTextFile(final Configuration conf, final File file) throws IOException { 198 try (final Writer w = new FileWriter(file)) { 199 w.write(this.toString(conf)); 200 } 201 } 202 203 @Override 204 public byte[] toByteArray(final Configuration conf) throws IOException { 205 final DatumWriter<AvroConfiguration> configurationWriter = new SpecificDatumWriter<>(AvroConfiguration.class); 206 final byte[] theBytes; 207 try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) { 208 final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); 209 configurationWriter.write(toAvro(conf), encoder); 210 encoder.flush(); 211 out.flush(); 212 theBytes = out.toByteArray(); 213 } 214 return theBytes; 215 } 216 217 @Override 218 public String toString(final Configuration configuration) { 219 final DatumWriter<AvroConfiguration> configurationWriter = new SpecificDatumWriter<>(AvroConfiguration.class); 220 final String result; 221 try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) { 222 final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(AvroConfiguration.SCHEMA$, out); 223 configurationWriter.write(toAvro(configuration), encoder); 224 encoder.flush(); 225 out.flush(); 226 result = out.toString(JSON_CHARSET); 227 } catch (final IOException e) { 228 throw new RuntimeException(e); 229 } 230 return result; 231 } 232 233 /** 234 * Converts a given AvroConfiguration to Configuration 235 * 236 * @param avroConfiguration 237 * @return a Configuration version of the given AvroConfiguration 238 */ 239 public Configuration fromAvro(final AvroConfiguration avroConfiguration) throws BindException { 240 final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder(); 241 fromAvro(avroConfiguration, configurationBuilder); 242 return configurationBuilder.build(); 243 } 244 245 /** 246 * Converts a given AvroConfiguration to Configuration 247 * 248 * @param avroConfiguration 249 * @param classHierarchy the class hierarchy used for validation. 250 * @return a Configuration version of the given AvroConfiguration 251 */ 252 public Configuration fromAvro(final AvroConfiguration avroConfiguration, final ClassHierarchy classHierarchy) 253 throws BindException { 254 final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder(classHierarchy); 255 fromAvro(avroConfiguration, configurationBuilder); 256 return configurationBuilder.build(); 257 } 258 259 @Override 260 public Configuration fromFile(final File file) throws IOException, BindException { 261 return fromAvro(avroFromFile(file)); 262 } 263 264 @Override 265 public Configuration fromFile(final File file, final ClassHierarchy classHierarchy) throws IOException, BindException { 266 return fromAvro(avroFromFile(file), classHierarchy); 267 } 268 269 @Override 270 public Configuration fromTextFile(final File file) throws IOException, BindException { 271 final StringBuilder result = readFromTextFile(file); 272 return this.fromString(result.toString()); 273 } 274 275 @Override 276 public Configuration fromTextFile(final File file, final ClassHierarchy classHierarchy) throws IOException, BindException { 277 final StringBuilder result = readFromTextFile(file); 278 return this.fromString(result.toString(), classHierarchy); 279 } 280 281 private StringBuilder readFromTextFile(final File file) throws IOException { 282 final StringBuilder result = new StringBuilder(); 283 try (final BufferedReader reader = new BufferedReader(new FileReader(file))) { 284 String line = reader.readLine(); 285 while (line != null) { 286 result.append(line); 287 line = reader.readLine(); 288 } 289 } 290 return result; 291 } 292 293 @Override 294 public Configuration fromByteArray(final byte[] theBytes) throws IOException, BindException { 295 return fromAvro(avroFromBytes(theBytes)); 296 } 297 298 @Override 299 public Configuration fromByteArray(final byte[] theBytes, final ClassHierarchy classHierarchy) throws IOException, BindException { 300 return fromAvro(avroFromBytes(theBytes), classHierarchy); 301 } 302 303 @Override 304 public Configuration fromString(final String theString) throws IOException, BindException { 305 return fromAvro(avroFromString(theString)); 306 } 307 308 @Override 309 public Configuration fromString(final String theString, final ClassHierarchy classHierarchy) throws IOException, BindException { 310 return fromAvro(avroFromString(theString), classHierarchy); 311 } 312 313}