001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.tests.messaging.driver;
020
021import org.apache.reef.client.*;
022import org.apache.reef.tang.Configuration;
023import org.apache.reef.tang.Tang;
024import org.apache.reef.tang.annotations.Unit;
025import org.apache.reef.tang.exceptions.BindException;
026import org.apache.reef.tang.exceptions.InjectionException;
027import org.apache.reef.util.EnvironmentUtils;
028import org.apache.reef.util.Optional;
029import org.apache.reef.wake.EventHandler;
030
031import javax.inject.Inject;
032import java.nio.charset.StandardCharsets;
033import java.util.logging.Level;
034import java.util.logging.Logger;
035
036/**
037 * Runner for DriverMessagingTest.
038 */
039@Unit
040public final class DriverMessaging {
041
042  private static final Logger LOG = Logger.getLogger(DriverMessaging.class.getName());
043
044  private final REEF reef;
045
046  private String lastMessage = null;
047  private Optional<RunningJob> theJob = Optional.empty();
048  private LauncherStatus status = LauncherStatus.INIT;
049
050  @Inject
051  private DriverMessaging(final REEF reef) {
052    this.reef = reef;
053  }
054
055  public static LauncherStatus run(final Configuration runtimeConfiguration,
056                                   final int launcherTimeout) throws BindException, InjectionException {
057
058    final Configuration clientConfiguration = ClientConfiguration.CONF
059        .set(ClientConfiguration.ON_JOB_RUNNING, DriverMessaging.RunningJobHandler.class)
060        .set(ClientConfiguration.ON_JOB_MESSAGE, DriverMessaging.JobMessageHandler.class)
061        .set(ClientConfiguration.ON_JOB_COMPLETED, DriverMessaging.CompletedJobHandler.class)
062        .set(ClientConfiguration.ON_JOB_FAILED, DriverMessaging.FailedJobHandler.class)
063        .set(ClientConfiguration.ON_RUNTIME_ERROR, DriverMessaging.RuntimeErrorHandler.class)
064        .build();
065
066    return Tang.Factory.getTang()
067        .newInjector(runtimeConfiguration, clientConfiguration)
068        .getInstance(DriverMessaging.class).run(launcherTimeout, 1000);
069  }
070
071  public synchronized void close() {
072    if (this.status.isRunning()) {
073      this.status = LauncherStatus.FORCE_CLOSED;
074    }
075    if (this.theJob.isPresent()) {
076      this.theJob.get().close();
077    }
078    this.notify();
079  }
080
081  private LauncherStatus run(final long jobTimeout, final long statusTimeout) {
082
083    final long startTime = System.currentTimeMillis();
084    LOG.log(Level.INFO, "Submitting REEF Job");
085
086    final Configuration driverConfig = DriverConfiguration.CONF
087        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(this.getClass()))
088        .set(DriverConfiguration.DRIVER_IDENTIFIER, "DriverMessagingTest")
089        .set(DriverConfiguration.ON_DRIVER_STARTED, DriverMessagingDriver.StartHandler.class)
090        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DriverMessagingDriver.AllocatedEvaluatorHandler.class)
091        .set(DriverConfiguration.ON_CLIENT_MESSAGE, DriverMessagingDriver.ClientMessageHandler.class)
092        .build();
093
094
095    this.reef.submit(driverConfig);
096
097    synchronized (this) {
098      while (!this.status.isDone()) {
099        LOG.log(Level.INFO, "Waiting for REEF job to finish.");
100        try {
101          this.wait(statusTimeout);
102        } catch (final InterruptedException ex) {
103          LOG.log(Level.FINER, "Waiting for REEF job interrupted.", ex);
104        }
105        if (System.currentTimeMillis() - startTime >= jobTimeout) {
106          LOG.log(Level.INFO, "Waiting for REEF job timed out after {0} sec.",
107              (System.currentTimeMillis() - startTime) / 1000);
108          break;
109        }
110      }
111    }
112
113    this.reef.close();
114    synchronized (this) {
115      return this.status;
116    }
117  }
118
119  final class JobMessageHandler implements EventHandler<JobMessage> {
120    @Override
121    public void onNext(final JobMessage message) {
122      final String msg = new String(message.get(), StandardCharsets.UTF_8);
123      synchronized (DriverMessaging.this) {
124        if (!msg.equals(DriverMessaging.this.lastMessage)) {
125          LOG.log(Level.SEVERE, "Expected {0} but got {1}",
126              new Object[]{DriverMessaging.this.lastMessage, msg});
127          DriverMessaging.this.status = LauncherStatus.FAILED;
128          DriverMessaging.this.notify();
129        }
130      }
131    }
132  }
133
134  final class RunningJobHandler implements EventHandler<RunningJob> {
135    @Override
136    public void onNext(final RunningJob job) {
137      LOG.log(Level.INFO, "The Job {0} is running", job.getId());
138      synchronized (DriverMessaging.this) {
139        DriverMessaging.this.status = LauncherStatus.RUNNING;
140        DriverMessaging.this.theJob = Optional.of(job);
141        DriverMessaging.this.lastMessage = "Hello, REEF!";
142        DriverMessaging.this.theJob.get().send(DriverMessaging.this.lastMessage.getBytes(StandardCharsets.UTF_8));
143      }
144    }
145  }
146
147  final class CompletedJobHandler implements EventHandler<CompletedJob> {
148    @Override
149    public void onNext(final CompletedJob job) {
150      LOG.log(Level.INFO, "Job Completed: {0}", job);
151      synchronized (DriverMessaging.this) {
152        DriverMessaging.this.status = LauncherStatus.COMPLETED;
153        DriverMessaging.this.notify();
154      }
155    }
156  }
157
158  final class FailedJobHandler implements EventHandler<FailedJob> {
159    @Override
160    public void onNext(final FailedJob job) {
161      LOG.log(Level.SEVERE, "Received an error for job " + job.getId(), job.getReason().orElse(null));
162      synchronized (DriverMessaging.this) {
163        DriverMessaging.this.status = LauncherStatus.failed(job.getReason());
164        DriverMessaging.this.notify();
165      }
166    }
167  }
168
169  final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
170    @Override
171    public void onNext(final FailedRuntime error) {
172      LOG.log(Level.SEVERE, "Received a runtime error: " + error, error.getReason().orElse(null));
173      synchronized (DriverMessaging.this) {
174        DriverMessaging.this.status = LauncherStatus.failed(error.getReason());
175        DriverMessaging.this.notify();
176      }
177    }
178  }
179}