This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.tests.fail.task;
020
021import org.apache.reef.driver.context.ActiveContext;
022import org.apache.reef.driver.context.ContextConfiguration;
023import org.apache.reef.driver.evaluator.AllocatedEvaluator;
024import org.apache.reef.driver.evaluator.EvaluatorRequest;
025import org.apache.reef.driver.evaluator.EvaluatorRequestor;
026import org.apache.reef.driver.task.RunningTask;
027import org.apache.reef.driver.task.TaskConfiguration;
028import org.apache.reef.tang.Configuration;
029import org.apache.reef.tang.annotations.Name;
030import org.apache.reef.tang.annotations.NamedParameter;
031import org.apache.reef.tang.annotations.Parameter;
032import org.apache.reef.tang.annotations.Unit;
033import org.apache.reef.tang.exceptions.BindException;
034import org.apache.reef.tang.formats.ConfigurationModule;
035import org.apache.reef.tests.library.exceptions.DriverSideFailure;
036import org.apache.reef.wake.EventHandler;
037import org.apache.reef.wake.time.event.StartTime;
038
039import javax.inject.Inject;
040import java.util.logging.Level;
041import java.util.logging.Logger;
042
043@Unit
044public final class Driver {
045
046  private static final Logger LOG = Logger.getLogger(Driver.class.getName());
047  private final transient String failTaskName;
048  private final transient EvaluatorRequestor requestor;
049  private transient String taskId;
050
051  @Inject
052  public Driver(final @Parameter(FailTaskName.class) String failTaskName,
053                final EvaluatorRequestor requestor) {
054    this.failTaskName = failTaskName;
055    this.requestor = requestor;
056  }
057
058  /**
059   * Name of the message class to specify the failing message handler.
060   */
061  @NamedParameter(doc = "Full name of the (failing) task class", short_name = "task")
062  public static final class FailTaskName implements Name<String> {
063  }
064
065  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
066    @Override
067    public void onNext(final AllocatedEvaluator eval) {
068
069      try {
070
071        taskId = failTaskName + "_" + eval.getId();
072        LOG.log(Level.INFO, "Submit task: {0}", taskId);
073
074        final Configuration contextConfig =
075            ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER, taskId).build();
076
077        ConfigurationModule taskConfig =
078            TaskConfiguration.CONF.set(TaskConfiguration.IDENTIFIER, taskId);
079
080        switch (failTaskName) {
081          case "FailTask":
082            taskConfig = taskConfig.set(TaskConfiguration.TASK, FailTask.class);
083            break;
084          case "FailTaskCall":
085            taskConfig = taskConfig.set(TaskConfiguration.TASK, FailTaskCall.class);
086            break;
087          case "FailTaskMsg":
088            taskConfig = taskConfig
089                .set(TaskConfiguration.TASK, FailTaskMsg.class)
090                .set(TaskConfiguration.ON_MESSAGE, FailTaskMsg.class);
091            break;
092          case "FailTaskSuspend":
093            taskConfig = taskConfig
094                .set(TaskConfiguration.TASK, FailTaskSuspend.class)
095                .set(TaskConfiguration.ON_SUSPEND, FailTaskSuspend.class);
096            break;
097          case "FailTaskStart":
098            taskConfig = taskConfig
099                .set(TaskConfiguration.TASK, FailTaskStart.class)
100                .set(TaskConfiguration.ON_TASK_STARTED, FailTaskStart.class);
101            break;
102          case "FailTaskStop":
103            taskConfig = taskConfig
104                .set(TaskConfiguration.TASK, FailTaskStop.class)
105                .set(TaskConfiguration.ON_TASK_STOP, FailTaskStop.class)
106                .set(TaskConfiguration.ON_CLOSE, FailTaskStop.CloseEventHandler.class);
107            break;
108          case "FailTaskClose":
109            taskConfig = taskConfig
110                .set(TaskConfiguration.TASK, FailTaskClose.class)
111                .set(TaskConfiguration.ON_CLOSE, FailTaskClose.class);
112            break;
113        }
114
115        eval.submitContextAndTask(contextConfig, taskConfig.build());
116
117      } catch (final BindException ex) {
118        LOG.log(Level.WARNING, "Configuration error", ex);
119        throw new DriverSideFailure("Configuration error", ex);
120      }
121    }
122  }
123
124  final class RunningTaskHandler implements EventHandler<RunningTask> {
125    @Override
126    public void onNext(final RunningTask task) {
127
128      LOG.log(Level.INFO, "TaskRuntime: {0} expect {1}",
129          new Object[]{task.getId(), taskId});
130
131      if (!taskId.equals(task.getId())) {
132        throw new DriverSideFailure("Task ID " + task.getId()
133            + " not equal expected ID " + taskId);
134      }
135
136      switch (failTaskName) {
137        case "FailTaskMsg":
138          LOG.log(Level.INFO, "TaskRuntime: Send message: {0}", task);
139          task.send(new byte[0]);
140          break;
141        case "FailTaskSuspend":
142          LOG.log(Level.INFO, "TaskRuntime: Suspend: {0}", task);
143          task.suspend();
144          break;
145        case "FailTaskStop":
146        case "FailTaskClose":
147          LOG.log(Level.INFO, "TaskRuntime: Stop/Close: {0}", task);
148          task.close();
149          break;
150      }
151    }
152  }
153
154  final class ActiveContextHandler implements EventHandler<ActiveContext> {
155    @Override
156    public void onNext(final ActiveContext context) throws DriverSideFailure {
157      throw new DriverSideFailure("Unexpected ActiveContext message: " + context.getId());
158    }
159  }
160
161  final class StartHandler implements EventHandler<StartTime> {
162    @Override
163    public void onNext(final StartTime time) {
164      LOG.log(Level.INFO, "StartTime: {0}", time);
165      Driver.this.requestor.submit(EvaluatorRequest.newBuilder()
166          .setNumber(1).setMemory(128).setNumberOfCores(1).build());
167    }
168  }
169}