001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.watcher.util; 020 021import org.apache.avro.Schema; 022import org.apache.avro.io.Encoder; 023import org.apache.avro.io.EncoderFactory; 024import org.apache.avro.specific.SpecificDatumWriter; 025import org.apache.avro.specific.SpecificRecord; 026import org.apache.reef.annotations.Unstable; 027import org.apache.reef.annotations.audience.Private; 028import org.apache.reef.common.Failure; 029import org.apache.reef.driver.catalog.NodeDescriptor; 030import org.apache.reef.driver.catalog.RackDescriptor; 031import org.apache.reef.driver.context.ActiveContext; 032import org.apache.reef.driver.context.ClosedContext; 033import org.apache.reef.driver.context.ContextBase; 034import org.apache.reef.driver.context.FailedContext; 035import org.apache.reef.driver.evaluator.*; 036import org.apache.reef.driver.task.*; 037import org.apache.reef.io.watcher.common.AvroFailure; 038import org.apache.reef.io.watcher.driver.catalog.AvroNodeDescriptor; 039import org.apache.reef.io.watcher.driver.catalog.AvroNodeDescriptorInRackDescriptor; 040import org.apache.reef.io.watcher.driver.catalog.AvroRackDescriptor; 041import org.apache.reef.io.watcher.driver.context.AvroActiveContext; 042import org.apache.reef.io.watcher.driver.context.AvroClosedContext; 043import org.apache.reef.io.watcher.driver.context.AvroContextBase; 044import org.apache.reef.io.watcher.driver.context.AvroFailedContext; 045import org.apache.reef.io.watcher.driver.evaluator.*; 046import org.apache.reef.io.watcher.driver.task.*; 047import org.apache.reef.io.watcher.wake.time.event.AvroStartTime; 048import org.apache.reef.io.watcher.wake.time.event.AvroStopTime; 049import org.apache.reef.io.watcher.wake.time.runtime.event.AvroRuntimeStart; 050import org.apache.reef.io.watcher.wake.time.runtime.event.AvroRuntimeStop; 051import org.apache.reef.util.Optional; 052import org.apache.reef.wake.time.event.StartTime; 053import org.apache.reef.wake.time.event.StopTime; 054import org.apache.reef.wake.time.runtime.event.RuntimeStart; 055import org.apache.reef.wake.time.runtime.event.RuntimeStop; 056 057import java.io.ByteArrayOutputStream; 058import java.io.IOException; 059import java.net.InetSocketAddress; 060import java.nio.ByteBuffer; 061import java.nio.charset.Charset; 062import java.util.ArrayList; 063import java.util.List; 064 065@Private 066@Unstable 067public final class WatcherAvroUtil { 068 069 public static AvroFailure toAvroFailure(final Failure failure) { 070 final String reason; 071 if (failure.getReason().isPresent()) { 072 reason = convertThrowableToString(failure.getReason().get()); 073 } else { 074 reason = null; 075 } 076 077 return AvroFailure.newBuilder() 078 .setAsError(convertThrowableToString(failure.asError())) 079 .setData(unwrapOptionalByteArray(failure.getData())) 080 .setDescription(failure.getDescription().orElse(null)) 081 .setId(failure.getId()) 082 .setMessage(failure.getMessage()) 083 .setReason(reason) 084 .build(); 085 } 086 087 public static AvroNodeDescriptorInRackDescriptor toAvroNodeDescriptorInRackDescriptor( 088 final String id, final String name, final InetSocketAddress inetSocketAddress) { 089 return AvroNodeDescriptorInRackDescriptor.newBuilder() 090 .setInetSocketAddress(inetSocketAddress.toString()) 091 .setId(id) 092 .setName(name) 093 .build(); 094 } 095 096 public static AvroRackDescriptor toAvroRackDescriptor(final RackDescriptor rackDescriptor) { 097 final List<AvroNodeDescriptorInRackDescriptor> nodeDescriptorList = new ArrayList<>(); 098 for (final NodeDescriptor nodeDescriptor : rackDescriptor.getNodes()) { 099 nodeDescriptorList.add( 100 toAvroNodeDescriptorInRackDescriptor( 101 nodeDescriptor.getId(), nodeDescriptor.getName(), nodeDescriptor.getInetSocketAddress() 102 ) 103 ); 104 } 105 106 return AvroRackDescriptor.newBuilder() 107 .setNodes(nodeDescriptorList) 108 .setName(rackDescriptor.getName()) 109 .build(); 110 } 111 112 public static AvroNodeDescriptor toAvroNodeDescriptor(final NodeDescriptor nodeDescriptor) { 113 return AvroNodeDescriptor.newBuilder() 114 .setId(nodeDescriptor.getId()) 115 .setName(nodeDescriptor.getName()) 116 .setInetSocketAddress(nodeDescriptor.getInetSocketAddress().toString()) 117 .setRackDescriptor(toAvroRackDescriptor(nodeDescriptor.getRackDescriptor())) 118 .build(); 119 } 120 121 public static AvroEvaluatorType toAvroEvaluatorType(final EvaluatorType evaluatorType) { 122 switch (evaluatorType) { 123 case JVM: return AvroEvaluatorType.JVM; 124 case CLR: return AvroEvaluatorType.CLR; 125 case UNDECIDED: return AvroEvaluatorType.UNDECIDED; 126 default: throw new RuntimeException(evaluatorType + " is not defined for AvroEvaluatorType."); 127 } 128 } 129 130 public static AvroEvaluatorProcess toAvroEvaluatorProcess(final EvaluatorProcess evaluatorProcess) { 131 final List<CharSequence> commandLines = new ArrayList<>(); 132 for (final String commandLine : evaluatorProcess.getCommandLine()) { 133 commandLines.add(commandLine); 134 } 135 136 return AvroEvaluatorProcess.newBuilder() 137 .setCommandLines(commandLines) 138 .setEvaluatorType(toAvroEvaluatorType(evaluatorProcess.getType())) 139 .setIsOptionSet(evaluatorProcess.isOptionSet()) 140 .build(); 141 } 142 143 public static AvroEvaluatorDescriptor toAvroEvaluatorDescriptor(final EvaluatorDescriptor evaluatorDescriptor) { 144 return AvroEvaluatorDescriptor.newBuilder() 145 .setMemory(evaluatorDescriptor.getMemory()) 146 .setNodeDescriptor(toAvroNodeDescriptor(evaluatorDescriptor.getNodeDescriptor())) 147 .setNumberOfCores(evaluatorDescriptor.getNumberOfCores()) 148 .setProcess(toAvroEvaluatorProcess(evaluatorDescriptor.getProcess())) 149 .build(); 150 } 151 152 public static AvroRuntimeStart toAvroRuntimeStart(final RuntimeStart runtimeStart) { 153 return AvroRuntimeStart.newBuilder() 154 .setTimestamp(runtimeStart.getTimestamp()) 155 .build(); 156 } 157 158 public static AvroStartTime toAvroStartTime(final StartTime startTime) { 159 return AvroStartTime.newBuilder() 160 .setTimestamp(startTime.getTimestamp()) 161 .build(); 162 } 163 164 public static AvroStopTime toAvroStopTime(final StopTime stopTime) { 165 return AvroStopTime.newBuilder() 166 .setTimestamp(stopTime.getTimestamp()) 167 .build(); 168 } 169 170 public static AvroRuntimeStop toAvroRuntimeStop(final RuntimeStop runtimeStop) { 171 return AvroRuntimeStop.newBuilder() 172 .setException(convertThrowableToString(runtimeStop.getException())) 173 .setTimestamp(runtimeStop.getTimestamp()) 174 .build(); 175 } 176 177 public static AvroContextBase toAvroContextBase(final ContextBase contextBase) { 178 return AvroContextBase.newBuilder() 179 .setEvaluatorDescriptor(null) 180 .setEvaluatorId(contextBase.getEvaluatorId()) 181 .setId(contextBase.getId()) 182 .setParentId(contextBase.getParentId().orElse(null)) 183 .build(); 184 } 185 186 public static AvroActiveContext toAvroActiveContext(final ActiveContext activeContext) { 187 return AvroActiveContext.newBuilder() 188 .setBase(toAvroContextBase(activeContext)) 189 .build(); 190 } 191 192 public static AvroClosedContext toAvroClosedContext(final ClosedContext closedContext) { 193 return AvroClosedContext.newBuilder() 194 .setBase(toAvroContextBase(closedContext)) 195 .setParentContext(toAvroActiveContext(closedContext.getParentContext())) 196 .build(); 197 } 198 199 public static AvroFailedContext toAvroFailedContext(final FailedContext failedContext) { 200 return AvroFailedContext.newBuilder() 201 .setBase(toAvroContextBase(failedContext)) 202 .setParentContext(unwrapOptionalActiveContext(failedContext.getParentContext())) 203 .setFailure(toAvroFailure(failedContext)) 204 .build(); 205 } 206 207 public static AvroCompletedTask toAvroCompletedTask(final CompletedTask completedTask) { 208 return AvroCompletedTask.newBuilder() 209 .setId(completedTask.getId()) 210 .setActiveContext(toAvroActiveContext(completedTask.getActiveContext())) 211 .setGet(wrapNullableByteArray(completedTask.get())) 212 .build(); 213 } 214 215 public static AvroFailedTask toAvroFailedTask(final FailedTask failedTask) { 216 return AvroFailedTask.newBuilder() 217 .setActiveContext(unwrapOptionalActiveContext(failedTask.getActiveContext())) 218 .setFailure(toAvroFailure(failedTask)) 219 .build(); 220 } 221 222 public static AvroRunningTask toAvroRunningTask(final RunningTask runningTask) { 223 return AvroRunningTask.newBuilder() 224 .setActiveContext(toAvroActiveContext(runningTask.getActiveContext())) 225 .setId(runningTask.getId()) 226 .build(); 227 } 228 229 public static AvroTaskMessage toAvroTaskMessage(final TaskMessage taskMessage) { 230 return AvroTaskMessage.newBuilder() 231 .setId(taskMessage.getId()) 232 .setContextId(taskMessage.getContextId()) 233 .setMessageSourceId(taskMessage.getMessageSourceID()) 234 .setGet(wrapNullableByteArray(taskMessage.get())) 235 .build(); 236 } 237 238 public static AvroSuspendedTask toAvroSuspendedTask(final SuspendedTask suspendedTask) { 239 return AvroSuspendedTask.newBuilder() 240 .setGet(wrapNullableByteArray(suspendedTask.get())) 241 .setId(suspendedTask.getId()) 242 .setActiveContext(toAvroActiveContext(suspendedTask.getActiveContext())) 243 .build(); 244 } 245 246 public static AvroAllocatedEvaluator toAvroAllocatedEvaluator(final AllocatedEvaluator allocatedEvaluator) { 247 return AvroAllocatedEvaluator.newBuilder() 248 .setId(allocatedEvaluator.getId()) 249 .setEvaluatorDescriptor(toAvroEvaluatorDescriptor(allocatedEvaluator.getEvaluatorDescriptor())) 250 .build(); 251 } 252 253 public static AvroFailedEvaluator toAvroFailedEvaluator(final FailedEvaluator failedEvaluator) { 254 final AvroFailedTask avroFailedTask; 255 if (failedEvaluator.getFailedTask().isPresent()) { 256 avroFailedTask = toAvroFailedTask(failedEvaluator.getFailedTask().get()); 257 } else { 258 avroFailedTask = null; 259 } 260 261 final List<AvroFailedContext> avroFailedContextList = new ArrayList<>(); 262 for (final FailedContext failedContext : failedEvaluator.getFailedContextList()) { 263 avroFailedContextList.add(toAvroFailedContext(failedContext)); 264 } 265 266 return AvroFailedEvaluator.newBuilder() 267 .setId(failedEvaluator.getId()) 268 .setEvaluatorException(convertThrowableToString(failedEvaluator.getEvaluatorException())) 269 .setFailedContextList(avroFailedContextList) 270 .setFailedTask(avroFailedTask) 271 .build(); 272 } 273 274 public static AvroCompletedEvaluator toAvroCompletedEvaluator(final CompletedEvaluator completedEvaluator) { 275 return AvroCompletedEvaluator.newBuilder() 276 .setId(completedEvaluator.getId()) 277 .build(); 278 } 279 280 public static String toString(final SpecificRecord record) { 281 final String jsonEncodedRecord; 282 try { 283 final Schema schema = record.getSchema(); 284 final ByteArrayOutputStream bos = new ByteArrayOutputStream(); 285 final Encoder encoder = EncoderFactory.get().jsonEncoder(schema, bos); 286 final SpecificDatumWriter datumWriter = new SpecificDatumWriter(record.getClass()); 287 datumWriter.write(record, encoder); 288 encoder.flush(); 289 jsonEncodedRecord = new String(bos.toByteArray(), Charset.forName("UTF-8")); 290 } catch (final IOException e) { 291 throw new RuntimeException(e); 292 } 293 return jsonEncodedRecord; 294 } 295 296 private static AvroActiveContext unwrapOptionalActiveContext(final Optional<ActiveContext> optionalActiveContext) { 297 if (optionalActiveContext.isPresent()) { 298 return toAvroActiveContext(optionalActiveContext.get()); 299 } 300 301 return null; 302 } 303 304 private static String convertThrowableToString(final Throwable throwable) { 305 if (throwable != null) { 306 return throwable.toString(); 307 } 308 309 return null; 310 } 311 312 private static ByteBuffer wrapNullableByteArray(final byte[] data) { 313 if (data != null) { 314 return ByteBuffer.wrap(data); 315 } 316 317 return null; 318 } 319 320 private static ByteBuffer unwrapOptionalByteArray(final Optional<byte[]> optionalByteArray) { 321 if (optionalByteArray.isPresent()) { 322 return ByteBuffer.wrap(optionalByteArray.get()); 323 } 324 325 return null; 326 } 327 328 /** 329 * Empty private constructor to prohibit instantiation of utility class. 330 */ 331 private WatcherAvroUtil() { 332 } 333}