001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.tang.formats; 020 021import org.apache.avro.file.DataFileReader; 022import org.apache.avro.file.DataFileWriter; 023import org.apache.avro.io.*; 024import org.apache.avro.specific.SpecificDatumReader; 025import org.apache.avro.specific.SpecificDatumWriter; 026import org.apache.commons.lang.NotImplementedException; 027import org.apache.commons.lang.StringUtils; 028import org.apache.reef.tang.ClassHierarchy; 029import org.apache.reef.tang.Configuration; 030import org.apache.reef.tang.ConfigurationBuilder; 031import org.apache.reef.tang.Tang; 032import org.apache.reef.tang.exceptions.BindException; 033import org.apache.reef.tang.exceptions.ClassHierarchyException; 034import org.apache.reef.tang.formats.avro.AvroConfiguration; 035import org.apache.reef.tang.formats.avro.ConfigurationEntry; 036import org.apache.reef.tang.implementation.ConfigurationBuilderImpl; 037import org.apache.reef.tang.types.ClassNode; 038import org.apache.reef.tang.types.NamedParameterNode; 039import org.apache.reef.tang.types.Node; 040import org.apache.reef.tang.util.ReflectionUtilities; 041 042import javax.inject.Inject; 043 044import java.io.*; 045import java.util.*; 046 047/** 048 * (De-)Serializing Configuration to and from AvroConfiguration. 049 * <p> 050 * This class is stateless and is therefore safe to reuse. 051 */ 052public final class AvroConfigurationSerializer implements ConfigurationSerializer { 053 054 /** 055 * The Charset used for the JSON encoding. 056 * <p> 057 * Copied from <code>org.apache.avro.io.JsonDecoder.CHARSET</code> 058 */ 059 private static final String JSON_CHARSET = "ISO-8859-1"; 060 public static final String JAVA = "Java"; 061 public static final String CS = "Cs"; 062 063 @Inject 064 public AvroConfigurationSerializer() { 065 } 066 067 private static void fromAvro(final AvroConfiguration avroConfiguration, 068 final ConfigurationBuilder configurationBuilder) throws BindException { 069 // TODO[JIRA REEF-402]: This method should implement list deserialization. Implement it when C# side is ready. 070 final Map<String, String> importedNames = new HashMap<>(); 071 072 for (final ConfigurationEntry entry : avroConfiguration.getBindings()) { 073 074 final String longName = importedNames.get(entry.getKey().toString()); 075 final String key; 076 if (null == longName) { 077 key = entry.getKey().toString(); 078 } else { 079 key = longName; 080 } 081 082 // entry.getValue()'s type can be either string or array of string 083 final Object rawValue = entry.getValue(); 084 085 try { 086 // TODO[JIRA REEF-402]: Implement list deserialization 087 // rawValue is String. 088 final String value = rawValue.toString(); 089 if (key.equals(ConfigurationBuilderImpl.IMPORT)) { 090 configurationBuilder.getClassHierarchy().getNode(value); 091 final String[] tok = value.split(ReflectionUtilities.REGEXP); 092 final String lastTok = tok[tok.length - 1]; 093 try { 094 configurationBuilder.getClassHierarchy().getNode(lastTok); 095 throw new IllegalArgumentException("Conflict on short name: " + lastTok); 096 } catch (final BindException e) { 097 final String oldValue = importedNames.put(lastTok, value); 098 if (oldValue != null) { 099 throw new IllegalArgumentException("Name conflict: " 100 + lastTok + " maps to " + oldValue + " and " + value, e); 101 } 102 } 103 } else if (value.startsWith(ConfigurationBuilderImpl.INIT)) { 104 final String[] classes = value.substring(ConfigurationBuilderImpl.INIT.length(), value.length()) 105 .replaceAll("^[\\s\\(]+", "") 106 .replaceAll("[\\s\\)]+$", "") 107 .split("[\\s\\-]+"); 108 configurationBuilder.registerLegacyConstructor(key, classes); 109 } else { 110 configurationBuilder.bind(key, value); 111 } 112 } catch (final BindException | ClassHierarchyException e) { 113 throw new BindException("Failed to process configuration tuple: [" + key + "=" + rawValue + "]", e); 114 } 115 } 116 } 117 118 private static AvroConfiguration avroFromFile(final File file) throws IOException { 119 final AvroConfiguration avroConfiguration; 120 try (final DataFileReader<AvroConfiguration> dataFileReader = 121 new DataFileReader<>(file, new SpecificDatumReader<>(AvroConfiguration.class))) { 122 avroConfiguration = dataFileReader.next(); 123 } 124 return avroConfiguration; 125 } 126 127 private static AvroConfiguration avroFromBytes(final byte[] theBytes) throws IOException { 128 final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(theBytes, null); 129 final SpecificDatumReader<AvroConfiguration> reader = new SpecificDatumReader<>(AvroConfiguration.class); 130 return reader.read(null, decoder); 131 } 132 133 private static AvroConfiguration avroFromString(final String theString) throws IOException { 134 final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(AvroConfiguration.getClassSchema(), theString); 135 final SpecificDatumReader<AvroConfiguration> reader = new SpecificDatumReader<>(AvroConfiguration.class); 136 return reader.read(null, decoder); 137 } 138 139 public AvroConfiguration toAvro(final Configuration configuration) { 140 // TODO[JIRA REEF-402]: This method should implement list serialization. Implement it when C# side is ready. 141 142 final List<ConfigurationEntry> configurationEntries = new ArrayList<>(); 143 144 for (final ClassNode<?> opt : configuration.getBoundImplementations()) { 145 configurationEntries.add(ConfigurationEntry.newBuilder() 146 .setKey(opt.getFullName()) 147 .setValue(configuration.getBoundImplementation(opt).getFullName()) 148 .build()); 149 } 150 151 for (final ClassNode<?> opt : configuration.getBoundConstructors()) { 152 configurationEntries.add(ConfigurationEntry.newBuilder() 153 .setKey(opt.getFullName()) 154 .setValue(configuration.getBoundConstructor(opt).getFullName()) 155 .build()); 156 } 157 for (final NamedParameterNode<?> opt : configuration.getNamedParameters()) { 158 configurationEntries.add(ConfigurationEntry.newBuilder() 159 .setKey(opt.getFullName()) 160 .setValue(configuration.getNamedParameter(opt)) 161 .build()); 162 } 163 for (final ClassNode<?> cn : configuration.getLegacyConstructors()) { 164 final String legacyConstructors = StringUtils.join(configuration.getLegacyConstructor(cn).getArgs(), "-"); 165 configurationEntries.add(ConfigurationEntry.newBuilder() 166 .setKey(cn.getFullName()) 167 .setValue("" + ConfigurationBuilderImpl.INIT + "(" + legacyConstructors + ")") 168 .build()); 169 } 170 for (final NamedParameterNode<Set<?>> key : configuration.getBoundSets()) { 171 for (final Object value : configuration.getBoundSet(key)) { 172 final String val; 173 if (value instanceof String) { 174 val = (String) value; 175 } else if (value instanceof Node) { 176 val = ((Node) value).getFullName(); 177 } else { 178 throw new IllegalStateException("The value bound to a given NamedParameterNode " 179 + key + " is neither the set of class hierarchy nodes nor strings."); 180 } 181 configurationEntries.add(ConfigurationEntry.newBuilder() 182 .setKey(key.getFullName()) 183 .setValue(val) 184 .build()); 185 } 186 } 187 // TODO[JIRA REEF-402]: Implement list serialization 188 if (configuration.getBoundLists() != null && !configuration.getBoundLists().isEmpty()) { 189 throw new NotImplementedException("List serialization/deserialization is not supported"); 190 } 191 192 return AvroConfiguration.newBuilder().setLanguage(JAVA).setBindings(configurationEntries).build(); 193 } 194 195 @Override 196 public void toFile(final Configuration conf, final File file) throws IOException { 197 final AvroConfiguration avroConfiguration = toAvro(conf); 198 final DatumWriter<AvroConfiguration> configurationWriter = new SpecificDatumWriter<>(AvroConfiguration.class); 199 try (DataFileWriter<AvroConfiguration> dataFileWriter = new DataFileWriter<>(configurationWriter)) { 200 dataFileWriter.create(avroConfiguration.getSchema(), file); 201 dataFileWriter.append(avroConfiguration); 202 } 203 } 204 205 @Override 206 public void toTextFile(final Configuration conf, final File file) throws IOException { 207 try (final Writer w = new OutputStreamWriter(new FileOutputStream(file), JSON_CHARSET)) { 208 w.write(this.toString(conf)); 209 } 210 } 211 212 @Override 213 public byte[] toByteArray(final Configuration conf) throws IOException { 214 final DatumWriter<AvroConfiguration> configurationWriter = new SpecificDatumWriter<>(AvroConfiguration.class); 215 final byte[] theBytes; 216 try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) { 217 final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); 218 configurationWriter.write(toAvro(conf), encoder); 219 encoder.flush(); 220 out.flush(); 221 theBytes = out.toByteArray(); 222 } 223 return theBytes; 224 } 225 226 @Override 227 public String toString(final Configuration configuration) { 228 final DatumWriter<AvroConfiguration> configurationWriter = new SpecificDatumWriter<>(AvroConfiguration.class); 229 final String result; 230 try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) { 231 final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(AvroConfiguration.SCHEMA$, out); 232 configurationWriter.write(toAvro(configuration), encoder); 233 encoder.flush(); 234 out.flush(); 235 result = out.toString(JSON_CHARSET); 236 } catch (final IOException e) { 237 throw new RuntimeException(e); 238 } 239 return result; 240 } 241 242 /** 243 * Converts a given AvroConfiguration to Configuration. 244 * 245 * @param avroConfiguration a Avro configuration 246 * @return a Configuration version of the given AvroConfiguration 247 */ 248 public Configuration fromAvro(final AvroConfiguration avroConfiguration) throws BindException { 249 final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder(); 250 fromAvro(avroConfiguration, configurationBuilder); 251 return configurationBuilder.build(); 252 } 253 254 /** 255 * Converts a given AvroConfiguration to Configuration. 256 * 257 * @param avroConfiguration a Avro configuration 258 * @param classHierarchy the class hierarchy used for validation. 259 * @return a Configuration version of the given AvroConfiguration 260 */ 261 public Configuration fromAvro(final AvroConfiguration avroConfiguration, final ClassHierarchy classHierarchy) 262 throws BindException { 263 final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder(classHierarchy); 264 fromAvro(avroConfiguration, configurationBuilder); 265 return configurationBuilder.build(); 266 } 267 268 @Override 269 public Configuration fromFile(final File file) throws IOException, BindException { 270 return fromAvro(avroFromFile(file)); 271 } 272 273 @Override 274 public Configuration fromFile(final File file, final ClassHierarchy classHierarchy) 275 throws IOException, BindException { 276 return fromAvro(avroFromFile(file), classHierarchy); 277 } 278 279 @Override 280 public Configuration fromTextFile(final File file) throws IOException, BindException { 281 final StringBuilder result = readFromTextFile(file); 282 return this.fromString(result.toString()); 283 } 284 285 @Override 286 public Configuration fromTextFile(final File file, final ClassHierarchy classHierarchy) 287 throws IOException, BindException { 288 final StringBuilder result = readFromTextFile(file); 289 return this.fromString(result.toString(), classHierarchy); 290 } 291 292 private StringBuilder readFromTextFile(final File file) throws IOException { 293 final StringBuilder result = new StringBuilder(); 294 try (final BufferedReader reader = 295 new BufferedReader(new InputStreamReader(new FileInputStream(file), JSON_CHARSET))) { 296 String line = reader.readLine(); 297 while (line != null) { 298 result.append(line); 299 line = reader.readLine(); 300 } 301 } 302 return result; 303 } 304 305 @Override 306 public Configuration fromByteArray(final byte[] theBytes) throws IOException, BindException { 307 return fromAvro(avroFromBytes(theBytes)); 308 } 309 310 @Override 311 public Configuration fromByteArray(final byte[] theBytes, final ClassHierarchy classHierarchy) 312 throws IOException, BindException { 313 return fromAvro(avroFromBytes(theBytes), classHierarchy); 314 } 315 316 @Override 317 public Configuration fromString(final String theString) throws IOException, BindException { 318 return fromAvro(avroFromString(theString)); 319 } 320 321 @Override 322 public Configuration fromString(final String theString, final ClassHierarchy classHierarchy) 323 throws IOException, BindException { 324 return fromAvro(avroFromString(theString), classHierarchy); 325 } 326 327 /** 328 * Converts a given serialized string to ConfigurationBuilder from which Configuration can be produced. 329 * 330 * @param theString the String containing configuration 331 * @param configBuilder a configuration builder 332 * @throws IOException if the string is not Avro format 333 * @throws BindException if the content of configuration string is invalid to bind 334 */ 335 public void configurationBuilderFromString(final String theString, final ConfigurationBuilder configBuilder) 336 throws IOException, BindException { 337 fromAvro(avroFromString(theString), configBuilder); 338 } 339}