This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.suspend;
020
021import org.apache.reef.client.*;
022import org.apache.reef.tang.Configuration;
023import org.apache.reef.tang.JavaConfigurationBuilder;
024import org.apache.reef.tang.Tang;
025import org.apache.reef.tang.annotations.Parameter;
026import org.apache.reef.tang.annotations.Unit;
027import org.apache.reef.tang.exceptions.BindException;
028import org.apache.reef.util.EnvironmentUtils;
029import org.apache.reef.wake.EventHandler;
030
031import javax.inject.Inject;
032import java.io.IOException;
033import java.util.logging.Level;
034import java.util.logging.Logger;
035
036/**
037 * Client for suspend example.
038 */
039@Unit
040public class SuspendClient {
041
042  /**
043   * Standard java logger.
044   */
045  private static final Logger LOG = Logger.getLogger(SuspendClient.class.getName());
046
047  /**
048   * Job Driver configuration.
049   */
050  private final Configuration driverConfig;
051
052  /**
053   * Reference to the REEF framework.
054   */
055  private final REEF reef;
056
057  /**
058   * Controller that listens for suspend/resume commands on a specified port.
059   */
060  private final SuspendClientControl controlListener;
061
062  /**
063   * @param reef                 reference to the REEF framework.
064   * @param controlListener      suspend client control listener.
065   * @param numCycles            number of cycles to run in the task.
066   * @param delay                delay in seconds between cycles in the task.
067   */
068  @Inject
069  SuspendClient(
070      final REEF reef,
071      final SuspendClientControl controlListener,
072      @Parameter(Launch.NumCycles.class) final int numCycles,
073      @Parameter(Launch.Delay.class) final int delay) throws BindException, IOException {
074
075    final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder()
076        .bindNamedParameter(Launch.NumCycles.class, Integer.toString(numCycles))
077        .bindNamedParameter(Launch.Delay.class, Integer.toString(delay));
078
079    cb.addConfiguration(DriverConfiguration.CONF
080        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(SuspendDriver.class))
081        .set(DriverConfiguration.DRIVER_IDENTIFIER, "suspend-" + System.currentTimeMillis())
082        .set(DriverConfiguration.ON_TASK_RUNNING, SuspendDriver.RunningTaskHandler.class)
083        .set(DriverConfiguration.ON_TASK_COMPLETED, SuspendDriver.CompletedTaskHandler.class)
084        .set(DriverConfiguration.ON_TASK_SUSPENDED, SuspendDriver.SuspendedTaskHandler.class)
085        .set(DriverConfiguration.ON_TASK_MESSAGE, SuspendDriver.TaskMessageHandler.class)
086        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, SuspendDriver.AllocatedEvaluatorHandler.class)
087        .set(DriverConfiguration.ON_CONTEXT_ACTIVE, SuspendDriver.ActiveContextHandler.class)
088        .set(DriverConfiguration.ON_CLIENT_MESSAGE, SuspendDriver.ClientMessageHandler.class)
089        .set(DriverConfiguration.ON_DRIVER_STARTED, SuspendDriver.StartHandler.class)
090        .set(DriverConfiguration.ON_DRIVER_STOP, SuspendDriver.StopHandler.class)
091        .build());
092
093    this.driverConfig = cb.build();
094    this.reef = reef;
095    this.controlListener = controlListener;
096  }
097
098  /**
099   * Start the job driver.
100   */
101  public void submit() {
102    LOG.info("Start the job driver");
103    this.reef.submit(this.driverConfig);
104  }
105
106  /**
107   * Wait for the job to complete.
108   */
109  public void waitForCompletion() throws Exception {
110    LOG.info("Waiting for the Job Driver to complete.");
111    try {
112      synchronized (this) {
113        this.wait();
114      }
115    } catch (final InterruptedException ex) {
116      LOG.log(Level.WARNING, "Waiting for result interrupted.", ex);
117    }
118    this.reef.close();
119    this.controlListener.close();
120  }
121
122  /**
123   * Receive notification from the driver that the job is about to run.
124   * RunningJob object is a proxy to the running job driver that can be used for sending messages.
125   */
126  final class RunningJobHandler implements EventHandler<RunningJob> {
127    @Override
128    public void onNext(final RunningJob job) {
129      LOG.log(Level.INFO, "Running job: {0}", job.getId());
130      SuspendClient.this.controlListener.setRunningJob(job);
131    }
132  }
133
134  /**
135   * Receive notification from the driver that the job had failed.
136   * <p>
137   * FailedJob is a proxy for the failed job driver
138   * (contains job ID and exception thrown from the driver).
139   */
140  final class FailedJobHandler implements EventHandler<FailedJob> {
141    @Override
142    public void onNext(final FailedJob job) {
143      LOG.log(Level.SEVERE, "Failed job: " + job.getId(), job.getReason().orElse(null));
144      synchronized (SuspendClient.this) {
145        SuspendClient.this.notify();
146      }
147    }
148  }
149
150  /**
151   * Receive notification from the driver that the job had completed successfully.
152   */
153  final class CompletedJobHandler implements EventHandler<CompletedJob> {
154    @Override
155    public void onNext(final CompletedJob job) {
156      LOG.log(Level.INFO, "Completed job: {0}", job.getId());
157      synchronized (SuspendClient.this) {
158        SuspendClient.this.notify();
159      }
160    }
161  }
162
163  /**
164   * Receive notification that there was an exception thrown from the job driver.
165   */
166  final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
167    @Override
168    public void onNext(final FailedRuntime error) {
169      LOG.log(Level.SEVERE, "ERROR: " + error, error.getReason().orElse(null));
170      synchronized (SuspendClient.this) {
171        SuspendClient.this.notify();
172      }
173    }
174  }
175}