This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.suspend;
020
021import org.apache.reef.driver.client.JobMessageObserver;
022import org.apache.reef.driver.context.ActiveContext;
023import org.apache.reef.driver.context.ContextConfiguration;
024import org.apache.reef.driver.evaluator.AllocatedEvaluator;
025import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
026import org.apache.reef.driver.evaluator.EvaluatorRequestor;
027import org.apache.reef.driver.task.*;
028import org.apache.reef.io.checkpoint.fs.FSCheckPointServiceConfiguration;
029import org.apache.reef.tang.Configuration;
030import org.apache.reef.tang.JavaConfigurationBuilder;
031import org.apache.reef.tang.Tang;
032import org.apache.reef.tang.annotations.Parameter;
033import org.apache.reef.tang.annotations.Unit;
034import org.apache.reef.tang.exceptions.BindException;
035import org.apache.reef.wake.EventHandler;
036import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
037import org.apache.reef.wake.time.event.StartTime;
038import org.apache.reef.wake.time.event.StopTime;
039
040import javax.inject.Inject;
041import javax.xml.bind.DatatypeConverter;
042import java.util.Collections;
043import java.util.HashMap;
044import java.util.Map;
045import java.util.logging.Level;
046import java.util.logging.Logger;
047
048/**
049 * Suspend/resume example job driver. Execute a simple task in all evaluators,
050 * and send EvaluatorControlMessage suspend/resume events properly.
051 */
052@Unit
053public class SuspendDriver {
054
055  /**
056   * Standard Java logger.
057   */
058  private static final Logger LOG = Logger.getLogger(SuspendDriver.class.getName());
059
060  /**
061   * Number of evaluators to request.
062   */
063  private static final int NUM_EVALUATORS = 2;
064
065  /**
066   * String codec is used to encode the results driver sends to the client.
067   */
068  private static final ObjectSerializableCodec<String> CODEC_STR = new ObjectSerializableCodec<>();
069
070  /**
071   * Integer codec is used to decode the results driver gets from the tasks.
072   */
073  private static final ObjectSerializableCodec<Integer> CODEC_INT = new ObjectSerializableCodec<>();
074
075  /**
076   * Job observer on the client.
077   * We use it to send results from the driver back to the client.
078   */
079  private final JobMessageObserver jobMessageObserver;
080
081  /**
082   * Job driver uses EvaluatorRequestor to request Evaluators that will run the Tasks.
083   */
084  private final EvaluatorRequestor evaluatorRequestor;
085
086  /**
087   * TANG Configuration of the Task.
088   */
089  private final Configuration contextConfig;
090
091  /**
092   * Map from task ID (a string) to the TaskRuntime instance (that can be suspended).
093   */
094  private final Map<String, RunningTask> runningTasks =
095      Collections.synchronizedMap(new HashMap<String, RunningTask>());
096
097  /**
098   * Map from task ID (a string) to the SuspendedTask instance (that can be resumed).
099   */
100  private final Map<String, SuspendedTask> suspendedTasks = new HashMap<>();
101
102  /**
103   * Job driver constructor.
104   * All parameters are injected from TANG automatically.
105   *
106   * @param evaluatorRequestor is used to request Evaluators.
107   * @param numCycles          number of cycles to run in the task.
108   * @param delay              delay in seconds between cycles in the task.
109   */
110  @Inject
111  SuspendDriver(
112      final JobMessageObserver jobMessageObserver,
113      final EvaluatorRequestor evaluatorRequestor,
114      @Parameter(Launch.Local.class) final boolean isLocal,
115      @Parameter(Launch.NumCycles.class) final int numCycles,
116      @Parameter(Launch.Delay.class) final int delay) {
117
118    this.jobMessageObserver = jobMessageObserver;
119    this.evaluatorRequestor = evaluatorRequestor;
120
121    try {
122
123      final Configuration checkpointServiceConfig = FSCheckPointServiceConfiguration.CONF
124          .set(FSCheckPointServiceConfiguration.IS_LOCAL, Boolean.toString(isLocal))
125          .set(FSCheckPointServiceConfiguration.PATH, "/tmp")
126          .set(FSCheckPointServiceConfiguration.PREFIX, "reef-checkpoint-")
127          .set(FSCheckPointServiceConfiguration.REPLICATION_FACTOR, "3")
128          .build();
129
130      final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder()
131          .bindNamedParameter(Launch.NumCycles.class, Integer.toString(numCycles))
132          .bindNamedParameter(Launch.Delay.class, Integer.toString(delay));
133
134      cb.addConfiguration(checkpointServiceConfig);
135      this.contextConfig = cb.build();
136
137    } catch (final BindException ex) {
138      throw new RuntimeException(ex);
139    }
140  }
141
142  /**
143   * Receive notification that the Task is ready to run.
144   */
145  final class RunningTaskHandler implements EventHandler<RunningTask> {
146    @Override
147    public void onNext(final RunningTask task) {
148      LOG.log(Level.INFO, "Running task: {0}", task.getId());
149      runningTasks.put(task.getId(), task);
150      jobMessageObserver.sendMessageToClient(CODEC_STR.encode("start task: " + task.getId()));
151    }
152  }
153
154  /**
155   * Receive notification that the Task has completed successfully.
156   */
157  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
158    @Override
159    public void onNext(final CompletedTask task) {
160
161      final EvaluatorDescriptor e = task.getActiveContext().getEvaluatorDescriptor();
162      final String msg = "Task completed " + task.getId() + " on node " + e;
163      LOG.info(msg);
164
165      jobMessageObserver.sendMessageToClient(CODEC_STR.encode(msg));
166      runningTasks.remove(task.getId());
167      task.getActiveContext().close();
168
169      final boolean noTasks;
170
171      synchronized (suspendedTasks) {
172        LOG.log(Level.INFO, "Tasks running: {0} suspended: {1}", new Object[]{
173            runningTasks.size(), suspendedTasks.size()});
174        noTasks = runningTasks.isEmpty() && suspendedTasks.isEmpty();
175      }
176
177      if (noTasks) {
178        LOG.info("All tasks completed; shutting down.");
179      }
180    }
181  }
182
183  /**
184   * Receive notification that the Task has been suspended.
185   */
186  final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
187    @Override
188    public void onNext(final SuspendedTask task) {
189
190      final String msg = "Task suspended: " + task.getId();
191      LOG.info(msg);
192
193      synchronized (suspendedTasks) {
194        suspendedTasks.put(task.getId(), task);
195        runningTasks.remove(task.getId());
196      }
197
198      jobMessageObserver.sendMessageToClient(CODEC_STR.encode(msg));
199    }
200  }
201
202  /**
203   * Receive message from the Task.
204   */
205  final class TaskMessageHandler implements EventHandler<TaskMessage> {
206    @Override
207    public void onNext(final TaskMessage message) {
208      final int result = CODEC_INT.decode(message.get());
209      final String msg = "Task message " + message.getId() + ": " + result;
210      LOG.info(msg);
211      jobMessageObserver.sendMessageToClient(CODEC_STR.encode(msg));
212    }
213  }
214
215  /**
216   * Receive notification that an Evaluator had been allocated,
217   * and submitTask a new Task in that Evaluator.
218   */
219  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
220    @Override
221    public void onNext(final AllocatedEvaluator eval) {
222      try {
223
224        LOG.log(Level.INFO, "Allocated Evaluator: {0}", eval.getId());
225
226        final Configuration thisContextConfiguration = ContextConfiguration.CONF.set(
227            ContextConfiguration.IDENTIFIER, eval.getId() + "_context").build();
228
229        eval.submitContext(Tang.Factory.getTang()
230            .newConfigurationBuilder(thisContextConfiguration, contextConfig).build());
231
232      } catch (final BindException ex) {
233        throw new RuntimeException(ex);
234      }
235    }
236  }
237
238  /**
239   * Receive notification that a new Context is available.
240   * Submit a new Task to that Context.
241   */
242  final class ActiveContextHandler implements EventHandler<ActiveContext> {
243    @Override
244    public synchronized void onNext(final ActiveContext context) {
245      LOG.log(Level.INFO, "Active Context: {0}", context.getId());
246      try {
247        context.submitTask(TaskConfiguration.CONF
248            .set(TaskConfiguration.IDENTIFIER, context.getId() + "_task")
249            .set(TaskConfiguration.TASK, SuspendTestTask.class)
250            .set(TaskConfiguration.ON_SUSPEND, SuspendTestTask.SuspendHandler.class)
251            .build());
252      } catch (final BindException ex) {
253        LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex);
254        throw new RuntimeException(ex);
255      }
256    }
257  }
258
259  /**
260   * Handle notifications from the client.
261   */
262  final class ClientMessageHandler implements EventHandler<byte[]> {
263    @Override
264    public void onNext(final byte[] message) {
265
266      final String commandStr = CODEC_STR.decode(message);
267      LOG.log(Level.INFO, "Client message: {0}", commandStr);
268
269      final String[] split = commandStr.split("\\s+", 2);
270      if (split.length != 2) {
271        throw new IllegalArgumentException("Bad command: " + commandStr);
272      } else {
273
274        final String command = split[0].toLowerCase().intern();
275        final String taskId = split[1];
276
277        switch (command) {
278
279        case "suspend": {
280          final RunningTask task = runningTasks.get(taskId);
281          if (task != null) {
282            task.suspend();
283          } else {
284            throw new IllegalArgumentException("Suspend: Task not found: " + taskId);
285          }
286          break;
287        }
288
289        case "resume": {
290          final SuspendedTask suspendedTask;
291          synchronized (suspendedTasks) {
292            suspendedTask = suspendedTasks.remove(taskId);
293          }
294          if (suspendedTask != null) {
295            try {
296              suspendedTask.getActiveContext().submitTask(TaskConfiguration.CONF
297                    .set(TaskConfiguration.IDENTIFIER, taskId)
298                    .set(TaskConfiguration.MEMENTO,
299                        DatatypeConverter.printBase64Binary(suspendedTask.get()))
300                    .build());
301            } catch (final BindException e) {
302              throw new RuntimeException(e);
303            }
304          } else {
305            throw new IllegalArgumentException("Resume: Task not found: " + taskId);
306          }
307          break;
308        }
309
310        default:
311          throw new IllegalArgumentException("Bad command: " + command);
312        }
313      }
314    }
315  }
316
317  /**
318   * Job Driver is ready and the clock is set up: request the evaluators.
319   */
320  final class StartHandler implements EventHandler<StartTime> {
321    @Override
322    public void onNext(final StartTime time) {
323      LOG.log(Level.INFO, "StartTime: {0}", time);
324      evaluatorRequestor.newRequest()
325          .setMemory(128)
326          .setNumberOfCores(1)
327          .setNumber(NUM_EVALUATORS)
328          .submit();
329    }
330  }
331
332  /**
333   * Shutting down the job driver: close the evaluators.
334   */
335  final class StopHandler implements EventHandler<StopTime> {
336    @Override
337    public void onNext(final StopTime time) {
338      LOG.log(Level.INFO, "StopTime: {0}", time);
339      jobMessageObserver.sendMessageToClient(CODEC_STR.encode("got StopTime"));
340    }
341  }
342}