This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.suspend;
020
021import org.apache.reef.client.*;
022import org.apache.reef.tang.Configuration;
023import org.apache.reef.tang.JavaConfigurationBuilder;
024import org.apache.reef.tang.Tang;
025import org.apache.reef.tang.annotations.Parameter;
026import org.apache.reef.tang.annotations.Unit;
027import org.apache.reef.tang.exceptions.BindException;
028import org.apache.reef.util.EnvironmentUtils;
029import org.apache.reef.wake.EventHandler;
030
031import javax.inject.Inject;
032import java.io.IOException;
033import java.util.logging.Level;
034import java.util.logging.Logger;
035
036@Unit
037public class SuspendClient {
038
039  /**
040   * Standard java logger.
041   */
042  private final static Logger LOG = Logger.getLogger(SuspendClient.class.getName());
043
044  /**
045   * Job Driver configuration.
046   */
047  private final Configuration driverConfig;
048
049  /**
050   * Reference to the REEF framework.
051   */
052  private final REEF reef;
053
054  /**
055   * Controller that listens for suspend/resume commands on a specified port.
056   */
057  private final SuspendClientControl controlListener;
058
059  /**
060   * @param reef      reference to the REEF framework.
061   * @param port      port to listen to for suspend/resume commands.
062   * @param numCycles number of cycles to run in the task.
063   * @param delay     delay in seconds between cycles in the task.
064   */
065  @Inject
066  SuspendClient(
067      final REEF reef,
068      final @Parameter(SuspendClientControl.Port.class) int port,
069      final @Parameter(Launch.NumCycles.class) int numCycles,
070      final @Parameter(Launch.Delay.class) int delay) throws BindException, IOException {
071
072    final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder()
073        .bindNamedParameter(Launch.NumCycles.class, Integer.toString(numCycles))
074        .bindNamedParameter(Launch.Delay.class, Integer.toString(delay));
075
076    cb.addConfiguration(DriverConfiguration.CONF
077        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(SuspendDriver.class))
078        .set(DriverConfiguration.DRIVER_IDENTIFIER, "suspend-" + System.currentTimeMillis())
079        .set(DriverConfiguration.ON_TASK_RUNNING, SuspendDriver.RunningTaskHandler.class)
080        .set(DriverConfiguration.ON_TASK_COMPLETED, SuspendDriver.CompletedTaskHandler.class)
081        .set(DriverConfiguration.ON_TASK_SUSPENDED, SuspendDriver.SuspendedTaskHandler.class)
082        .set(DriverConfiguration.ON_TASK_MESSAGE, SuspendDriver.TaskMessageHandler.class)
083        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, SuspendDriver.AllocatedEvaluatorHandler.class)
084        .set(DriverConfiguration.ON_CONTEXT_ACTIVE, SuspendDriver.ActiveContextHandler.class)
085        .set(DriverConfiguration.ON_CLIENT_MESSAGE, SuspendDriver.ClientMessageHandler.class)
086        .set(DriverConfiguration.ON_DRIVER_STARTED, SuspendDriver.StartHandler.class)
087        .set(DriverConfiguration.ON_DRIVER_STOP, SuspendDriver.StopHandler.class)
088        .build());
089
090    this.driverConfig = cb.build();
091    this.reef = reef;
092    this.controlListener = new SuspendClientControl(port);
093  }
094
095  /**
096   * Start the job driver.
097   */
098  public void submit() {
099    LOG.info("Start the job driver");
100    this.reef.submit(this.driverConfig);
101  }
102
103  /**
104   * Wait for the job to complete.
105   */
106  public void waitForCompletion() throws Exception {
107    LOG.info("Waiting for the Job Driver to complete.");
108    try {
109      synchronized (this) {
110        this.wait();
111      }
112    } catch (final InterruptedException ex) {
113      LOG.log(Level.WARNING, "Waiting for result interrupted.", ex);
114    }
115    this.reef.close();
116    this.controlListener.close();
117  }
118
119  /**
120   * Receive notification from the driver that the job is about to run.
121   * RunningJob object is a proxy to the running job driver that can be used for sending messages.
122   */
123  final class RunningJobHandler implements EventHandler<RunningJob> {
124    @Override
125    public void onNext(final RunningJob job) {
126      LOG.log(Level.INFO, "Running job: {0}", job.getId());
127      SuspendClient.this.controlListener.setRunningJob(job);
128    }
129  }
130
131  /**
132   * Receive notification from the driver that the job had failed.
133   * <p/>
134   * FailedJob is a proxy for the failed job driver
135   * (contains job ID and exception thrown from the driver).
136   */
137  final class FailedJobHandler implements EventHandler<FailedJob> {
138    @Override
139    public void onNext(final FailedJob job) {
140      LOG.log(Level.SEVERE, "Failed job: " + job.getId(), job.getReason().orElse(null));
141      synchronized (SuspendClient.this) {
142        SuspendClient.this.notify();
143      }
144    }
145  }
146
147  /**
148   * Receive notification from the driver that the job had completed successfully.
149   */
150  final class CompletedJobHandler implements EventHandler<CompletedJob> {
151    @Override
152    public void onNext(final CompletedJob job) {
153      LOG.log(Level.INFO, "Completed job: {0}", job.getId());
154      synchronized (SuspendClient.this) {
155        SuspendClient.this.notify();
156      }
157    }
158  }
159
160  /**
161   * Receive notification that there was an exception thrown from the job driver.
162   */
163  final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
164    @Override
165    public void onNext(final FailedRuntime error) {
166      LOG.log(Level.SEVERE, "ERROR: " + error, error.getReason().orElse(null));
167      synchronized (SuspendClient.class) {
168        SuspendClient.this.notify();
169      }
170    }
171  }
172}