001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.tests.fail.driver; 020 021import org.apache.reef.driver.context.*; 022import org.apache.reef.driver.evaluator.*; 023import org.apache.reef.driver.task.*; 024import org.apache.reef.tang.annotations.Name; 025import org.apache.reef.tang.annotations.NamedParameter; 026import org.apache.reef.tang.annotations.Parameter; 027import org.apache.reef.tang.annotations.Unit; 028import org.apache.reef.tang.exceptions.BindException; 029import org.apache.reef.tests.library.exceptions.DriverSideFailure; 030import org.apache.reef.tests.library.exceptions.SimulatedDriverFailure; 031import org.apache.reef.wake.EventHandler; 032import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; 033import org.apache.reef.wake.time.Clock; 034import org.apache.reef.wake.time.event.Alarm; 035import org.apache.reef.wake.time.event.StartTime; 036import org.apache.reef.wake.time.event.StopTime; 037 038import javax.inject.Inject; 039import javax.xml.bind.DatatypeConverter; 040import java.util.Arrays; 041import java.util.logging.Level; 042import java.util.logging.Logger; 043 044import static org.apache.reef.tests.fail.driver.FailDriver.ExpectedMessage.RequiredFlag.OPTIONAL; 045import static org.apache.reef.tests.fail.driver.FailDriver.ExpectedMessage.RequiredFlag.REQUIRED; 046 047@Unit 048public final class FailDriver { 049 050 private static final Logger LOG = Logger.getLogger(FailDriver.class.getName()); 051 private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>(); 052 private static final byte[] HELLO_STR = CODEC.encode("MESSAGE::HELLO"); 053 /** 054 * Send message to the Task MSG_DELAY milliseconds after start. 055 */ 056 private static final int MSG_DELAY = 1000; 057 private static final ExpectedMessage[] EVENT_SEQUENCE = { 058 new ExpectedMessage(FailDriver.class, REQUIRED), 059 new ExpectedMessage(StartTime.class, REQUIRED), 060 new ExpectedMessage(AllocatedEvaluator.class, REQUIRED), 061 new ExpectedMessage(FailedEvaluator.class, OPTIONAL), 062 new ExpectedMessage(ActiveContext.class, REQUIRED), 063 new ExpectedMessage(ContextMessage.class, OPTIONAL), 064 new ExpectedMessage(FailedContext.class, OPTIONAL), 065 new ExpectedMessage(RunningTask.class, REQUIRED), 066 new ExpectedMessage(Alarm.class, REQUIRED), 067 new ExpectedMessage(TaskMessage.class, REQUIRED), 068 new ExpectedMessage(Alarm.class, REQUIRED), 069 new ExpectedMessage(SuspendedTask.class, REQUIRED), 070 new ExpectedMessage(RunningTask.class, REQUIRED), 071 new ExpectedMessage(Alarm.class, REQUIRED), 072 new ExpectedMessage(FailedTask.class, OPTIONAL), 073 new ExpectedMessage(CompletedTask.class, REQUIRED), 074 new ExpectedMessage(ClosedContext.class, OPTIONAL), 075 new ExpectedMessage(CompletedEvaluator.class, REQUIRED), 076 new ExpectedMessage(StopTime.class, REQUIRED) 077 }; 078 private final transient Class<?> failMsgClass; 079 private final transient EvaluatorRequestor requestor; 080 private final transient Clock clock; 081 private transient RunningTask task = null; 082 private transient int expectIdx = 0; 083 private transient DriverState state = DriverState.INIT; 084 085 @Inject 086 public FailDriver(final @Parameter(FailMsgClassName.class) String failMsgClassName, 087 final EvaluatorRequestor requestor, final Clock clock) 088 throws ClassNotFoundException { 089 this.failMsgClass = ClassLoader.getSystemClassLoader().loadClass(failMsgClassName); 090 this.requestor = requestor; 091 this.clock = clock; 092 this.checkMsgOrder(this); 093 } 094 095 /** 096 * Check if observer methods are called in the right order 097 * and generate an exception at the given point in the message sequence. 098 * 099 * @param msg a message from one of the observers. 100 * @throws SimulatedDriverFailure if failMsgClass matches the message class. 101 * @throws DriverSideFailure if messages are out of order. 102 */ 103 private void checkMsgOrder(final Object msg) throws SimulatedDriverFailure, DriverSideFailure { 104 105 final String msgClassName = msg.getClass().getName(); 106 LOG.log(Level.FINE, "At {0} {1}:{2}", new Object[]{ 107 this.state, this.expectIdx, msgClassName}); 108 109 if (this.state == DriverState.FAILED) { 110 // If already failed, do not do anything 111 return; 112 } 113 114 // Simulate failure at this step? 115 if (this.failMsgClass.isInstance(msg)) { 116 this.state = DriverState.FAILED; 117 } 118 119 // Make sure events arrive in the right order (specified in EVENT_SEQUENCE): 120 boolean notFound = true; 121 for (; this.expectIdx < EVENT_SEQUENCE.length; ++this.expectIdx) { 122 if (EVENT_SEQUENCE[expectIdx].msgClass.isInstance(msg)) { 123 notFound = false; 124 break; 125 } else if (EVENT_SEQUENCE[expectIdx].requiredFlag == REQUIRED) { 126 break; 127 } 128 } 129 130 if (notFound) { 131 LOG.log(Level.SEVERE, "Event out of sequence: {0} {1}:{2}", 132 new Object[]{this.state, this.expectIdx, msgClassName}); 133 throw new DriverSideFailure("Event out of sequence: " + msgClassName); 134 } 135 136 LOG.log(Level.INFO, "{0}: send: {1} got: {2}", new Object[]{ 137 this.state, EVENT_SEQUENCE[this.expectIdx], msgClassName}); 138 139 ++this.expectIdx; 140 141 if (this.state == DriverState.FAILED) { 142 final SimulatedDriverFailure ex = new SimulatedDriverFailure( 143 "Simulated Failure at FailDriver :: " + msgClassName); 144 LOG.log(Level.INFO, "Simulated Failure: {0}", ex); 145 throw ex; 146 } 147 } 148 149 private enum DriverState {INIT, SEND_MSG, SUSPEND, RESUME, CLOSE, FAILED} 150 151 /** 152 * Name of the message class to specify the failing message handler. 153 */ 154 @NamedParameter(doc = "Full name of the message class to fail on", short_name = "fail") 155 public static final class FailMsgClassName implements Name<String> { 156 } 157 158 public static final class ExpectedMessage { 159 160 public final transient Class<?> msgClass; 161 public final transient RequiredFlag requiredFlag; 162 private final transient String repr; 163 164 public ExpectedMessage(final Class<?> clazz, final RequiredFlag requiredFlag) { 165 this.msgClass = clazz; 166 this.requiredFlag = requiredFlag; 167 this.repr = this.msgClass.getSimpleName() + ":" + this.requiredFlag; 168 } 169 170 @Override 171 public String toString() { 172 return this.repr; 173 } 174 175 public enum RequiredFlag {OPTIONAL, REQUIRED} 176 } 177 178 final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> { 179 @Override 180 public void onNext(final AllocatedEvaluator eval) { 181 checkMsgOrder(eval); 182 try { 183 eval.submitContext(ContextConfiguration.CONF 184 .set(ContextConfiguration.IDENTIFIER, "FailContext_" + eval.getId()) 185 .build()); 186 } catch (final BindException ex) { 187 LOG.log(Level.WARNING, "Context configuration error", ex); 188 throw new RuntimeException(ex); 189 } 190 } 191 } 192 193 final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> { 194 @Override 195 public void onNext(final CompletedEvaluator eval) { 196 checkMsgOrder(eval); 197 // noop 198 } 199 } 200 201 final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> { 202 @Override 203 public void onNext(final FailedEvaluator eval) { 204 LOG.log(Level.WARNING, "Evaluator failed: " + eval.getId(), eval.getEvaluatorException()); 205 checkMsgOrder(eval); 206 throw new RuntimeException(eval.getEvaluatorException()); 207 } 208 } 209 210 final class ActiveContextHandler implements EventHandler<ActiveContext> { 211 @Override 212 public void onNext(final ActiveContext context) { 213 checkMsgOrder(context); 214 try { 215 context.submitTask(TaskConfiguration.CONF 216 .set(TaskConfiguration.IDENTIFIER, "FailTask_" + context.getId()) 217 .set(TaskConfiguration.TASK, NoopTask.class) 218 .set(TaskConfiguration.ON_MESSAGE, NoopTask.DriverMessageHandler.class) 219 .set(TaskConfiguration.ON_SUSPEND, NoopTask.TaskSuspendHandler.class) 220 .set(TaskConfiguration.ON_CLOSE, NoopTask.TaskCloseHandler.class) 221 .set(TaskConfiguration.ON_TASK_STOP, NoopTask.TaskStopHandler.class) 222 .set(TaskConfiguration.ON_SEND_MESSAGE, NoopTask.class) 223 .build()); 224 } catch (final BindException ex) { 225 LOG.log(Level.WARNING, "Task configuration error", ex); 226 throw new RuntimeException(ex); 227 } 228 } 229 } 230 231 final class ContextMessageHandler implements EventHandler<ContextMessage> { 232 @Override 233 public void onNext(final ContextMessage message) { 234 checkMsgOrder(message); 235 // noop 236 } 237 } 238 239 final class ClosedContextHandler implements EventHandler<ClosedContext> { 240 @Override 241 public void onNext(final ClosedContext context) { 242 checkMsgOrder(context); 243 // noop 244 } 245 } 246 247 final class FailedContextHandler implements EventHandler<FailedContext> { 248 @Override 249 public void onNext(final FailedContext context) { 250 LOG.log(Level.WARNING, "Context failed: " + context.getId(), context.getReason().orElse(null)); 251 checkMsgOrder(context); 252 // TODO: notify client? 253 254 // if (context.getParentContext().isPresent()) { 255 // context.getParentContext().get().close(); 256 // } 257 } 258 } 259 260 final class RunningTaskHandler implements EventHandler<RunningTask> { 261 @Override 262 public void onNext(final RunningTask task) { 263 checkMsgOrder(task); 264 FailDriver.this.task = task; 265 switch (state) { 266 case INIT: 267 state = DriverState.SEND_MSG; 268 break; 269 case RESUME: 270 state = DriverState.CLOSE; 271 break; 272 default: 273 LOG.log(Level.WARNING, "Unexpected state at TaskRuntime: {0}", state); 274 throw new DriverSideFailure("Unexpected state: " + state); 275 } 276 // After a delay, send message or suspend the task: 277 clock.scheduleAlarm(MSG_DELAY, new AlarmHandler()); 278 } 279 } 280 281 final class SuspendedTaskHandler implements EventHandler<SuspendedTask> { 282 @Override 283 public void onNext(final SuspendedTask task) { 284 checkMsgOrder(task); 285 state = DriverState.RESUME; 286 try { 287 task.getActiveContext().submitTask(TaskConfiguration.CONF 288 .set(TaskConfiguration.IDENTIFIER, task.getId() + "_RESUMED") 289 .set(TaskConfiguration.TASK, NoopTask.class) 290 .set(TaskConfiguration.ON_MESSAGE, NoopTask.DriverMessageHandler.class) 291 .set(TaskConfiguration.ON_SUSPEND, NoopTask.TaskSuspendHandler.class) 292 .set(TaskConfiguration.ON_CLOSE, NoopTask.TaskCloseHandler.class) 293 .set(TaskConfiguration.ON_TASK_STOP, NoopTask.TaskStopHandler.class) 294 .set(TaskConfiguration.ON_SEND_MESSAGE, NoopTask.class) 295 .set(TaskConfiguration.MEMENTO, DatatypeConverter.printBase64Binary(HELLO_STR)) 296 .build()); 297 } catch (final BindException ex) { 298 LOG.log(Level.SEVERE, "Task configuration error", ex); 299 throw new DriverSideFailure("Task configuration error", ex); 300 } 301 } 302 } 303 304 final class TaskMessageHandler implements EventHandler<TaskMessage> { 305 @Override 306 public void onNext(final TaskMessage msg) { 307 checkMsgOrder(msg); 308 assert (Arrays.equals(HELLO_STR, msg.get())); 309 assert (state == DriverState.SEND_MSG); 310 state = DriverState.SUSPEND; 311 clock.scheduleAlarm(MSG_DELAY, new AlarmHandler()); 312 } 313 } 314 315 final class FailedTaskHandler implements EventHandler<FailedTask> { 316 @Override 317 public void onNext(final FailedTask task) { 318 LOG.log(Level.WARNING, "Task failed: " + task.getId(), task.getReason().orElse(null)); 319 checkMsgOrder(task); 320 if (task.getActiveContext().isPresent()) { 321 task.getActiveContext().get().close(); 322 } 323 } 324 } 325 326 final class CompletedTaskHandler implements EventHandler<CompletedTask> { 327 @Override 328 public void onNext(final CompletedTask task) { 329 checkMsgOrder(task); 330 task.getActiveContext().close(); 331 } 332 } 333 334 final class StartHandler implements EventHandler<StartTime> { 335 @Override 336 public void onNext(final StartTime time) { 337 FailDriver.this.checkMsgOrder(time); 338 FailDriver.this.requestor.submit(EvaluatorRequest.newBuilder() 339 .setNumber(1).setMemory(128).setNumberOfCores(1).build()); 340 } 341 } 342 343 final class AlarmHandler implements EventHandler<Alarm> { 344 @Override 345 public void onNext(final Alarm time) { 346 FailDriver.this.checkMsgOrder(time); 347 switch (FailDriver.this.state) { 348 case SEND_MSG: 349 FailDriver.this.task.send(HELLO_STR); 350 break; 351 case SUSPEND: 352 FailDriver.this.task.suspend(); 353 break; 354 case CLOSE: 355 FailDriver.this.task.close(); 356 break; 357 default: 358 LOG.log(Level.WARNING, "Unexpected state at AlarmHandler: {0}", FailDriver.this.state); 359 throw new DriverSideFailure("Unexpected state: " + FailDriver.this.state); 360 } 361 } 362 } 363 364 final class StopHandler implements EventHandler<StopTime> { 365 @Override 366 public void onNext(final StopTime time) { 367 FailDriver.this.checkMsgOrder(time); 368 // noop 369 } 370 } 371}