001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.tang.formats; 020 021import org.apache.avro.file.DataFileReader; 022import org.apache.avro.file.DataFileWriter; 023import org.apache.avro.io.*; 024import org.apache.avro.specific.SpecificDatumReader; 025import org.apache.avro.specific.SpecificDatumWriter; 026import org.apache.commons.lang.NotImplementedException; 027import org.apache.commons.lang.StringUtils; 028import org.apache.reef.tang.ClassHierarchy; 029import org.apache.reef.tang.Configuration; 030import org.apache.reef.tang.ConfigurationBuilder; 031import org.apache.reef.tang.Tang; 032import org.apache.reef.tang.exceptions.BindException; 033import org.apache.reef.tang.exceptions.ClassHierarchyException; 034import org.apache.reef.tang.formats.avro.AvroConfiguration; 035import org.apache.reef.tang.formats.avro.ConfigurationEntry; 036import org.apache.reef.tang.implementation.ConfigurationBuilderImpl; 037import org.apache.reef.tang.types.ClassNode; 038import org.apache.reef.tang.types.NamedParameterNode; 039import org.apache.reef.tang.types.Node; 040import org.apache.reef.tang.util.ReflectionUtilities; 041 042import javax.inject.Inject; 043 044import java.io.*; 045import java.util.*; 046 047/** 048 * (De-)Serializing Configuration to and from AvroConfiguration. 049 * <p> 050 * This class is stateless and is therefore safe to reuse. 051 */ 052public final class AvroConfigurationSerializer implements ConfigurationSerializer { 053 054 /** 055 * The Charset used for the JSON encoding. 056 * <p> 057 * Copied from <code>org.apache.avro.io.JsonDecoder.CHARSET</code> 058 */ 059 private static final String JSON_CHARSET = "ISO-8859-1"; 060 public static final String JAVA = "Java"; 061 public static final String CS = "Cs"; 062 063 @Inject 064 public AvroConfigurationSerializer() { 065 } 066 067 private static void fromAvro(final AvroConfiguration avroConfiguration, 068 final ConfigurationBuilder configurationBuilder) throws BindException { 069 // TODO[JIRA REEF-402]: This method should implement list deserialization. Implement it when C# side is ready. 070 final Map<String, String> importedNames = new HashMap<>(); 071 072 for (final ConfigurationEntry entry : avroConfiguration.getBindings()) { 073 074 final String longName = importedNames.get(entry.getKey().toString()); 075 final String key; 076 if (null == longName) { 077 key = entry.getKey().toString(); 078 } else { 079 key = longName; 080 } 081 082 // entry.getValue()'s type can be either string or array of string 083 final Object rawValue = entry.getValue(); 084 085 try { 086 // TODO[JIRA REEF-402]: Implement list deserialization 087 // rawValue is String. 088 final String value = rawValue.toString(); 089 if (key.equals(ConfigurationBuilderImpl.IMPORT)) { 090 configurationBuilder.getClassHierarchy().getNode(value); 091 final String[] tok = value.split(ReflectionUtilities.REGEXP); 092 final String lastTok = tok[tok.length - 1]; 093 try { 094 configurationBuilder.getClassHierarchy().getNode(lastTok); 095 throw new IllegalArgumentException("Conflict on short name: " + lastTok); 096 } catch (final BindException e) { 097 final String oldValue = importedNames.put(lastTok, value); 098 if (oldValue != null) { 099 throw new IllegalArgumentException("Name conflict: " 100 + lastTok + " maps to " + oldValue + " and " + value, e); 101 } 102 } 103 } else if (value.startsWith(ConfigurationBuilderImpl.INIT)) { 104 final String[] classes = value.substring(ConfigurationBuilderImpl.INIT.length(), value.length()) 105 .replaceAll("^[\\s\\(]+", "") 106 .replaceAll("[\\s\\)]+$", "") 107 .split("[\\s\\-]+"); 108 configurationBuilder.registerLegacyConstructor(key, classes); 109 } else { 110 configurationBuilder.bind(key, value); 111 } 112 } catch (final BindException | ClassHierarchyException e) { 113 throw new BindException("Failed to process configuration tuple: [" + key + "=" + rawValue + "]", e); 114 } 115 } 116 } 117 118 private static AvroConfiguration avroFromFile(final File file) throws IOException { 119 final AvroConfiguration avroConfiguration; 120 try (final DataFileReader<AvroConfiguration> dataFileReader = 121 new DataFileReader<>(file, new SpecificDatumReader<>(AvroConfiguration.class))) { 122 avroConfiguration = dataFileReader.next(); 123 } 124 return avroConfiguration; 125 } 126 127 private static AvroConfiguration avroFromBytes(final byte[] theBytes) throws IOException { 128 final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(theBytes, null); 129 final SpecificDatumReader<AvroConfiguration> reader = new SpecificDatumReader<>(AvroConfiguration.class); 130 return reader.read(null, decoder); 131 } 132 133 private static AvroConfiguration avroFromString(final String theString) throws IOException { 134 final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(AvroConfiguration.getClassSchema(), theString); 135 final SpecificDatumReader<AvroConfiguration> reader = new SpecificDatumReader<>(AvroConfiguration.class); 136 return reader.read(null, decoder); 137 } 138 139 public AvroConfiguration toAvro(final Configuration configuration) { 140 // TODO[JIRA REEF-402]: This method should implement list serialization. Implement it when C# side is ready. 141 142 final List<ConfigurationEntry> configurationEntries = new ArrayList<>(); 143 144 for (final ClassNode<?> opt : configuration.getBoundImplementations()) { 145 configurationEntries.add(ConfigurationEntry.newBuilder() 146 .setKey(opt.getFullName()) 147 .setValue(configuration.getBoundImplementation(opt).getFullName()) 148 .build()); 149 } 150 151 for (final ClassNode<?> opt : configuration.getBoundConstructors()) { 152 configurationEntries.add(ConfigurationEntry.newBuilder() 153 .setKey(opt.getFullName()) 154 .setValue(configuration.getBoundConstructor(opt).getFullName()) 155 .build()); 156 } 157 for (final NamedParameterNode<?> opt : configuration.getNamedParameters()) { 158 configurationEntries.add(ConfigurationEntry.newBuilder() 159 .setKey(opt.getFullName()) 160 .setValue(configuration.getNamedParameter(opt)) 161 .build()); 162 } 163 for (final ClassNode<?> cn : configuration.getLegacyConstructors()) { 164 final String legacyConstructors = StringUtils.join(configuration.getLegacyConstructor(cn).getArgs(), "-"); 165 configurationEntries.add(ConfigurationEntry.newBuilder() 166 .setKey(cn.getFullName()) 167 .setValue("" + ConfigurationBuilderImpl.INIT + "(" + legacyConstructors + ")") 168 .build()); 169 } 170 for (final NamedParameterNode<Set<?>> key : configuration.getBoundSets()) { 171 for (final Object value : configuration.getBoundSet(key)) { 172 final String val; 173 if (value instanceof String) { 174 val = (String) value; 175 } else if (value instanceof Node) { 176 val = ((Node) value).getFullName(); 177 } else { 178 throw new IllegalStateException(); 179 } 180 configurationEntries.add(ConfigurationEntry.newBuilder() 181 .setKey(key.getFullName()) 182 .setValue(val) 183 .build()); 184 } 185 } 186 // TODO[JIRA REEF-402]: Implement list serialization 187 if (configuration.getBoundLists() != null && !configuration.getBoundLists().isEmpty()) { 188 throw new NotImplementedException("List serialization/deserialization is not supported"); 189 } 190 191 return AvroConfiguration.newBuilder().setLanguage(JAVA).setBindings(configurationEntries).build(); 192 } 193 194 @Override 195 public void toFile(final Configuration conf, final File file) throws IOException { 196 final AvroConfiguration avroConfiguration = toAvro(conf); 197 final DatumWriter<AvroConfiguration> configurationWriter = new SpecificDatumWriter<>(AvroConfiguration.class); 198 try (DataFileWriter<AvroConfiguration> dataFileWriter = new DataFileWriter<>(configurationWriter)) { 199 dataFileWriter.create(avroConfiguration.getSchema(), file); 200 dataFileWriter.append(avroConfiguration); 201 } 202 } 203 204 @Override 205 public void toTextFile(final Configuration conf, final File file) throws IOException { 206 try (final Writer w = new OutputStreamWriter(new FileOutputStream(file), JSON_CHARSET)) { 207 w.write(this.toString(conf)); 208 } 209 } 210 211 @Override 212 public byte[] toByteArray(final Configuration conf) throws IOException { 213 final DatumWriter<AvroConfiguration> configurationWriter = new SpecificDatumWriter<>(AvroConfiguration.class); 214 final byte[] theBytes; 215 try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) { 216 final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); 217 configurationWriter.write(toAvro(conf), encoder); 218 encoder.flush(); 219 out.flush(); 220 theBytes = out.toByteArray(); 221 } 222 return theBytes; 223 } 224 225 @Override 226 public String toString(final Configuration configuration) { 227 final DatumWriter<AvroConfiguration> configurationWriter = new SpecificDatumWriter<>(AvroConfiguration.class); 228 final String result; 229 try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) { 230 final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(AvroConfiguration.SCHEMA$, out); 231 configurationWriter.write(toAvro(configuration), encoder); 232 encoder.flush(); 233 out.flush(); 234 result = out.toString(JSON_CHARSET); 235 } catch (final IOException e) { 236 throw new RuntimeException(e); 237 } 238 return result; 239 } 240 241 /** 242 * Converts a given AvroConfiguration to Configuration. 243 * 244 * @param avroConfiguration a Avro configuration 245 * @return a Configuration version of the given AvroConfiguration 246 */ 247 public Configuration fromAvro(final AvroConfiguration avroConfiguration) throws BindException { 248 final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder(); 249 fromAvro(avroConfiguration, configurationBuilder); 250 return configurationBuilder.build(); 251 } 252 253 /** 254 * Converts a given AvroConfiguration to Configuration. 255 * 256 * @param avroConfiguration a Avro configuration 257 * @param classHierarchy the class hierarchy used for validation. 258 * @return a Configuration version of the given AvroConfiguration 259 */ 260 public Configuration fromAvro(final AvroConfiguration avroConfiguration, final ClassHierarchy classHierarchy) 261 throws BindException { 262 final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder(classHierarchy); 263 fromAvro(avroConfiguration, configurationBuilder); 264 return configurationBuilder.build(); 265 } 266 267 @Override 268 public Configuration fromFile(final File file) throws IOException, BindException { 269 return fromAvro(avroFromFile(file)); 270 } 271 272 @Override 273 public Configuration fromFile(final File file, final ClassHierarchy classHierarchy) 274 throws IOException, BindException { 275 return fromAvro(avroFromFile(file), classHierarchy); 276 } 277 278 @Override 279 public Configuration fromTextFile(final File file) throws IOException, BindException { 280 final StringBuilder result = readFromTextFile(file); 281 return this.fromString(result.toString()); 282 } 283 284 @Override 285 public Configuration fromTextFile(final File file, final ClassHierarchy classHierarchy) 286 throws IOException, BindException { 287 final StringBuilder result = readFromTextFile(file); 288 return this.fromString(result.toString(), classHierarchy); 289 } 290 291 private StringBuilder readFromTextFile(final File file) throws IOException { 292 final StringBuilder result = new StringBuilder(); 293 try (final BufferedReader reader = 294 new BufferedReader(new InputStreamReader(new FileInputStream(file), JSON_CHARSET))) { 295 String line = reader.readLine(); 296 while (line != null) { 297 result.append(line); 298 line = reader.readLine(); 299 } 300 } 301 return result; 302 } 303 304 @Override 305 public Configuration fromByteArray(final byte[] theBytes) throws IOException, BindException { 306 return fromAvro(avroFromBytes(theBytes)); 307 } 308 309 @Override 310 public Configuration fromByteArray(final byte[] theBytes, final ClassHierarchy classHierarchy) 311 throws IOException, BindException { 312 return fromAvro(avroFromBytes(theBytes), classHierarchy); 313 } 314 315 @Override 316 public Configuration fromString(final String theString) throws IOException, BindException { 317 return fromAvro(avroFromString(theString)); 318 } 319 320 @Override 321 public Configuration fromString(final String theString, final ClassHierarchy classHierarchy) 322 throws IOException, BindException { 323 return fromAvro(avroFromString(theString), classHierarchy); 324 } 325 326 /** 327 * Converts a given serialized string to ConfigurationBuilder from which Configuration can be produced. 328 * 329 * @param theString the String containing configuration 330 * @param configBuilder a configuration builder 331 * @throws IOException if the string is not Avro format 332 * @throws BindException if the content of configuration string is invalid to bind 333 */ 334 public void configurationBuilderFromString(final String theString, final ConfigurationBuilder configBuilder) 335 throws IOException, BindException { 336 fromAvro(avroFromString(theString), configBuilder); 337 } 338}