This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.javabridge.generic;
020
021import org.apache.reef.client.*;
022import org.apache.reef.io.network.naming.NameServerConfiguration;
023import org.apache.reef.javabridge.NativeInterop;
024import org.apache.reef.runtime.yarn.driver.YarnDriverRestartConfiguration;
025import org.apache.reef.tang.Configuration;
026import org.apache.reef.tang.Configurations;
027import org.apache.reef.tang.annotations.Unit;
028import org.apache.reef.tang.exceptions.BindException;
029import org.apache.reef.tang.formats.AvroConfigurationSerializer;
030import org.apache.reef.tang.formats.ConfigurationModule;
031import org.apache.reef.util.EnvironmentUtils;
032import org.apache.reef.util.logging.LoggingScope;
033import org.apache.reef.util.logging.LoggingScopeFactory;
034import org.apache.reef.wake.EventHandler;
035import org.apache.reef.webserver.HttpHandlerConfiguration;
036import org.apache.reef.webserver.HttpServerReefEventHandler;
037import org.apache.reef.webserver.ReefEventStateManager;
038
039import javax.inject.Inject;
040import java.io.File;
041import java.io.IOException;
042import java.nio.charset.StandardCharsets;
043import java.nio.file.Files;
044import java.nio.file.Path;
045import java.nio.file.Paths;
046import java.util.logging.Level;
047import java.util.logging.Logger;
048
049/**
050 * Clr Bridge Client.
051 */
052@Unit
053public class JobClient {
054
055  /**
056   * Standard java logger.
057   */
058  private static final Logger LOG = Logger.getLogger(JobClient.class.getName());
059
060  /**
061   * Reference to the REEF framework.
062   * This variable is injected automatically in the constructor.
063   */
064  private final REEF reef;
065
066  /**
067   * Job Driver configuration.
068   */
069  private Configuration driverConfiguration;
070  private ConfigurationModule driverConfigModule;
071
072  /**
073   * A reference to the running job that allows client to send messages back to the job driver.
074   */
075  private RunningJob runningJob;
076
077  /**
078   * Set to false when job driver is done.
079   */
080  private boolean isBusy = true;
081
082  private int driverMemory;
083
084  private String driverId;
085
086  private String jobSubmissionDirectory = "reefTmp/job_" + System.currentTimeMillis();
087
088  /**
089   * A factory that provides LoggingScope.
090   */
091  private final LoggingScopeFactory loggingScopeFactory;
092  /**
093   * Clr Bridge client.
094   * Parameters are injected automatically by TANG.
095   *
096   * @param reef Reference to the REEF framework.
097   */
098  @Inject
099  JobClient(final REEF reef, final LoggingScopeFactory loggingScopeFactory) throws BindException {
100    this.loggingScopeFactory = loggingScopeFactory;
101    this.reef = reef;
102    this.driverConfigModule = getDriverConfiguration();
103  }
104
105  public static ConfigurationModule getDriverConfiguration() {
106    return DriverConfiguration.CONF
107        .setMultiple(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getAllClasspathJars())
108        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, JobDriver.AllocatedEvaluatorHandler.class)
109        .set(DriverConfiguration.ON_EVALUATOR_FAILED, JobDriver.FailedEvaluatorHandler.class)
110        .set(DriverConfiguration.ON_CONTEXT_ACTIVE, JobDriver.ActiveContextHandler.class)
111        .set(DriverConfiguration.ON_CONTEXT_CLOSED, JobDriver.ClosedContextHandler.class)
112        .set(DriverConfiguration.ON_CONTEXT_FAILED, JobDriver.FailedContextHandler.class)
113        .set(DriverConfiguration.ON_CONTEXT_MESSAGE, JobDriver.ContextMessageHandler.class)
114        .set(DriverConfiguration.ON_TASK_MESSAGE, JobDriver.TaskMessageHandler.class)
115        .set(DriverConfiguration.ON_TASK_FAILED, JobDriver.FailedTaskHandler.class)
116        .set(DriverConfiguration.ON_TASK_RUNNING, JobDriver.RunningTaskHandler.class)
117        .set(DriverConfiguration.ON_TASK_COMPLETED, JobDriver.CompletedTaskHandler.class)
118        .set(DriverConfiguration.ON_DRIVER_STARTED, JobDriver.StartHandler.class)
119        .set(DriverConfiguration.ON_TASK_SUSPENDED, JobDriver.SuspendedTaskHandler.class)
120        .set(DriverConfiguration.ON_EVALUATOR_COMPLETED, JobDriver.CompletedEvaluatorHandler.class);
121  }
122
123  private static Configuration getNameServerConfiguration() {
124    return NameServerConfiguration.CONF
125        .set(NameServerConfiguration.NAME_SERVICE_PORT, 0)
126        .build();
127  }
128
129  /**
130   * @return the driver-side configuration to be merged into the DriverConfiguration to enable the HTTP server.
131   */
132  public static Configuration getHTTPConfiguration() {
133    final Configuration httpHandlerConfiguration = HttpHandlerConfiguration.CONF
134        .set(HttpHandlerConfiguration.HTTP_HANDLERS, HttpServerReefEventHandler.class)
135        .build();
136
137    final Configuration driverConfigurationForHttpServer = DriverServiceConfiguration.CONF
138        .set(DriverServiceConfiguration.ON_EVALUATOR_ALLOCATED,
139            ReefEventStateManager.AllocatedEvaluatorStateHandler.class)
140        .set(DriverServiceConfiguration.ON_CONTEXT_ACTIVE, ReefEventStateManager.ActiveContextStateHandler.class)
141        .set(DriverServiceConfiguration.ON_TASK_RUNNING, ReefEventStateManager.TaskRunningStateHandler.class)
142        .set(DriverServiceConfiguration.ON_DRIVER_STARTED, ReefEventStateManager.StartStateHandler.class)
143        .set(DriverServiceConfiguration.ON_DRIVER_STOP, ReefEventStateManager.StopStateHandler.class)
144        .build();
145
146    return Configurations.merge(httpHandlerConfiguration, driverConfigurationForHttpServer);
147  }
148
149  public static Configuration getYarnConfiguration() {
150    final Configuration yarnDriverRestartConfiguration = YarnDriverRestartConfiguration.CONF
151        .build();
152
153    final Configuration driverRestartHandlerConfigurations = DriverRestartConfiguration.CONF
154        .set(DriverRestartConfiguration.ON_DRIVER_RESTARTED,
155            ReefEventStateManager.DriverRestartHandler.class)
156        .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
157            ReefEventStateManager.DriverRestartTaskRunningStateHandler.class)
158        .set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE,
159            ReefEventStateManager.DriverRestartActiveContextStateHandler.class)
160        .build();
161
162    return Configurations.merge(yarnDriverRestartConfiguration, driverRestartHandlerConfigurations);
163  }
164
165  public void addCLRFiles(final File folder) throws BindException {
166    try (final LoggingScope ls = this.loggingScopeFactory.getNewLoggingScope("JobClient::addCLRFiles")) {
167      ConfigurationModule result = this.driverConfigModule;
168      final File[] files = folder.listFiles();
169      if (files != null) {
170        for (final File f : files) {
171          if (f.canRead() && f.exists() && f.isFile()) {
172            result = result.set(DriverConfiguration.GLOBAL_FILES, f.getAbsolutePath());
173          }
174        }
175      }
176
177      // set the driver memory, id and job submission directory
178      this.driverConfigModule = result
179          .set(DriverConfiguration.DRIVER_MEMORY, this.driverMemory)
180          .set(DriverConfiguration.DRIVER_IDENTIFIER, this.driverId)
181          .set(DriverConfiguration.DRIVER_JOB_SUBMISSION_DIRECTORY, this.jobSubmissionDirectory);
182
183
184      final Path globalLibFile = Paths.get(NativeInterop.GLOBAL_LIBRARIES_FILENAME);
185      if (!Files.exists(globalLibFile)) {
186        LOG.log(Level.FINE, "Cannot find global classpath file at: {0}, assume there is none.",
187            globalLibFile.toAbsolutePath());
188      } else {
189        String globalLibString = "";
190        try {
191          globalLibString = new String(Files.readAllBytes(globalLibFile), StandardCharsets.UTF_8);
192        } catch (final Exception e) {
193          LOG.log(Level.WARNING, "Cannot read from {0}, global libraries not added  " + globalLibFile.toAbsolutePath());
194        }
195
196        for (final String s : globalLibString.split(",")) {
197          final File f = new File(s);
198          this.driverConfigModule = this.driverConfigModule.set(DriverConfiguration.GLOBAL_LIBRARIES, f.getPath());
199        }
200      }
201
202      this.driverConfiguration = Configurations.merge(this.driverConfigModule.build(), getHTTPConfiguration(),
203          getNameServerConfiguration());
204    }
205  }
206
207  /**
208   * Launch the job driver.
209   *
210   * @throws org.apache.reef.tang.exceptions.BindException configuration error.
211   */
212  public void submit(final File clrFolder, final boolean submitDriver,
213                     final boolean local, final Configuration clientConfig) {
214    try (final LoggingScope ls = this.loggingScopeFactory.driverSubmit(submitDriver)) {
215      if (!local) {
216        this.driverConfiguration = Configurations.merge(this.driverConfiguration, getYarnConfiguration());
217      }
218
219      try {
220        addCLRFiles(clrFolder);
221      } catch (final BindException e) {
222        LOG.log(Level.FINE, "Failed to bind", e);
223      }
224      if (submitDriver) {
225        this.reef.submit(this.driverConfiguration);
226      } else {
227        final File driverConfig = new File(System.getProperty("user.dir") + "/driver.config");
228        try {
229          new AvroConfigurationSerializer().toFile(Configurations.merge(this.driverConfiguration, clientConfig),
230              driverConfig);
231          LOG.log(Level.INFO, "Driver configuration file created at " + driverConfig.getAbsolutePath());
232        } catch (final IOException e) {
233          throw new RuntimeException("Cannot create driver configuration file at " + driverConfig.getAbsolutePath(), e);
234        }
235      }
236    }
237  }
238
239  /**
240   * Set the driver memory.
241   */
242  @SuppressWarnings("checkstyle:hiddenfield")
243  public void setDriverInfo(final String identifier, final int memory, final String jobSubmissionDirectory) {
244    if (identifier == null || identifier.isEmpty()) {
245      throw new RuntimeException("driver id cannot be null or empty");
246    }
247    if (memory <= 0) {
248      throw new RuntimeException("driver memory cannot be negative number: " + memory);
249    }
250    this.driverMemory = memory;
251    this.driverId = identifier;
252    if (jobSubmissionDirectory != null && !jobSubmissionDirectory.equals("empty")) {
253      this.jobSubmissionDirectory = jobSubmissionDirectory;
254    } else {
255      LOG.log(Level.FINE, "No job submission directory provided by CLR user, will use " + this.jobSubmissionDirectory);
256    }
257  }
258
259  /**
260   * Notify the process in waitForCompletion() method that the main process has finished.
261   */
262  private synchronized void stopAndNotify() {
263    this.runningJob = null;
264    this.isBusy = false;
265    this.notify();
266  }
267
268  /**
269   * Wait for the job driver to complete.
270   */
271  public void waitForCompletion(final int waitTime) {
272    LOG.info("Waiting for the Job Driver to complete: " + waitTime);
273    if (waitTime == 0) {
274      close(0);
275      return;
276    } else if (waitTime < 0) {
277      waitTillDone();
278    }
279    final long endTime = System.currentTimeMillis() + waitTime * 1000;
280    close(endTime);
281  }
282
283  public void close(final long endTime) {
284    while (endTime > System.currentTimeMillis()) {
285      try {
286        Thread.sleep(1000);
287      } catch (final InterruptedException e) {
288        LOG.log(Level.SEVERE, "Thread sleep failed");
289      }
290    }
291    LOG.log(Level.INFO, "Done waiting.");
292    this.stopAndNotify();
293    reef.close();
294  }
295
296  private void waitTillDone() {
297    while (this.isBusy) {
298      try {
299        synchronized (this) {
300          this.wait();
301        }
302      } catch (final InterruptedException ex) {
303        LOG.log(Level.WARNING, "Waiting for result interrupted.", ex);
304      }
305    }
306    this.reef.close();
307  }
308
309  /**
310   * Receive notification from the job driver that the job had failed.
311   */
312  final class FailedJobHandler implements EventHandler<FailedJob> {
313    @Override
314    public void onNext(final FailedJob job) {
315      LOG.log(Level.SEVERE, "Failed job: " + job.getId(), job.getMessage());
316      stopAndNotify();
317    }
318  }
319
320  /**
321   * Receive notification from the job driver that the job had completed successfully.
322   */
323  final class CompletedJobHandler implements EventHandler<CompletedJob> {
324    @Override
325    public void onNext(final CompletedJob job) {
326      LOG.log(Level.INFO, "Completed job: {0}", job.getId());
327      stopAndNotify();
328    }
329  }
330
331  /**
332   * Receive notification that there was an exception thrown from the job driver.
333   */
334  final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
335    @Override
336    public void onNext(final FailedRuntime error) {
337      LOG.log(Level.SEVERE, "Error in job driver: " + error, error.getMessage());
338      stopAndNotify();
339    }
340  }
341
342  final class WakeErrorHandler implements EventHandler<Throwable> {
343    @Override
344    public void onNext(final Throwable error) {
345      LOG.log(Level.SEVERE, "Error communicating with job driver, exiting... ", error);
346      stopAndNotify();
347    }
348  }
349}