001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.tests.fail.task;
020
021import org.apache.reef.driver.context.ActiveContext;
022import org.apache.reef.driver.context.ContextConfiguration;
023import org.apache.reef.driver.evaluator.AllocatedEvaluator;
024import org.apache.reef.driver.evaluator.EvaluatorRequest;
025import org.apache.reef.driver.evaluator.EvaluatorRequestor;
026import org.apache.reef.driver.task.RunningTask;
027import org.apache.reef.driver.task.TaskConfiguration;
028import org.apache.reef.tang.Configuration;
029import org.apache.reef.tang.annotations.Name;
030import org.apache.reef.tang.annotations.NamedParameter;
031import org.apache.reef.tang.annotations.Parameter;
032import org.apache.reef.tang.annotations.Unit;
033import org.apache.reef.tang.exceptions.BindException;
034import org.apache.reef.tang.formats.ConfigurationModule;
035import org.apache.reef.tests.library.exceptions.DriverSideFailure;
036import org.apache.reef.wake.EventHandler;
037import org.apache.reef.wake.time.event.StartTime;
038
039import javax.inject.Inject;
040import java.util.logging.Level;
041import java.util.logging.Logger;
042
043/**
044 * Universal driver for the test REEF job that fails on different stages of execution.
045 */
046@Unit
047public final class Driver {
048
049  private static final Logger LOG = Logger.getLogger(Driver.class.getName());
050  private final transient String failTaskName;
051  private final transient EvaluatorRequestor requestor;
052  private transient String taskId;
053
054  @Inject
055  public Driver(@Parameter(FailTaskName.class) final String failTaskName,
056                final EvaluatorRequestor requestor) {
057    this.failTaskName = failTaskName;
058    this.requestor = requestor;
059  }
060
061  /**
062   * Name of the message class to specify the failing message handler.
063   */
064  @NamedParameter(doc = "Full name of the (failing) task class", short_name = "task")
065  public static final class FailTaskName implements Name<String> {
066  }
067
068  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
069    @Override
070    public void onNext(final AllocatedEvaluator eval) {
071
072      try {
073
074        taskId = failTaskName + "_" + eval.getId();
075        LOG.log(Level.INFO, "Submit task: {0}", taskId);
076
077        final Configuration contextConfig =
078            ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER, taskId).build();
079
080        ConfigurationModule taskConfig =
081            TaskConfiguration.CONF.set(TaskConfiguration.IDENTIFIER, taskId);
082
083        switch (failTaskName) {
084        case "FailTask":
085          taskConfig = taskConfig.set(TaskConfiguration.TASK, FailTask.class);
086          break;
087        case "FailTaskCall":
088          taskConfig = taskConfig.set(TaskConfiguration.TASK, FailTaskCall.class);
089          break;
090        case "FailTaskMsg":
091          taskConfig = taskConfig
092                .set(TaskConfiguration.TASK, FailTaskMsg.class)
093                .set(TaskConfiguration.ON_MESSAGE, FailTaskMsg.class);
094          break;
095        case "FailTaskSuspend":
096          taskConfig = taskConfig
097                .set(TaskConfiguration.TASK, FailTaskSuspend.class)
098                .set(TaskConfiguration.ON_SUSPEND, FailTaskSuspend.class);
099          break;
100        case "FailTaskStart":
101          taskConfig = taskConfig
102                .set(TaskConfiguration.TASK, FailTaskStart.class)
103                .set(TaskConfiguration.ON_TASK_STARTED, FailTaskStart.class);
104          break;
105        case "FailTaskStop":
106          taskConfig = taskConfig
107                .set(TaskConfiguration.TASK, FailTaskStop.class)
108                .set(TaskConfiguration.ON_TASK_STOP, FailTaskStop.class)
109                .set(TaskConfiguration.ON_CLOSE, FailTaskStop.CloseEventHandler.class);
110          break;
111        case "FailTaskClose":
112          taskConfig = taskConfig
113                .set(TaskConfiguration.TASK, FailTaskClose.class)
114                .set(TaskConfiguration.ON_CLOSE, FailTaskClose.class);
115          break;
116        default:
117          break;
118        }
119
120        eval.submitContextAndTask(contextConfig, taskConfig.build());
121
122      } catch (final BindException ex) {
123        LOG.log(Level.WARNING, "Configuration error", ex);
124        throw new DriverSideFailure("Configuration error", ex);
125      }
126    }
127  }
128
129  final class RunningTaskHandler implements EventHandler<RunningTask> {
130    @Override
131    public void onNext(final RunningTask task) {
132
133      LOG.log(Level.INFO, "TaskRuntime: {0} expect {1}",
134          new Object[]{task.getId(), taskId});
135
136      if (!taskId.equals(task.getId())) {
137        throw new DriverSideFailure("Task ID " + task.getId()
138            + " not equal expected ID " + taskId);
139      }
140
141      switch (failTaskName) {
142      case "FailTaskMsg":
143        LOG.log(Level.INFO, "TaskRuntime: Send message: {0}", task);
144        task.send(new byte[0]);
145        break;
146      case "FailTaskSuspend":
147        LOG.log(Level.INFO, "TaskRuntime: Suspend: {0}", task);
148        task.suspend();
149        break;
150      case "FailTaskStop":
151      case "FailTaskClose":
152        LOG.log(Level.INFO, "TaskRuntime: Stop/Close: {0}", task);
153        task.close();
154        break;
155      default:
156        break;
157      }
158    }
159  }
160
161  final class ActiveContextHandler implements EventHandler<ActiveContext> {
162    @Override
163    public void onNext(final ActiveContext context) throws DriverSideFailure {
164      throw new DriverSideFailure("Unexpected ActiveContext message: " + context.getId());
165    }
166  }
167
168  final class StartHandler implements EventHandler<StartTime> {
169    @Override
170    public void onNext(final StartTime time) {
171      LOG.log(Level.INFO, "StartTime: {0}", time);
172      Driver.this.requestor.submit(EvaluatorRequest.newBuilder()
173          .setNumber(1).setMemory(128).setNumberOfCores(1).build());
174    }
175  }
176}