This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.tests.fail.driver;
020
021import org.apache.reef.driver.context.ActiveContext;
022import org.apache.reef.driver.context.ContextConfiguration;
023import org.apache.reef.driver.evaluator.AllocatedEvaluator;
024import org.apache.reef.driver.evaluator.EvaluatorRequest;
025import org.apache.reef.driver.evaluator.EvaluatorRequestor;
026import org.apache.reef.driver.task.RunningTask;
027import org.apache.reef.driver.task.TaskConfiguration;
028import org.apache.reef.driver.task.TaskMessage;
029import org.apache.reef.tang.annotations.Unit;
030import org.apache.reef.tang.exceptions.BindException;
031import org.apache.reef.wake.EventHandler;
032import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
033import org.apache.reef.wake.time.Clock;
034import org.apache.reef.wake.time.event.Alarm;
035import org.apache.reef.wake.time.event.StartTime;
036
037import javax.inject.Inject;
038import java.util.Arrays;
039import java.util.logging.Level;
040import java.util.logging.Logger;
041
042/**
043 * Driver which fails due to delayed message.
044 */
045@Unit
046public final class FailDriverDelayedMsg {
047
048  private static final Logger LOG = Logger.getLogger(FailDriverDelayedMsg.class.getName());
049  private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
050  private static final byte[] HELLO_STR = CODEC.encode("MESSAGE::HELLO");
051
052  private final transient EvaluatorRequestor requestor;
053  private final transient Clock clock;
054  private transient RunningTask task = null;
055
056  @Inject
057  public FailDriverDelayedMsg(final EvaluatorRequestor requestor, final Clock clock) {
058    LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.<init>");
059    this.requestor = requestor;
060    this.clock = clock;
061  }
062
063  /**
064   * Handler for AllocatedEvaluator.
065   */
066  public final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
067    @Override
068    public void onNext(final AllocatedEvaluator eval) {
069      LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(AllocatedEvaluator): {0}", eval);
070      try {
071        eval.submitContext(ContextConfiguration.CONF
072            .set(ContextConfiguration.IDENTIFIER, "Context_" + eval.getId())
073            .build());
074      } catch (final BindException ex) {
075        LOG.log(Level.WARNING, "Context configuration error", ex);
076        throw new RuntimeException(ex);
077      }
078    }
079  }
080
081  /**
082   * Handler for ActiveContext.
083   */
084  public final class ActiveContextHandler implements EventHandler<ActiveContext> {
085    @Override
086    public void onNext(final ActiveContext context) {
087      LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(ActiveContext): {0}", context);
088      try {
089        context.submitTask(TaskConfiguration.CONF
090            .set(TaskConfiguration.IDENTIFIER, "Task_" + context.getId())
091            .set(TaskConfiguration.TASK, NoopTask.class)
092            .set(TaskConfiguration.ON_MESSAGE, NoopTask.DriverMessageHandler.class)
093            .set(TaskConfiguration.ON_SUSPEND, NoopTask.TaskSuspendHandler.class)
094            .set(TaskConfiguration.ON_TASK_STOP, NoopTask.TaskStopHandler.class)
095            .set(TaskConfiguration.ON_CLOSE, NoopTask.TaskCloseHandler.class)
096            .set(TaskConfiguration.ON_SEND_MESSAGE, NoopTask.class)
097            .build());
098      } catch (final BindException ex) {
099        LOG.log(Level.WARNING, "Task configuration error", ex);
100        throw new RuntimeException(ex);
101      }
102    }
103  }
104
105  /**
106   * Handler for RunningTask.
107   */
108  public final class RunningTaskHandler implements EventHandler<RunningTask> {
109    @Override
110    @SuppressWarnings("checkstyle:hiddenfield")
111    public void onNext(final RunningTask task) {
112      FailDriverDelayedMsg.this.task = task;
113      LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(TaskRuntime): {0}", task);
114      FailDriverDelayedMsg.this.clock.scheduleAlarm(2000, new EventHandler<Alarm>() {
115        @Override
116        public void onNext(final Alarm time) {
117          LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(Alarm): {0}", time);
118          task.send(HELLO_STR);
119        }
120      });
121    }
122  }
123
124  /**
125   * Handler for TaskMessage.
126   */
127  public final class TaskMessageHandler implements EventHandler<TaskMessage> {
128    @Override
129    public void onNext(final TaskMessage msg) {
130      LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(TaskMessage): {0}", msg);
131      assert Arrays.equals(HELLO_STR, msg.get());
132      FailDriverDelayedMsg.this.task.close();
133    }
134  }
135
136  /**
137   * Handler for StartTime.
138   */
139  public final class StartHandler implements EventHandler<StartTime> {
140    @Override
141    public void onNext(final StartTime time) {
142      LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(StartTime): {0}", time);
143      FailDriverDelayedMsg.this.requestor.submit(EvaluatorRequest.newBuilder()
144          .setNumber(1).setMemory(128).setNumberOfCores(1).build());
145    }
146  }
147}