This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.tests.fail.driver;
020
021import org.apache.reef.driver.context.*;
022import org.apache.reef.driver.evaluator.*;
023import org.apache.reef.driver.task.*;
024import org.apache.reef.tang.annotations.Name;
025import org.apache.reef.tang.annotations.NamedParameter;
026import org.apache.reef.tang.annotations.Parameter;
027import org.apache.reef.tang.annotations.Unit;
028import org.apache.reef.tang.exceptions.BindException;
029import org.apache.reef.tests.library.exceptions.DriverSideFailure;
030import org.apache.reef.tests.library.exceptions.SimulatedDriverFailure;
031import org.apache.reef.wake.EventHandler;
032import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
033import org.apache.reef.wake.time.Clock;
034import org.apache.reef.wake.time.event.Alarm;
035import org.apache.reef.wake.time.event.StartTime;
036import org.apache.reef.wake.time.event.StopTime;
037
038import javax.inject.Inject;
039import javax.xml.bind.DatatypeConverter;
040import java.util.Arrays;
041import java.util.logging.Level;
042import java.util.logging.Logger;
043
044import static org.apache.reef.tests.fail.driver.FailDriver.ExpectedMessage.RequiredFlag.OPTIONAL;
045import static org.apache.reef.tests.fail.driver.FailDriver.ExpectedMessage.RequiredFlag.REQUIRED;
046
047@Unit
048public final class FailDriver {
049
050  private static final Logger LOG = Logger.getLogger(FailDriver.class.getName());
051  private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
052  private static final byte[] HELLO_STR = CODEC.encode("MESSAGE::HELLO");
053  /**
054   * Send message to the Task MSG_DELAY milliseconds after start.
055   */
056  private static final int MSG_DELAY = 1000;
057  private static final ExpectedMessage[] EVENT_SEQUENCE = {
058      new ExpectedMessage(FailDriver.class, REQUIRED),
059      new ExpectedMessage(StartTime.class, REQUIRED),
060      new ExpectedMessage(AllocatedEvaluator.class, REQUIRED),
061      new ExpectedMessage(FailedEvaluator.class, OPTIONAL),
062      new ExpectedMessage(ActiveContext.class, REQUIRED),
063      new ExpectedMessage(ContextMessage.class, OPTIONAL),
064      new ExpectedMessage(FailedContext.class, OPTIONAL),
065      new ExpectedMessage(RunningTask.class, REQUIRED),
066      new ExpectedMessage(Alarm.class, REQUIRED),
067      new ExpectedMessage(TaskMessage.class, REQUIRED),
068      new ExpectedMessage(Alarm.class, REQUIRED),
069      new ExpectedMessage(SuspendedTask.class, REQUIRED),
070      new ExpectedMessage(RunningTask.class, REQUIRED),
071      new ExpectedMessage(Alarm.class, REQUIRED),
072      new ExpectedMessage(FailedTask.class, OPTIONAL),
073      new ExpectedMessage(CompletedTask.class, REQUIRED),
074      new ExpectedMessage(ClosedContext.class, OPTIONAL),
075      new ExpectedMessage(CompletedEvaluator.class, REQUIRED),
076      new ExpectedMessage(StopTime.class, REQUIRED)
077  };
078  private final transient Class<?> failMsgClass;
079  private final transient EvaluatorRequestor requestor;
080  private final transient Clock clock;
081  private transient RunningTask task = null;
082  private transient int expectIdx = 0;
083  private transient DriverState state = DriverState.INIT;
084
085  @Inject
086  public FailDriver(final @Parameter(FailMsgClassName.class) String failMsgClassName,
087                    final EvaluatorRequestor requestor, final Clock clock)
088      throws ClassNotFoundException {
089    this.failMsgClass = ClassLoader.getSystemClassLoader().loadClass(failMsgClassName);
090    this.requestor = requestor;
091    this.clock = clock;
092    this.checkMsgOrder(this);
093  }
094
095  /**
096   * Check if observer methods are called in the right order
097   * and generate an exception at the given point in the message sequence.
098   *
099   * @param msg a message from one of the observers.
100   * @throws SimulatedDriverFailure if failMsgClass matches the message class.
101   * @throws DriverSideFailure      if messages are out of order.
102   */
103  private void checkMsgOrder(final Object msg) throws SimulatedDriverFailure, DriverSideFailure {
104
105    final String msgClassName = msg.getClass().getName();
106    LOG.log(Level.FINE, "At {0} {1}:{2}", new Object[]{
107        this.state, this.expectIdx, msgClassName});
108
109    if (this.state == DriverState.FAILED) {
110      // If already failed, do not do anything
111      return;
112    }
113
114    // Simulate failure at this step?
115    if (this.failMsgClass.isInstance(msg)) {
116      this.state = DriverState.FAILED;
117    }
118
119    // Make sure events arrive in the right order (specified in EVENT_SEQUENCE):
120    boolean notFound = true;
121    for (; this.expectIdx < EVENT_SEQUENCE.length; ++this.expectIdx) {
122      if (EVENT_SEQUENCE[expectIdx].msgClass.isInstance(msg)) {
123        notFound = false;
124        break;
125      } else if (EVENT_SEQUENCE[expectIdx].requiredFlag == REQUIRED) {
126        break;
127      }
128    }
129
130    if (notFound) {
131      LOG.log(Level.SEVERE, "Event out of sequence: {0} {1}:{2}",
132          new Object[]{this.state, this.expectIdx, msgClassName});
133      throw new DriverSideFailure("Event out of sequence: " + msgClassName);
134    }
135
136    LOG.log(Level.INFO, "{0}: send: {1} got: {2}", new Object[]{
137        this.state, EVENT_SEQUENCE[this.expectIdx], msgClassName});
138
139    ++this.expectIdx;
140
141    if (this.state == DriverState.FAILED) {
142      final SimulatedDriverFailure ex = new SimulatedDriverFailure(
143          "Simulated Failure at FailDriver :: " + msgClassName);
144      LOG.log(Level.INFO, "Simulated Failure: {0}", ex);
145      throw ex;
146    }
147  }
148
149  private enum DriverState {INIT, SEND_MSG, SUSPEND, RESUME, CLOSE, FAILED}
150
151  /**
152   * Name of the message class to specify the failing message handler.
153   */
154  @NamedParameter(doc = "Full name of the message class to fail on", short_name = "fail")
155  public static final class FailMsgClassName implements Name<String> {
156  }
157
158  public static final class ExpectedMessage {
159
160    public final transient Class<?> msgClass;
161    public final transient RequiredFlag requiredFlag;
162    private final transient String repr;
163
164    public ExpectedMessage(final Class<?> clazz, final RequiredFlag requiredFlag) {
165      this.msgClass = clazz;
166      this.requiredFlag = requiredFlag;
167      this.repr = this.msgClass.getSimpleName() + ":" + this.requiredFlag;
168    }
169
170    @Override
171    public String toString() {
172      return this.repr;
173    }
174
175    public enum RequiredFlag {OPTIONAL, REQUIRED}
176  }
177
178  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
179    @Override
180    public void onNext(final AllocatedEvaluator eval) {
181      checkMsgOrder(eval);
182      try {
183        eval.submitContext(ContextConfiguration.CONF
184            .set(ContextConfiguration.IDENTIFIER, "FailContext_" + eval.getId())
185            .build());
186      } catch (final BindException ex) {
187        LOG.log(Level.WARNING, "Context configuration error", ex);
188        throw new RuntimeException(ex);
189      }
190    }
191  }
192
193  final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> {
194    @Override
195    public void onNext(final CompletedEvaluator eval) {
196      checkMsgOrder(eval);
197      // noop
198    }
199  }
200
201  final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
202    @Override
203    public void onNext(final FailedEvaluator eval) {
204      LOG.log(Level.WARNING, "Evaluator failed: " + eval.getId(), eval.getEvaluatorException());
205      checkMsgOrder(eval);
206      throw new RuntimeException(eval.getEvaluatorException());
207    }
208  }
209
210  final class ActiveContextHandler implements EventHandler<ActiveContext> {
211    @Override
212    public void onNext(final ActiveContext context) {
213      checkMsgOrder(context);
214      try {
215        context.submitTask(TaskConfiguration.CONF
216            .set(TaskConfiguration.IDENTIFIER, "FailTask_" + context.getId())
217            .set(TaskConfiguration.TASK, NoopTask.class)
218            .set(TaskConfiguration.ON_MESSAGE, NoopTask.DriverMessageHandler.class)
219            .set(TaskConfiguration.ON_SUSPEND, NoopTask.TaskSuspendHandler.class)
220            .set(TaskConfiguration.ON_CLOSE, NoopTask.TaskCloseHandler.class)
221            .set(TaskConfiguration.ON_TASK_STOP, NoopTask.TaskStopHandler.class)
222            .set(TaskConfiguration.ON_SEND_MESSAGE, NoopTask.class)
223            .build());
224      } catch (final BindException ex) {
225        LOG.log(Level.WARNING, "Task configuration error", ex);
226        throw new RuntimeException(ex);
227      }
228    }
229  }
230
231  final class ContextMessageHandler implements EventHandler<ContextMessage> {
232    @Override
233    public void onNext(final ContextMessage message) {
234      checkMsgOrder(message);
235      // noop
236    }
237  }
238
239  final class ClosedContextHandler implements EventHandler<ClosedContext> {
240    @Override
241    public void onNext(final ClosedContext context) {
242      checkMsgOrder(context);
243      // noop
244    }
245  }
246
247  final class FailedContextHandler implements EventHandler<FailedContext> {
248    @Override
249    public void onNext(final FailedContext context) {
250      LOG.log(Level.WARNING, "Context failed: " + context.getId(), context.getReason().orElse(null));
251      checkMsgOrder(context);
252      // TODO: notify client?
253
254      // if (context.getParentContext().isPresent()) {
255      //   context.getParentContext().get().close();
256      // }
257    }
258  }
259
260  final class RunningTaskHandler implements EventHandler<RunningTask> {
261    @Override
262    public void onNext(final RunningTask task) {
263      checkMsgOrder(task);
264      FailDriver.this.task = task;
265      switch (state) {
266        case INIT:
267          state = DriverState.SEND_MSG;
268          break;
269        case RESUME:
270          state = DriverState.CLOSE;
271          break;
272        default:
273          LOG.log(Level.WARNING, "Unexpected state at TaskRuntime: {0}", state);
274          throw new DriverSideFailure("Unexpected state: " + state);
275      }
276      // After a delay, send message or suspend the task:
277      clock.scheduleAlarm(MSG_DELAY, new AlarmHandler());
278    }
279  }
280
281  final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
282    @Override
283    public void onNext(final SuspendedTask task) {
284      checkMsgOrder(task);
285      state = DriverState.RESUME;
286      try {
287        task.getActiveContext().submitTask(TaskConfiguration.CONF
288            .set(TaskConfiguration.IDENTIFIER, task.getId() + "_RESUMED")
289            .set(TaskConfiguration.TASK, NoopTask.class)
290            .set(TaskConfiguration.ON_MESSAGE, NoopTask.DriverMessageHandler.class)
291            .set(TaskConfiguration.ON_SUSPEND, NoopTask.TaskSuspendHandler.class)
292            .set(TaskConfiguration.ON_CLOSE, NoopTask.TaskCloseHandler.class)
293            .set(TaskConfiguration.ON_TASK_STOP, NoopTask.TaskStopHandler.class)
294            .set(TaskConfiguration.ON_SEND_MESSAGE, NoopTask.class)
295            .set(TaskConfiguration.MEMENTO, DatatypeConverter.printBase64Binary(HELLO_STR))
296            .build());
297      } catch (final BindException ex) {
298        LOG.log(Level.SEVERE, "Task configuration error", ex);
299        throw new DriverSideFailure("Task configuration error", ex);
300      }
301    }
302  }
303
304  final class TaskMessageHandler implements EventHandler<TaskMessage> {
305    @Override
306    public void onNext(final TaskMessage msg) {
307      checkMsgOrder(msg);
308      assert (Arrays.equals(HELLO_STR, msg.get()));
309      assert (state == DriverState.SEND_MSG);
310      state = DriverState.SUSPEND;
311      clock.scheduleAlarm(MSG_DELAY, new AlarmHandler());
312    }
313  }
314
315  final class FailedTaskHandler implements EventHandler<FailedTask> {
316    @Override
317    public void onNext(final FailedTask task) {
318      LOG.log(Level.WARNING, "Task failed: " + task.getId(), task.getReason().orElse(null));
319      checkMsgOrder(task);
320      if (task.getActiveContext().isPresent()) {
321        task.getActiveContext().get().close();
322      }
323    }
324  }
325
326  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
327    @Override
328    public void onNext(final CompletedTask task) {
329      checkMsgOrder(task);
330      task.getActiveContext().close();
331    }
332  }
333
334  final class StartHandler implements EventHandler<StartTime> {
335    @Override
336    public void onNext(final StartTime time) {
337      FailDriver.this.checkMsgOrder(time);
338      FailDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
339          .setNumber(1).setMemory(128).setNumberOfCores(1).build());
340    }
341  }
342
343  final class AlarmHandler implements EventHandler<Alarm> {
344    @Override
345    public void onNext(final Alarm time) {
346      FailDriver.this.checkMsgOrder(time);
347      switch (FailDriver.this.state) {
348        case SEND_MSG:
349          FailDriver.this.task.send(HELLO_STR);
350          break;
351        case SUSPEND:
352          FailDriver.this.task.suspend();
353          break;
354        case CLOSE:
355          FailDriver.this.task.close();
356          break;
357        default:
358          LOG.log(Level.WARNING, "Unexpected state at AlarmHandler: {0}", FailDriver.this.state);
359          throw new DriverSideFailure("Unexpected state: " + FailDriver.this.state);
360      }
361    }
362  }
363
364  final class StopHandler implements EventHandler<StopTime> {
365    @Override
366    public void onNext(final StopTime time) {
367      FailDriver.this.checkMsgOrder(time);
368      // noop
369    }
370  }
371}