001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.tang.formats; 020 021import org.apache.avro.file.DataFileReader; 022import org.apache.avro.file.DataFileWriter; 023import org.apache.avro.io.*; 024import org.apache.avro.specific.SpecificDatumReader; 025import org.apache.avro.specific.SpecificDatumWriter; 026import org.apache.commons.lang.NotImplementedException; 027import org.apache.commons.lang.StringUtils; 028import org.apache.reef.tang.ClassHierarchy; 029import org.apache.reef.tang.Configuration; 030import org.apache.reef.tang.ConfigurationBuilder; 031import org.apache.reef.tang.Tang; 032import org.apache.reef.tang.exceptions.BindException; 033import org.apache.reef.tang.exceptions.ClassHierarchyException; 034import org.apache.reef.tang.formats.avro.AvroConfiguration; 035import org.apache.reef.tang.formats.avro.ConfigurationEntry; 036import org.apache.reef.tang.implementation.ConfigurationBuilderImpl; 037import org.apache.reef.tang.types.ClassNode; 038import org.apache.reef.tang.types.NamedParameterNode; 039import org.apache.reef.tang.types.Node; 040import org.apache.reef.tang.util.ReflectionUtilities; 041 042import javax.inject.Inject; 043 044import java.io.*; 045import java.util.*; 046 047/** 048 * (De-)Serializing Configuration to and from AvroConfiguration. 049 * <p> 050 * This class is stateless and is therefore safe to reuse. 051 */ 052public final class AvroConfigurationSerializer implements ConfigurationSerializer { 053 054 /** 055 * The Charset used for the JSON encoding. 056 * <p> 057 * Copied from <code>org.apache.avro.io.JsonDecoder.CHARSET</code> 058 */ 059 private static final String JSON_CHARSET = "ISO-8859-1"; 060 public static final String JAVA = "Java"; 061 public static final String CS = "Cs"; 062 063 @Inject 064 public AvroConfigurationSerializer() { 065 } 066 067 private static void fromAvro(final AvroConfiguration avroConfiguration, 068 final ConfigurationBuilder configurationBuilder) throws BindException { 069 // TODO[JIRA REEF-402]: This method should implement list deserialization. Implement it when C# side is ready. 070 final Map<String, String> importedNames = new HashMap<>(); 071 072 for (final ConfigurationEntry entry : avroConfiguration.getBindings()) { 073 074 final String longName = importedNames.get(entry.getKey().toString()); 075 final String key; 076 if (null == longName) { 077 key = entry.getKey().toString(); 078 } else { 079 key = longName; 080 } 081 082 // entry.getValue()'s type can be either string or array of string 083 final Object rawValue = entry.getValue(); 084 085 try { 086 // TODO[JIRA REEF-402]: Implement list deserialization 087 // rawValue is String. 088 final String value = rawValue.toString(); 089 if (key.equals(ConfigurationBuilderImpl.IMPORT)) { 090 configurationBuilder.getClassHierarchy().getNode(value); 091 final String[] tok = value.split(ReflectionUtilities.REGEXP); 092 final String lastTok = tok[tok.length - 1]; 093 try { 094 configurationBuilder.getClassHierarchy().getNode(lastTok); 095 throw new IllegalArgumentException("Conflict on short name: " + lastTok); 096 } catch (final BindException e) { 097 final String oldValue = importedNames.put(lastTok, value); 098 if (oldValue != null) { 099 throw new IllegalArgumentException("Name conflict: " 100 + lastTok + " maps to " + oldValue + " and " + value, e); 101 } 102 } 103 } else if (value.startsWith(ConfigurationBuilderImpl.INIT)) { 104 final String[] classes = value.substring(ConfigurationBuilderImpl.INIT.length(), value.length()) 105 .replaceAll("^[\\s\\(]+", "") 106 .replaceAll("[\\s\\)]+$", "") 107 .split("[\\s\\-]+"); 108 configurationBuilder.registerLegacyConstructor(key, classes); 109 } else { 110 configurationBuilder.bind(key, value); 111 } 112 } catch (final BindException | ClassHierarchyException e) { 113 throw new BindException("Failed to process configuration tuple: [" + key + "=" + rawValue + "]", e); 114 } 115 } 116 } 117 118 private static AvroConfiguration avroFromFile(final File file) throws IOException { 119 final AvroConfiguration avroConfiguration; 120 try (final DataFileReader<AvroConfiguration> dataFileReader = 121 new DataFileReader<>(file, new SpecificDatumReader<>(AvroConfiguration.class))) { 122 avroConfiguration = dataFileReader.next(); 123 } 124 return avroConfiguration; 125 } 126 127 private static AvroConfiguration avroFromBytes(final byte[] theBytes) throws IOException { 128 final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(theBytes, null); 129 final SpecificDatumReader<AvroConfiguration> reader = new SpecificDatumReader<>(AvroConfiguration.class); 130 return reader.read(null, decoder); 131 } 132 133 private static AvroConfiguration avroFromString(final String theString) throws IOException { 134 final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(AvroConfiguration.getClassSchema(), theString); 135 final SpecificDatumReader<AvroConfiguration> reader = new SpecificDatumReader<>(AvroConfiguration.class); 136 return reader.read(null, decoder); 137 } 138 139 public AvroConfiguration toAvro(final Configuration configuration) { 140 // TODO[JIRA REEF-402]: This method should implement list serialization. Implement it when C# side is ready. 141 142 final List<ConfigurationEntry> configurationEntries = new ArrayList<>(); 143 144 for (final ClassNode<?> opt : configuration.getBoundImplementations()) { 145 configurationEntries.add(ConfigurationEntry.newBuilder() 146 .setKey(opt.getFullName()) 147 .setValue(configuration.getBoundImplementation(opt).getFullName()) 148 .build()); 149 } 150 151 for (final ClassNode<?> opt : configuration.getBoundConstructors()) { 152 configurationEntries.add(ConfigurationEntry.newBuilder() 153 .setKey(opt.getFullName()) 154 .setValue(configuration.getBoundConstructor(opt).getFullName()) 155 .build()); 156 } 157 for (final NamedParameterNode<?> opt : configuration.getNamedParameters()) { 158 configurationEntries.add(ConfigurationEntry.newBuilder() 159 .setKey(opt.getFullName()) 160 .setValue(configuration.getNamedParameter(opt)) 161 .build()); 162 } 163 for (final ClassNode<?> cn : configuration.getLegacyConstructors()) { 164 final String legacyConstructors = StringUtils.join(configuration.getLegacyConstructor(cn).getArgs(), "-"); 165 configurationEntries.add(ConfigurationEntry.newBuilder() 166 .setKey(cn.getFullName()) 167 .setValue("" + ConfigurationBuilderImpl.INIT + "(" + legacyConstructors + ")") 168 .build()); 169 } 170 for (final NamedParameterNode<Set<?>> key : configuration.getBoundSets()) { 171 for (final Object value : configuration.getBoundSet(key)) { 172 final String val; 173 if (value instanceof String) { 174 val = (String) value; 175 } else if (value instanceof Node) { 176 val = ((Node) value).getFullName(); 177 } else { 178 throw new IllegalStateException("The value bound to a given NamedParameterNode " 179 + key + " is neither the set of class hierarchy nodes nor strings."); 180 } 181 configurationEntries.add(ConfigurationEntry.newBuilder() 182 .setKey(key.getFullName()) 183 .setValue(val) 184 .build()); 185 } 186 } 187 // TODO[JIRA REEF-402]: Implement list serialization 188 if (configuration.getBoundLists() != null && !configuration.getBoundLists().isEmpty()) { 189 throw new NotImplementedException("List serialization/deserialization is not supported"); 190 } 191 192 return AvroConfiguration.newBuilder().setLanguage(JAVA).setBindings(configurationEntries).build(); 193 } 194 195 @Override 196 public void toFile(final Configuration conf, final File file) throws IOException { 197 final AvroConfiguration avroConfiguration = toAvro(conf); 198 final DatumWriter<AvroConfiguration> configurationWriter = new SpecificDatumWriter<>(AvroConfiguration.class); 199 try (DataFileWriter<AvroConfiguration> dataFileWriter = new DataFileWriter<>(configurationWriter)) { 200 dataFileWriter.create(avroConfiguration.getSchema(), file); 201 dataFileWriter.append(avroConfiguration); 202 } 203 } 204 205 @Override 206 public void toTextFile(final Configuration conf, final File file) throws IOException { 207 try (final Writer w = new OutputStreamWriter(new FileOutputStream(file), JSON_CHARSET)) { 208 w.write(this.toString(conf)); 209 } 210 } 211 212 @Override 213 public byte[] toByteArray(final Configuration conf) throws IOException { 214 final DatumWriter<AvroConfiguration> configurationWriter = new SpecificDatumWriter<>(AvroConfiguration.class); 215 final byte[] theBytes; 216 try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) { 217 final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); 218 configurationWriter.write(toAvro(conf), encoder); 219 encoder.flush(); 220 out.flush(); 221 theBytes = out.toByteArray(); 222 } 223 return theBytes; 224 } 225 226 /** 227 * Produce a JSON string that represents given configuration. 228 * @param configuration Tang configuration to convert into a JSON string. 229 * @return A JSON string that corresponds to the given Tang configuration. 230 */ 231 @Override 232 public String toString(final Configuration configuration) { 233 return toString(configuration, false); 234 } 235 236 /** 237 * Produce a JSON string that represents given configuration. 238 * @param configuration Tang configuration to convert into a JSON string. 239 * @param prettyPrint If true, use new lines and spaces to pretty print the JSON string. 240 * If false (by default), output JSON as a single line. 241 * @return A JSON string that corresponds to the given Tang configuration. 242 */ 243 public String toString(final Configuration configuration, final boolean prettyPrint) { 244 final DatumWriter<AvroConfiguration> configurationWriter = new SpecificDatumWriter<>(AvroConfiguration.class); 245 final String result; 246 try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) { 247 // TODO [REEF-1536] Re-enable pretty printing when Avro 1.7.5 available on all environments: 248 final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(AvroConfiguration.SCHEMA$, out); //, prettyPrint); 249 configurationWriter.write(toAvro(configuration), encoder); 250 encoder.flush(); 251 out.flush(); 252 result = out.toString(JSON_CHARSET); 253 } catch (final IOException e) { 254 throw new RuntimeException(e); 255 } 256 return result; 257 } 258 259 /** 260 * Converts a given AvroConfiguration to Configuration. 261 * 262 * @param avroConfiguration a Avro configuration 263 * @return a Configuration version of the given AvroConfiguration 264 */ 265 public Configuration fromAvro(final AvroConfiguration avroConfiguration) throws BindException { 266 final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder(); 267 fromAvro(avroConfiguration, configurationBuilder); 268 return configurationBuilder.build(); 269 } 270 271 /** 272 * Converts a given AvroConfiguration to Configuration. 273 * 274 * @param avroConfiguration a Avro configuration 275 * @param classHierarchy the class hierarchy used for validation. 276 * @return a Configuration version of the given AvroConfiguration 277 */ 278 public Configuration fromAvro(final AvroConfiguration avroConfiguration, final ClassHierarchy classHierarchy) 279 throws BindException { 280 final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder(classHierarchy); 281 fromAvro(avroConfiguration, configurationBuilder); 282 return configurationBuilder.build(); 283 } 284 285 @Override 286 public Configuration fromFile(final File file) throws IOException, BindException { 287 return fromAvro(avroFromFile(file)); 288 } 289 290 @Override 291 public Configuration fromFile(final File file, final ClassHierarchy classHierarchy) 292 throws IOException, BindException { 293 return fromAvro(avroFromFile(file), classHierarchy); 294 } 295 296 @Override 297 public Configuration fromTextFile(final File file) throws IOException, BindException { 298 final StringBuilder result = readFromTextFile(file); 299 return this.fromString(result.toString()); 300 } 301 302 @Override 303 public Configuration fromTextFile(final File file, final ClassHierarchy classHierarchy) 304 throws IOException, BindException { 305 final StringBuilder result = readFromTextFile(file); 306 return this.fromString(result.toString(), classHierarchy); 307 } 308 309 private StringBuilder readFromTextFile(final File file) throws IOException { 310 final StringBuilder result = new StringBuilder(); 311 try (final BufferedReader reader = 312 new BufferedReader(new InputStreamReader(new FileInputStream(file), JSON_CHARSET))) { 313 String line = reader.readLine(); 314 while (line != null) { 315 result.append(line); 316 line = reader.readLine(); 317 } 318 } 319 return result; 320 } 321 322 @Override 323 public Configuration fromByteArray(final byte[] theBytes) throws IOException, BindException { 324 return fromAvro(avroFromBytes(theBytes)); 325 } 326 327 @Override 328 public Configuration fromByteArray(final byte[] theBytes, final ClassHierarchy classHierarchy) 329 throws IOException, BindException { 330 return fromAvro(avroFromBytes(theBytes), classHierarchy); 331 } 332 333 @Override 334 public Configuration fromString(final String theString) throws IOException, BindException { 335 return fromAvro(avroFromString(theString)); 336 } 337 338 @Override 339 public Configuration fromString(final String theString, final ClassHierarchy classHierarchy) 340 throws IOException, BindException { 341 return fromAvro(avroFromString(theString), classHierarchy); 342 } 343 344 /** 345 * Converts a given serialized string to ConfigurationBuilder from which Configuration can be produced. 346 * 347 * @param theString the String containing configuration 348 * @param configBuilder a configuration builder 349 * @throws IOException if the string is not Avro format 350 * @throws BindException if the content of configuration string is invalid to bind 351 */ 352 public void configurationBuilderFromString(final String theString, final ConfigurationBuilder configBuilder) 353 throws IOException, BindException { 354 fromAvro(avroFromString(theString), configBuilder); 355 } 356}