This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.watcher.util;
020
021import org.apache.avro.Schema;
022import org.apache.avro.io.Encoder;
023import org.apache.avro.io.EncoderFactory;
024import org.apache.avro.specific.SpecificDatumWriter;
025import org.apache.avro.specific.SpecificRecord;
026import org.apache.reef.annotations.Unstable;
027import org.apache.reef.annotations.audience.Private;
028import org.apache.reef.common.Failure;
029import org.apache.reef.driver.catalog.NodeDescriptor;
030import org.apache.reef.driver.catalog.RackDescriptor;
031import org.apache.reef.driver.context.ActiveContext;
032import org.apache.reef.driver.context.ClosedContext;
033import org.apache.reef.driver.context.ContextBase;
034import org.apache.reef.driver.context.FailedContext;
035import org.apache.reef.driver.evaluator.*;
036import org.apache.reef.driver.task.*;
037import org.apache.reef.io.watcher.common.AvroFailure;
038import org.apache.reef.io.watcher.driver.catalog.AvroNodeDescriptor;
039import org.apache.reef.io.watcher.driver.catalog.AvroNodeDescriptorInRackDescriptor;
040import org.apache.reef.io.watcher.driver.catalog.AvroRackDescriptor;
041import org.apache.reef.io.watcher.driver.context.AvroActiveContext;
042import org.apache.reef.io.watcher.driver.context.AvroClosedContext;
043import org.apache.reef.io.watcher.driver.context.AvroContextBase;
044import org.apache.reef.io.watcher.driver.context.AvroFailedContext;
045import org.apache.reef.io.watcher.driver.evaluator.*;
046import org.apache.reef.io.watcher.driver.task.*;
047import org.apache.reef.io.watcher.wake.time.event.AvroStartTime;
048import org.apache.reef.io.watcher.wake.time.event.AvroStopTime;
049import org.apache.reef.io.watcher.wake.time.runtime.event.AvroRuntimeStart;
050import org.apache.reef.io.watcher.wake.time.runtime.event.AvroRuntimeStop;
051import org.apache.reef.util.Optional;
052import org.apache.reef.wake.time.event.StartTime;
053import org.apache.reef.wake.time.event.StopTime;
054import org.apache.reef.wake.time.runtime.event.RuntimeStart;
055import org.apache.reef.wake.time.runtime.event.RuntimeStop;
056
057import java.io.ByteArrayOutputStream;
058import java.io.IOException;
059import java.net.InetSocketAddress;
060import java.nio.ByteBuffer;
061import java.nio.charset.Charset;
062import java.util.ArrayList;
063import java.util.List;
064
065@Private
066@Unstable
067public final class WatcherAvroUtil {
068
069  public static AvroFailure toAvroFailure(final Failure failure) {
070    final String reason;
071    if (failure.getReason().isPresent()) {
072      reason = convertThrowableToString(failure.getReason().get());
073    } else {
074      reason = null;
075    }
076
077    return AvroFailure.newBuilder()
078        .setAsError(convertThrowableToString(failure.asError()))
079        .setData(unwrapOptionalByteArray(failure.getData()))
080        .setDescription(failure.getDescription().orElse(null))
081        .setId(failure.getId())
082        .setMessage(failure.getMessage())
083        .setReason(reason)
084        .build();
085  }
086
087  public static AvroNodeDescriptorInRackDescriptor toAvroNodeDescriptorInRackDescriptor(
088      final String id, final String name, final InetSocketAddress inetSocketAddress) {
089    return AvroNodeDescriptorInRackDescriptor.newBuilder()
090        .setInetSocketAddress(inetSocketAddress.toString())
091        .setId(id)
092        .setName(name)
093        .build();
094  }
095
096  public static AvroRackDescriptor toAvroRackDescriptor(final RackDescriptor rackDescriptor) {
097    final List<AvroNodeDescriptorInRackDescriptor> nodeDescriptorList = new ArrayList<>();
098    for (final NodeDescriptor nodeDescriptor : rackDescriptor.getNodes()) {
099      nodeDescriptorList.add(
100          toAvroNodeDescriptorInRackDescriptor(
101              nodeDescriptor.getId(), nodeDescriptor.getName(), nodeDescriptor.getInetSocketAddress()
102          )
103      );
104    }
105
106    return AvroRackDescriptor.newBuilder()
107        .setNodes(nodeDescriptorList)
108        .setName(rackDescriptor.getName())
109        .build();
110  }
111
112  public static AvroNodeDescriptor toAvroNodeDescriptor(final NodeDescriptor nodeDescriptor) {
113    return AvroNodeDescriptor.newBuilder()
114        .setId(nodeDescriptor.getId())
115        .setName(nodeDescriptor.getName())
116        .setInetSocketAddress(nodeDescriptor.getInetSocketAddress().toString())
117        .setRackDescriptor(toAvroRackDescriptor(nodeDescriptor.getRackDescriptor()))
118        .build();
119  }
120
121  public static AvroEvaluatorType toAvroEvaluatorType(final EvaluatorType evaluatorType) {
122    switch (evaluatorType) {
123    case JVM: return AvroEvaluatorType.JVM;
124    case CLR: return AvroEvaluatorType.CLR;
125    case UNDECIDED: return AvroEvaluatorType.UNDECIDED;
126    default: throw new RuntimeException(evaluatorType + " is not defined for AvroEvaluatorType.");
127    }
128  }
129
130  public static AvroEvaluatorProcess toAvroEvaluatorProcess(final EvaluatorProcess evaluatorProcess) {
131    final List<CharSequence> commandLines = new ArrayList<>();
132    for (final  String commandLine : evaluatorProcess.getCommandLine()) {
133      commandLines.add(commandLine);
134    }
135
136    return AvroEvaluatorProcess.newBuilder()
137        .setCommandLines(commandLines)
138        .setEvaluatorType(toAvroEvaluatorType(evaluatorProcess.getType()))
139        .setIsOptionSet(evaluatorProcess.isOptionSet())
140        .build();
141  }
142
143  public static AvroEvaluatorDescriptor toAvroEvaluatorDescriptor(final EvaluatorDescriptor evaluatorDescriptor) {
144    return AvroEvaluatorDescriptor.newBuilder()
145        .setMemory(evaluatorDescriptor.getMemory())
146        .setNodeDescriptor(toAvroNodeDescriptor(evaluatorDescriptor.getNodeDescriptor()))
147        .setNumberOfCores(evaluatorDescriptor.getNumberOfCores())
148        .setProcess(toAvroEvaluatorProcess(evaluatorDescriptor.getProcess()))
149        .build();
150  }
151
152  public static AvroRuntimeStart toAvroRuntimeStart(final RuntimeStart runtimeStart) {
153    return AvroRuntimeStart.newBuilder()
154        .setTimestamp(runtimeStart.getTimestamp())
155        .build();
156  }
157
158  public static AvroStartTime toAvroStartTime(final StartTime startTime) {
159    return AvroStartTime.newBuilder()
160        .setTimestamp(startTime.getTimestamp())
161        .build();
162  }
163
164  public static AvroStopTime toAvroStopTime(final StopTime stopTime) {
165    return AvroStopTime.newBuilder()
166        .setTimestamp(stopTime.getTimestamp())
167        .build();
168  }
169
170  public static AvroRuntimeStop toAvroRuntimeStop(final RuntimeStop runtimeStop) {
171    return AvroRuntimeStop.newBuilder()
172        .setException(convertThrowableToString(runtimeStop.getException()))
173        .setTimestamp(runtimeStop.getTimestamp())
174        .build();
175  }
176
177  public static AvroContextBase toAvroContextBase(final ContextBase contextBase) {
178    return AvroContextBase.newBuilder()
179        .setEvaluatorDescriptor(null)
180        .setEvaluatorId(contextBase.getEvaluatorId())
181        .setId(contextBase.getId())
182        .setParentId(contextBase.getParentId().orElse(null))
183        .build();
184  }
185
186  public static AvroActiveContext toAvroActiveContext(final ActiveContext activeContext) {
187    return AvroActiveContext.newBuilder()
188        .setBase(toAvroContextBase(activeContext))
189        .build();
190  }
191
192  public static AvroClosedContext toAvroClosedContext(final ClosedContext closedContext) {
193    return AvroClosedContext.newBuilder()
194        .setBase(toAvroContextBase(closedContext))
195        .setParentContext(toAvroActiveContext(closedContext.getParentContext()))
196        .build();
197  }
198
199  public static AvroFailedContext toAvroFailedContext(final FailedContext failedContext) {
200    return AvroFailedContext.newBuilder()
201        .setBase(toAvroContextBase(failedContext))
202        .setParentContext(unwrapOptionalActiveContext(failedContext.getParentContext()))
203        .setFailure(toAvroFailure(failedContext))
204        .build();
205  }
206
207  public static AvroCompletedTask toAvroCompletedTask(final CompletedTask completedTask) {
208    return AvroCompletedTask.newBuilder()
209        .setId(completedTask.getId())
210        .setActiveContext(toAvroActiveContext(completedTask.getActiveContext()))
211        .setGet(wrapNullableByteArray(completedTask.get()))
212        .build();
213  }
214
215  public static AvroFailedTask toAvroFailedTask(final FailedTask failedTask) {
216    return AvroFailedTask.newBuilder()
217        .setActiveContext(unwrapOptionalActiveContext(failedTask.getActiveContext()))
218        .setFailure(toAvroFailure(failedTask))
219        .build();
220  }
221
222  public static AvroRunningTask toAvroRunningTask(final RunningTask runningTask) {
223    return AvroRunningTask.newBuilder()
224        .setActiveContext(toAvroActiveContext(runningTask.getActiveContext()))
225        .setId(runningTask.getId())
226        .build();
227  }
228
229  public static AvroTaskMessage toAvroTaskMessage(final TaskMessage taskMessage) {
230    return AvroTaskMessage.newBuilder()
231        .setId(taskMessage.getId())
232        .setContextId(taskMessage.getContextId())
233        .setMessageSourceId(taskMessage.getMessageSourceID())
234        .setGet(wrapNullableByteArray(taskMessage.get()))
235        .build();
236  }
237
238  public static AvroSuspendedTask toAvroSuspendedTask(final SuspendedTask suspendedTask) {
239    return AvroSuspendedTask.newBuilder()
240        .setGet(wrapNullableByteArray(suspendedTask.get()))
241        .setId(suspendedTask.getId())
242        .setActiveContext(toAvroActiveContext(suspendedTask.getActiveContext()))
243        .build();
244  }
245
246  public static AvroAllocatedEvaluator toAvroAllocatedEvaluator(final AllocatedEvaluator allocatedEvaluator) {
247    return AvroAllocatedEvaluator.newBuilder()
248        .setId(allocatedEvaluator.getId())
249        .setEvaluatorDescriptor(toAvroEvaluatorDescriptor(allocatedEvaluator.getEvaluatorDescriptor()))
250        .build();
251  }
252
253  public static AvroFailedEvaluator toAvroFailedEvaluator(final FailedEvaluator failedEvaluator) {
254    final AvroFailedTask avroFailedTask;
255    if (failedEvaluator.getFailedTask().isPresent()) {
256      avroFailedTask = toAvroFailedTask(failedEvaluator.getFailedTask().get());
257    } else {
258      avroFailedTask = null;
259    }
260
261    final List<AvroFailedContext> avroFailedContextList = new ArrayList<>();
262    for (final FailedContext failedContext : failedEvaluator.getFailedContextList()) {
263      avroFailedContextList.add(toAvroFailedContext(failedContext));
264    }
265
266    return AvroFailedEvaluator.newBuilder()
267        .setId(failedEvaluator.getId())
268        .setEvaluatorException(convertThrowableToString(failedEvaluator.getEvaluatorException()))
269        .setFailedContextList(avroFailedContextList)
270        .setFailedTask(avroFailedTask)
271        .build();
272  }
273
274  public static AvroCompletedEvaluator toAvroCompletedEvaluator(final CompletedEvaluator completedEvaluator) {
275    return AvroCompletedEvaluator.newBuilder()
276        .setId(completedEvaluator.getId())
277        .build();
278  }
279
280  public static String toString(final SpecificRecord record) {
281    final String jsonEncodedRecord;
282    try {
283      final Schema schema = record.getSchema();
284      final ByteArrayOutputStream bos = new ByteArrayOutputStream();
285      final Encoder encoder = EncoderFactory.get().jsonEncoder(schema, bos);
286      final SpecificDatumWriter datumWriter = new SpecificDatumWriter(record.getClass());
287      datumWriter.write(record, encoder);
288      encoder.flush();
289      jsonEncodedRecord = new String(bos.toByteArray(), Charset.forName("UTF-8"));
290    } catch (final IOException e) {
291      throw new RuntimeException(e);
292    }
293    return jsonEncodedRecord;
294  }
295
296  private static AvroActiveContext unwrapOptionalActiveContext(final Optional<ActiveContext> optionalActiveContext) {
297    if (optionalActiveContext.isPresent()) {
298      return toAvroActiveContext(optionalActiveContext.get());
299    }
300
301    return null;
302  }
303
304  private static String convertThrowableToString(final Throwable throwable) {
305    if (throwable != null) {
306      return throwable.toString();
307    }
308
309    return null;
310  }
311
312  private static ByteBuffer wrapNullableByteArray(final byte[] data) {
313    if (data != null) {
314      return ByteBuffer.wrap(data);
315    }
316
317    return null;
318  }
319
320  private static ByteBuffer unwrapOptionalByteArray(final Optional<byte[]> optionalByteArray) {
321    if (optionalByteArray.isPresent()) {
322      return ByteBuffer.wrap(optionalByteArray.get());
323    }
324
325    return null;
326  }
327
328  /**
329   * Empty private constructor to prohibit instantiation of utility class.
330   */
331  private WatcherAvroUtil() {
332  }
333}