This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.tests.watcher;
020
021import org.apache.reef.driver.context.ActiveContext;
022import org.apache.reef.driver.context.ContextConfiguration;
023import org.apache.reef.driver.context.FailedContext;
024import org.apache.reef.driver.evaluator.AllocatedEvaluator;
025import org.apache.reef.driver.evaluator.EvaluatorRequest;
026import org.apache.reef.driver.evaluator.EvaluatorRequestor;
027import org.apache.reef.driver.evaluator.FailedEvaluator;
028import org.apache.reef.driver.task.FailedTask;
029import org.apache.reef.driver.task.RunningTask;
030import org.apache.reef.driver.task.SuspendedTask;
031import org.apache.reef.driver.task.TaskConfiguration;
032import org.apache.reef.tang.Configuration;
033import org.apache.reef.tang.Tang;
034import org.apache.reef.tang.annotations.Unit;
035import org.apache.reef.wake.EventHandler;
036import org.apache.reef.wake.time.event.StartTime;
037import org.apache.reef.wake.time.runtime.event.RuntimeStop;
038
039import javax.inject.Inject;
040import java.util.concurrent.atomic.AtomicBoolean;
041
042/**
043 * Driver for WatcherTest.
044 */
045@Unit
046public final class WatcherTestDriver {
047
048  private static final String ROOT_CONTEXT_ID = "ROOT_CONTEXT";
049  private static final String FIRST_CONTEXT_ID = "FIRST_CONTEXT";
050
051  private final EvaluatorRequestor evaluatorRequestor;
052  private final TestEventStream testEventStream;
053
054  /**
055   * The first evaluator will be failed to generate FailedEvaluator.
056   */
057  private final AtomicBoolean isFirstEvaluator;
058
059  /**
060   * The first task will be suspended to generate SuspendedTask.
061   */
062  private final AtomicBoolean isFirstTask;
063
064  @Inject
065  private WatcherTestDriver(final EvaluatorRequestor evaluatorRequestor,
066                            final TestEventStream testEventStream) {
067    this.evaluatorRequestor = evaluatorRequestor;
068    this.testEventStream = testEventStream;
069    this.isFirstEvaluator = new AtomicBoolean(true);
070    this.isFirstTask = new AtomicBoolean(true);
071  }
072
073  /**
074   * Handler for StartTime.
075   */
076  public final class DriverStartedHandler implements EventHandler<StartTime> {
077
078    @Override
079    public void onNext(final StartTime value) {
080      evaluatorRequestor.submit(EvaluatorRequest
081          .newBuilder()
082          .setMemory(64)
083          .setNumberOfCores(1)
084          .setNumber(2)
085          .build());
086    }
087  }
088
089  /**
090   * Handler for AllocatedEvaluator.
091   */
092  public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
093
094    @Override
095    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
096      if (isFirstEvaluator.compareAndSet(true, false)) {
097        allocatedEvaluator.submitContext(getFailedContextConfiguration());
098      } else {
099        allocatedEvaluator.submitContext(ContextConfiguration.CONF
100            .set(ContextConfiguration.IDENTIFIER, ROOT_CONTEXT_ID)
101            .build());
102      }
103    }
104  }
105
106  /**
107   * Handler for FailedEvaluator.
108   */
109  public final class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> {
110
111    @Override
112    public void onNext(final FailedEvaluator failedEvaluator) {
113      // no-op
114    }
115  }
116
117  /**
118   * Handler for ActiveContext.
119   */
120  public final class ContextActivatedHandler implements EventHandler<ActiveContext> {
121
122    @Override
123    public void onNext(final ActiveContext activeContext) {
124      if (activeContext.getId().equals(ROOT_CONTEXT_ID)) {
125        activeContext.submitContext(ContextConfiguration.CONF
126            .set(ContextConfiguration.IDENTIFIER, FIRST_CONTEXT_ID)
127            .build());
128      } else if (activeContext.getId().equals(FIRST_CONTEXT_ID)) {
129        activeContext.submitContext(getFailedContextConfiguration());
130      }
131    }
132  }
133
134  /**
135   * Handler for FailedContext.
136   */
137  public final class ContextFailedHandler implements EventHandler<FailedContext> {
138
139    @Override
140    public void onNext(final FailedContext failedContext) {
141      failedContext.getParentContext().get().submitTask(getFailedTaskConfiguration());
142    }
143  }
144
145  /**
146   * Handler for RunningTask.
147   */
148  public final class TaskRunningHandler implements EventHandler<RunningTask> {
149
150    @Override
151    public void onNext(final RunningTask runningTask) {
152      if (isFirstTask.compareAndSet(true, false)) {
153        runningTask.suspend();
154      }
155    }
156  }
157
158  /**
159   * Handler for FailedTask.
160   */
161  public final class TaskFailedHandler implements EventHandler<FailedTask> {
162
163    @Override
164    public void onNext(final FailedTask failedTask) {
165      failedTask.getActiveContext().get().submitTask(getTaskConfiguration(true));
166    }
167  }
168
169  /**
170   * Handler for SuspendedTask.
171   */
172  public final class TaskSuspendedHandler implements EventHandler<SuspendedTask> {
173
174    @Override
175    public void onNext(final SuspendedTask value) {
176      value.getActiveContext().submitTask(getTaskConfiguration(false));
177    }
178  }
179
180  /**
181   * Handler for RuntimeStop.
182   */
183  public final class RuntimeStopHandler implements EventHandler<RuntimeStop> {
184
185    @Override
186    public void onNext(final RuntimeStop runtimeStop) {
187      testEventStream.validate();
188    }
189  }
190
191  private Configuration getTaskConfiguration(final boolean isTaskSuspended) {
192    final Configuration taskConf = TaskConfiguration.CONF
193        .set(TaskConfiguration.TASK, WatcherTestTask.class)
194        .set(TaskConfiguration.IDENTIFIER, "TASK")
195        .set(TaskConfiguration.ON_SEND_MESSAGE, WatcherTestTask.class)
196        .set(TaskConfiguration.ON_SUSPEND, WatcherTestTask.TaskSuspendedHandler.class)
197        .build();
198
199    return Tang.Factory.getTang().newConfigurationBuilder(taskConf)
200        .bindNamedParameter(IsTaskSuspended.class, String.valueOf(isTaskSuspended))
201        .build();
202  }
203
204  private Configuration getFailedTaskConfiguration() {
205    return TaskConfiguration.CONF
206        .set(TaskConfiguration.TASK, WatcherTestTask.class)
207        .set(TaskConfiguration.IDENTIFIER, "FAILED_TASK")
208        .set(TaskConfiguration.ON_TASK_STARTED, FailedTaskStartHandler.class)
209        .build();
210  }
211
212  private Configuration getFailedContextConfiguration() {
213    return ContextConfiguration.CONF
214        .set(ContextConfiguration.IDENTIFIER, "FAILED_CONTEXT")
215        .set(ContextConfiguration.ON_CONTEXT_STARTED, FailedContextHandler.class)
216        .build();
217  }
218}