This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.tests.messaging.driver;
020
021import org.apache.reef.client.*;
022import org.apache.reef.tang.Configuration;
023import org.apache.reef.tang.Tang;
024import org.apache.reef.tang.annotations.Unit;
025import org.apache.reef.tang.exceptions.BindException;
026import org.apache.reef.tang.exceptions.InjectionException;
027import org.apache.reef.util.EnvironmentUtils;
028import org.apache.reef.util.Optional;
029import org.apache.reef.wake.EventHandler;
030
031import javax.inject.Inject;
032import java.util.logging.Level;
033import java.util.logging.Logger;
034
035@Unit
036public final class DriverMessaging {
037
038  private static final Logger LOG = Logger.getLogger(DriverMessaging.class.getName());
039
040  private final REEF reef;
041
042  private String lastMessage = null;
043  private Optional<RunningJob> theJob = Optional.empty();
044  private LauncherStatus status = LauncherStatus.INIT;
045
046  @Inject
047  private DriverMessaging(final REEF reef) {
048    this.reef = reef;
049  }
050
051  public static LauncherStatus run(final Configuration runtimeConfiguration,
052                                   final int launcherTimeout) throws BindException, InjectionException {
053
054    final Configuration clientConfiguration = ClientConfiguration.CONF
055        .set(ClientConfiguration.ON_JOB_RUNNING, DriverMessaging.RunningJobHandler.class)
056        .set(ClientConfiguration.ON_JOB_MESSAGE, DriverMessaging.JobMessageHandler.class)
057        .set(ClientConfiguration.ON_JOB_COMPLETED, DriverMessaging.CompletedJobHandler.class)
058        .set(ClientConfiguration.ON_JOB_FAILED, DriverMessaging.FailedJobHandler.class)
059        .set(ClientConfiguration.ON_RUNTIME_ERROR, DriverMessaging.RuntimeErrorHandler.class)
060        .build();
061
062    return Tang.Factory.getTang()
063        .newInjector(runtimeConfiguration, clientConfiguration)
064        .getInstance(DriverMessaging.class).run(launcherTimeout, 1000);
065  }
066
067  public synchronized void close() {
068    if (this.status.isRunning()) {
069      this.status = LauncherStatus.FORCE_CLOSED;
070    }
071    if (this.theJob.isPresent()) {
072      this.theJob.get().close();
073    }
074    this.notify();
075  }
076
077  private LauncherStatus run(final long jobTimeout, final long statusTimeout) {
078
079    final long startTime = System.currentTimeMillis();
080    LOG.log(Level.INFO, "Submitting REEF Job");
081
082    final Configuration driverConfig = DriverConfiguration.CONF
083        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(this.getClass()))
084        .set(DriverConfiguration.DRIVER_IDENTIFIER, "DriverMessagingTest")
085        .set(DriverConfiguration.ON_DRIVER_STARTED, DriverMessagingDriver.StartHandler.class)
086        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DriverMessagingDriver.AllocatedEvaluatorHandler.class)
087        .set(DriverConfiguration.ON_CLIENT_MESSAGE, DriverMessagingDriver.ClientMessageHandler.class)
088        .build();
089
090
091    this.reef.submit(driverConfig);
092
093    synchronized (this) {
094      while (!this.status.isDone()) {
095        LOG.log(Level.INFO, "Waiting for REEF job to finish.");
096        try {
097          this.wait(statusTimeout);
098        } catch (final InterruptedException ex) {
099          LOG.log(Level.FINER, "Waiting for REEF job interrupted.", ex);
100        }
101        if (System.currentTimeMillis() - startTime >= jobTimeout) {
102          LOG.log(Level.INFO, "Waiting for REEF job timed out after {0} sec.",
103              (System.currentTimeMillis() - startTime) / 1000);
104          break;
105        }
106      }
107    }
108
109    this.reef.close();
110    return this.status;
111  }
112
113  final class JobMessageHandler implements EventHandler<JobMessage> {
114    @Override
115    public void onNext(final JobMessage message) {
116      final String msg = new String(message.get());
117      synchronized (DriverMessaging.this) {
118        if (!msg.equals(DriverMessaging.this.lastMessage)) {
119          LOG.log(Level.SEVERE, "Expected {0} but got {1}",
120              new Object[]{DriverMessaging.this.lastMessage, msg});
121          DriverMessaging.this.status = LauncherStatus.FAILED;
122          DriverMessaging.this.notify();
123        }
124      }
125    }
126  }
127
128  final class RunningJobHandler implements EventHandler<RunningJob> {
129    @Override
130    public void onNext(final RunningJob job) {
131      LOG.log(Level.INFO, "The Job {0} is running", job.getId());
132      synchronized (DriverMessaging.this) {
133        DriverMessaging.this.status = LauncherStatus.RUNNING;
134        DriverMessaging.this.theJob = Optional.of(job);
135        DriverMessaging.this.lastMessage = "Hello, REEF!";
136        DriverMessaging.this.theJob.get().send(DriverMessaging.this.lastMessage.getBytes());
137      }
138    }
139  }
140
141  final class CompletedJobHandler implements EventHandler<CompletedJob> {
142    @Override
143    public void onNext(final CompletedJob job) {
144      LOG.log(Level.INFO, "Job Completed: {0}", job);
145      synchronized (DriverMessaging.this) {
146        DriverMessaging.this.status = LauncherStatus.COMPLETED;
147        DriverMessaging.this.notify();
148      }
149    }
150  }
151
152  final class FailedJobHandler implements EventHandler<FailedJob> {
153    @Override
154    public void onNext(final FailedJob job) {
155      LOG.log(Level.SEVERE, "Received an error for job " + job.getId(), job.getReason().orElse(null));
156      synchronized (DriverMessaging.this) {
157        DriverMessaging.this.status = LauncherStatus.FAILED(job.getReason());
158        DriverMessaging.this.notify();
159      }
160    }
161  }
162
163  final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
164    @Override
165    public void onNext(final FailedRuntime error) {
166      LOG.log(Level.SEVERE, "Received a runtime error: " + error, error.getReason().orElse(null));
167      synchronized (DriverMessaging.this) {
168        DriverMessaging.this.status = LauncherStatus.FAILED(error.getReason());
169        DriverMessaging.this.notify();
170      }
171    }
172  }
173}