This project has retired. For details please refer to its Attic page.
Source code
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
019package org.apache.reef.tests.fail.driver;
020
021import org.apache.reef.driver.context.ActiveContext;
022import org.apache.reef.driver.context.ContextConfiguration;
023import org.apache.reef.driver.evaluator.AllocatedEvaluator;
024import org.apache.reef.driver.evaluator.EvaluatorRequest;
025import org.apache.reef.driver.evaluator.EvaluatorRequestor;
026import org.apache.reef.driver.task.RunningTask;
027import org.apache.reef.driver.task.TaskConfiguration;
028import org.apache.reef.driver.task.TaskMessage;
029import org.apache.reef.tang.annotations.Unit;
030import org.apache.reef.tang.exceptions.BindException;
031import org.apache.reef.wake.EventHandler;
032import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
033import org.apache.reef.wake.time.Clock;
034import org.apache.reef.wake.time.event.Alarm;
035import org.apache.reef.wake.time.event.StartTime;
036
037import javax.inject.Inject;
038import java.util.Arrays;
039import java.util.logging.Level;
040import java.util.logging.Logger;
041
042@Unit
043public final class FailDriverDelayedMsg {
044
045  private static final Logger LOG = Logger.getLogger(FailDriverDelayedMsg.class.getName());
046  private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
047  private static final byte[] HELLO_STR = CODEC.encode("MESSAGE::HELLO");
048
049  private final transient EvaluatorRequestor requestor;
050  private final transient Clock clock;
051  private transient RunningTask task = null;
052
053  @Inject
054  public FailDriverDelayedMsg(final EvaluatorRequestor requestor, final Clock clock) {
055    LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.<init>");
056    this.requestor = requestor;
057    this.clock = clock;
058  }
059
060  public final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
061    @Override
062    public void onNext(final AllocatedEvaluator eval) {
063      LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(AllocatedEvaluator): {0}", eval);
064      try {
065        eval.submitContext(ContextConfiguration.CONF
066            .set(ContextConfiguration.IDENTIFIER, "Context_" + eval.getId())
067            .build());
068      } catch (final BindException ex) {
069        LOG.log(Level.WARNING, "Context configuration error", ex);
070        throw new RuntimeException(ex);
071      }
072    }
073  }
074
075  public final class ActiveContextHandler implements EventHandler<ActiveContext> {
076    @Override
077    public void onNext(final ActiveContext context) {
078      LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(ActiveContext): {0}", context);
079      try {
080        context.submitTask(TaskConfiguration.CONF
081            .set(TaskConfiguration.IDENTIFIER, "Task_" + context.getId())
082            .set(TaskConfiguration.TASK, NoopTask.class)
083            .set(TaskConfiguration.ON_MESSAGE, NoopTask.DriverMessageHandler.class)
084            .set(TaskConfiguration.ON_SUSPEND, NoopTask.TaskSuspendHandler.class)
085            .set(TaskConfiguration.ON_TASK_STOP, NoopTask.TaskStopHandler.class)
086            .set(TaskConfiguration.ON_CLOSE, NoopTask.TaskCloseHandler.class)
087            .set(TaskConfiguration.ON_SEND_MESSAGE, NoopTask.class)
088            .build());
089      } catch (final BindException ex) {
090        LOG.log(Level.WARNING, "Task configuration error", ex);
091        throw new RuntimeException(ex);
092      }
093    }
094  }
095
096  public final class RunningTaskHandler implements EventHandler<RunningTask> {
097    @Override
098    public void onNext(final RunningTask task) {
099      FailDriverDelayedMsg.this.task = task;
100      LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(TaskRuntime): {0}", task);
101      FailDriverDelayedMsg.this.clock.scheduleAlarm(2000, new EventHandler<Alarm>() {
102        @Override
103        public void onNext(final Alarm time) {
104          LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(Alarm): {0}", time);
105          task.send(HELLO_STR);
106        }
107      });
108    }
109  }
110
111  public final class TaskMessageHandler implements EventHandler<TaskMessage> {
112    @Override
113    public void onNext(final TaskMessage msg) {
114      LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(TaskMessage): {0}", msg);
115      assert (Arrays.equals(HELLO_STR, msg.get()));
116      FailDriverDelayedMsg.this.task.close();
117    }
118  }
119
120  public final class StartHandler implements EventHandler<StartTime> {
121    @Override
122    public void onNext(final StartTime time) {
123      LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(StartTime): {0}", time);
124      FailDriverDelayedMsg.this.requestor.submit(EvaluatorRequest.newBuilder()
125          .setNumber(1).setMemory(128).setNumberOfCores(1).build());
126    }
127  }
128}