This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.suspend;
020
021import org.apache.reef.driver.client.JobMessageObserver;
022import org.apache.reef.driver.context.ActiveContext;
023import org.apache.reef.driver.context.ContextConfiguration;
024import org.apache.reef.driver.evaluator.AllocatedEvaluator;
025import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
026import org.apache.reef.driver.evaluator.EvaluatorRequest;
027import org.apache.reef.driver.evaluator.EvaluatorRequestor;
028import org.apache.reef.driver.task.*;
029import org.apache.reef.io.checkpoint.fs.FSCheckPointServiceConfiguration;
030import org.apache.reef.tang.Configuration;
031import org.apache.reef.tang.JavaConfigurationBuilder;
032import org.apache.reef.tang.Tang;
033import org.apache.reef.tang.annotations.Parameter;
034import org.apache.reef.tang.annotations.Unit;
035import org.apache.reef.tang.exceptions.BindException;
036import org.apache.reef.wake.EventHandler;
037import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
038import org.apache.reef.wake.time.event.StartTime;
039import org.apache.reef.wake.time.event.StopTime;
040
041import javax.inject.Inject;
042import javax.xml.bind.DatatypeConverter;
043import java.util.Collections;
044import java.util.HashMap;
045import java.util.Map;
046import java.util.logging.Level;
047import java.util.logging.Logger;
048
049/**
050 * Suspend/resume example job driver. Execute a simple task in all evaluators,
051 * and send EvaluatorControlMessage suspend/resume events properly.
052 */
053@Unit
054public class SuspendDriver {
055
056  /**
057   * Standard Java logger.
058   */
059  private static final Logger LOG = Logger.getLogger(SuspendDriver.class.getName());
060
061  /**
062   * Number of evaluators to request.
063   */
064  private static final int NUM_EVALUATORS = 2;
065
066  /**
067   * String codec is used to encode the results driver sends to the client.
068   */
069  private static final ObjectSerializableCodec<String> CODEC_STR = new ObjectSerializableCodec<>();
070
071  /**
072   * Integer codec is used to decode the results driver gets from the tasks.
073   */
074  private static final ObjectSerializableCodec<Integer> CODEC_INT = new ObjectSerializableCodec<>();
075
076  /**
077   * Job observer on the client.
078   * We use it to send results from the driver back to the client.
079   */
080  private final JobMessageObserver jobMessageObserver;
081
082  /**
083   * Job driver uses EvaluatorRequestor to request Evaluators that will run the Tasks.
084   */
085  private final EvaluatorRequestor evaluatorRequestor;
086
087  /**
088   * TANG Configuration of the Task.
089   */
090  private final Configuration contextConfig;
091
092  /**
093   * Map from task ID (a string) to the TaskRuntime instance (that can be suspended).
094   */
095  private final Map<String, RunningTask> runningTasks =
096      Collections.synchronizedMap(new HashMap<String, RunningTask>());
097
098  /**
099   * Map from task ID (a string) to the SuspendedTask instance (that can be resumed).
100   */
101  private final Map<String, SuspendedTask> suspendedTasks = new HashMap<>();
102
103  /**
104   * Job driver constructor.
105   * All parameters are injected from TANG automatically.
106   *
107   * @param evaluatorRequestor is used to request Evaluators.
108   * @param numCycles          number of cycles to run in the task.
109   * @param delay              delay in seconds between cycles in the task.
110   */
111  @Inject
112  SuspendDriver(
113      final JobMessageObserver jobMessageObserver,
114      final EvaluatorRequestor evaluatorRequestor,
115      @Parameter(Launch.Local.class) final boolean isLocal,
116      @Parameter(Launch.NumCycles.class) final int numCycles,
117      @Parameter(Launch.Delay.class) final int delay) {
118
119    this.jobMessageObserver = jobMessageObserver;
120    this.evaluatorRequestor = evaluatorRequestor;
121
122    try {
123
124      final Configuration checkpointServiceConfig = FSCheckPointServiceConfiguration.CONF
125          .set(FSCheckPointServiceConfiguration.IS_LOCAL, Boolean.toString(isLocal))
126          .set(FSCheckPointServiceConfiguration.PATH, "/tmp")
127          .set(FSCheckPointServiceConfiguration.PREFIX, "reef-checkpoint-")
128          .set(FSCheckPointServiceConfiguration.REPLICATION_FACTOR, "3")
129          .build();
130
131      final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder()
132          .bindNamedParameter(Launch.NumCycles.class, Integer.toString(numCycles))
133          .bindNamedParameter(Launch.Delay.class, Integer.toString(delay));
134
135      cb.addConfiguration(checkpointServiceConfig);
136      this.contextConfig = cb.build();
137
138    } catch (final BindException ex) {
139      throw new RuntimeException(ex);
140    }
141  }
142
143  /**
144   * Receive notification that the Task is ready to run.
145   */
146  final class RunningTaskHandler implements EventHandler<RunningTask> {
147    @Override
148    public void onNext(final RunningTask task) {
149      LOG.log(Level.INFO, "Running task: {0}", task.getId());
150      runningTasks.put(task.getId(), task);
151      jobMessageObserver.sendMessageToClient(CODEC_STR.encode("start task: " + task.getId()));
152    }
153  }
154
155  /**
156   * Receive notification that the Task has completed successfully.
157   */
158  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
159    @Override
160    public void onNext(final CompletedTask task) {
161
162      final EvaluatorDescriptor e = task.getActiveContext().getEvaluatorDescriptor();
163      final String msg = "Task completed " + task.getId() + " on node " + e;
164      LOG.info(msg);
165
166      jobMessageObserver.sendMessageToClient(CODEC_STR.encode(msg));
167      runningTasks.remove(task.getId());
168      task.getActiveContext().close();
169
170      final boolean noTasks;
171
172      synchronized (suspendedTasks) {
173        LOG.log(Level.INFO, "Tasks running: {0} suspended: {1}", new Object[]{
174            runningTasks.size(), suspendedTasks.size()});
175        noTasks = runningTasks.isEmpty() && suspendedTasks.isEmpty();
176      }
177
178      if (noTasks) {
179        LOG.info("All tasks completed; shutting down.");
180      }
181    }
182  }
183
184  /**
185   * Receive notification that the Task has been suspended.
186   */
187  final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
188    @Override
189    public void onNext(final SuspendedTask task) {
190
191      final String msg = "Task suspended: " + task.getId();
192      LOG.info(msg);
193
194      synchronized (suspendedTasks) {
195        suspendedTasks.put(task.getId(), task);
196        runningTasks.remove(task.getId());
197      }
198
199      jobMessageObserver.sendMessageToClient(CODEC_STR.encode(msg));
200    }
201  }
202
203  /**
204   * Receive message from the Task.
205   */
206  final class TaskMessageHandler implements EventHandler<TaskMessage> {
207    @Override
208    public void onNext(final TaskMessage message) {
209      final int result = CODEC_INT.decode(message.get());
210      final String msg = "Task message " + message.getId() + ": " + result;
211      LOG.info(msg);
212      jobMessageObserver.sendMessageToClient(CODEC_STR.encode(msg));
213    }
214  }
215
216  /**
217   * Receive notification that an Evaluator had been allocated,
218   * and submitTask a new Task in that Evaluator.
219   */
220  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
221    @Override
222    public void onNext(final AllocatedEvaluator eval) {
223      try {
224
225        LOG.log(Level.INFO, "Allocated Evaluator: {0}", eval.getId());
226
227        final Configuration thisContextConfiguration = ContextConfiguration.CONF.set(
228            ContextConfiguration.IDENTIFIER, eval.getId() + "_context").build();
229
230        eval.submitContext(Tang.Factory.getTang()
231            .newConfigurationBuilder(thisContextConfiguration, contextConfig).build());
232
233      } catch (final BindException ex) {
234        throw new RuntimeException(ex);
235      }
236    }
237  }
238
239  /**
240   * Receive notification that a new Context is available.
241   * Submit a new Task to that Context.
242   */
243  final class ActiveContextHandler implements EventHandler<ActiveContext> {
244    @Override
245    public synchronized void onNext(final ActiveContext context) {
246      LOG.log(Level.INFO, "Active Context: {0}", context.getId());
247      try {
248        context.submitTask(TaskConfiguration.CONF
249            .set(TaskConfiguration.IDENTIFIER, context.getId() + "_task")
250            .set(TaskConfiguration.TASK, SuspendTestTask.class)
251            .set(TaskConfiguration.ON_SUSPEND, SuspendTestTask.SuspendHandler.class)
252            .build());
253      } catch (final BindException ex) {
254        LOG.log(Level.SEVERE, "Bad Task configuration for context: " + context.getId(), ex);
255        throw new RuntimeException(ex);
256      }
257    }
258  }
259
260  /**
261   * Handle notifications from the client.
262   */
263  final class ClientMessageHandler implements EventHandler<byte[]> {
264    @Override
265    public void onNext(final byte[] message) {
266
267      final String commandStr = CODEC_STR.decode(message);
268      LOG.log(Level.INFO, "Client message: {0}", commandStr);
269
270      final String[] split = commandStr.split("\\s+", 2);
271      if (split.length != 2) {
272        throw new IllegalArgumentException("Bad command: " + commandStr);
273      } else {
274
275        final String command = split[0].toLowerCase().intern();
276        final String taskId = split[1];
277
278        switch (command) {
279
280        case "suspend": {
281          final RunningTask task = runningTasks.get(taskId);
282          if (task != null) {
283            task.suspend();
284          } else {
285            throw new IllegalArgumentException("Suspend: Task not found: " + taskId);
286          }
287          break;
288        }
289
290        case "resume": {
291          final SuspendedTask suspendedTask;
292          synchronized (suspendedTasks) {
293            suspendedTask = suspendedTasks.remove(taskId);
294          }
295          if (suspendedTask != null) {
296            try {
297              suspendedTask.getActiveContext().submitTask(TaskConfiguration.CONF
298                    .set(TaskConfiguration.IDENTIFIER, taskId)
299                    .set(TaskConfiguration.MEMENTO,
300                        DatatypeConverter.printBase64Binary(suspendedTask.get()))
301                    .build());
302            } catch (final BindException e) {
303              throw new RuntimeException(e);
304            }
305          } else {
306            throw new IllegalArgumentException("Resume: Task not found: " + taskId);
307          }
308          break;
309        }
310
311        default:
312          throw new IllegalArgumentException("Bad command: " + command);
313        }
314      }
315    }
316  }
317
318  /**
319   * Job Driver is ready and the clock is set up: request the evaluators.
320   */
321  final class StartHandler implements EventHandler<StartTime> {
322    @Override
323    public void onNext(final StartTime time) {
324      LOG.log(Level.INFO, "StartTime: {0}", time);
325      evaluatorRequestor.submit(EvaluatorRequest.newBuilder()
326          .setMemory(128).setNumberOfCores(1).setNumber(NUM_EVALUATORS).build());
327    }
328  }
329
330  /**
331   * Shutting down the job driver: close the evaluators.
332   */
333  final class StopHandler implements EventHandler<StopTime> {
334    @Override
335    public void onNext(final StopTime time) {
336      LOG.log(Level.INFO, "StopTime: {0}", time);
337      jobMessageObserver.sendMessageToClient(CODEC_STR.encode("got StopTime"));
338    }
339  }
340}