001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.tests.messaging.driver; 020 021import org.apache.reef.client.*; 022import org.apache.reef.tang.Configuration; 023import org.apache.reef.tang.Tang; 024import org.apache.reef.tang.annotations.Unit; 025import org.apache.reef.tang.exceptions.BindException; 026import org.apache.reef.tang.exceptions.InjectionException; 027import org.apache.reef.util.EnvironmentUtils; 028import org.apache.reef.util.Optional; 029import org.apache.reef.wake.EventHandler; 030 031import javax.inject.Inject; 032import java.util.logging.Level; 033import java.util.logging.Logger; 034 035@Unit 036public final class DriverMessaging { 037 038 private static final Logger LOG = Logger.getLogger(DriverMessaging.class.getName()); 039 040 private final REEF reef; 041 042 private String lastMessage = null; 043 private Optional<RunningJob> theJob = Optional.empty(); 044 private LauncherStatus status = LauncherStatus.INIT; 045 046 @Inject 047 private DriverMessaging(final REEF reef) { 048 this.reef = reef; 049 } 050 051 public static LauncherStatus run(final Configuration runtimeConfiguration, 052 final int launcherTimeout) throws BindException, InjectionException { 053 054 final Configuration clientConfiguration = ClientConfiguration.CONF 055 .set(ClientConfiguration.ON_JOB_RUNNING, DriverMessaging.RunningJobHandler.class) 056 .set(ClientConfiguration.ON_JOB_MESSAGE, DriverMessaging.JobMessageHandler.class) 057 .set(ClientConfiguration.ON_JOB_COMPLETED, DriverMessaging.CompletedJobHandler.class) 058 .set(ClientConfiguration.ON_JOB_FAILED, DriverMessaging.FailedJobHandler.class) 059 .set(ClientConfiguration.ON_RUNTIME_ERROR, DriverMessaging.RuntimeErrorHandler.class) 060 .build(); 061 062 return Tang.Factory.getTang() 063 .newInjector(runtimeConfiguration, clientConfiguration) 064 .getInstance(DriverMessaging.class).run(launcherTimeout, 1000); 065 } 066 067 public synchronized void close() { 068 if (this.status.isRunning()) { 069 this.status = LauncherStatus.FORCE_CLOSED; 070 } 071 if (this.theJob.isPresent()) { 072 this.theJob.get().close(); 073 } 074 this.notify(); 075 } 076 077 private LauncherStatus run(final long jobTimeout, final long statusTimeout) { 078 079 final long startTime = System.currentTimeMillis(); 080 LOG.log(Level.INFO, "Submitting REEF Job"); 081 082 final Configuration driverConfig = DriverConfiguration.CONF 083 .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(this.getClass())) 084 .set(DriverConfiguration.DRIVER_IDENTIFIER, "DriverMessagingTest") 085 .set(DriverConfiguration.ON_DRIVER_STARTED, DriverMessagingDriver.StartHandler.class) 086 .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DriverMessagingDriver.AllocatedEvaluatorHandler.class) 087 .set(DriverConfiguration.ON_CLIENT_MESSAGE, DriverMessagingDriver.ClientMessageHandler.class) 088 .build(); 089 090 091 this.reef.submit(driverConfig); 092 093 synchronized (this) { 094 while (!this.status.isDone()) { 095 LOG.log(Level.INFO, "Waiting for REEF job to finish."); 096 try { 097 this.wait(statusTimeout); 098 } catch (final InterruptedException ex) { 099 LOG.log(Level.FINER, "Waiting for REEF job interrupted.", ex); 100 } 101 if (System.currentTimeMillis() - startTime >= jobTimeout) { 102 LOG.log(Level.INFO, "Waiting for REEF job timed out after {0} sec.", 103 (System.currentTimeMillis() - startTime) / 1000); 104 break; 105 } 106 } 107 } 108 109 this.reef.close(); 110 return this.status; 111 } 112 113 final class JobMessageHandler implements EventHandler<JobMessage> { 114 @Override 115 public void onNext(final JobMessage message) { 116 final String msg = new String(message.get()); 117 synchronized (DriverMessaging.this) { 118 if (!msg.equals(DriverMessaging.this.lastMessage)) { 119 LOG.log(Level.SEVERE, "Expected {0} but got {1}", 120 new Object[]{DriverMessaging.this.lastMessage, msg}); 121 DriverMessaging.this.status = LauncherStatus.FAILED; 122 DriverMessaging.this.notify(); 123 } 124 } 125 } 126 } 127 128 final class RunningJobHandler implements EventHandler<RunningJob> { 129 @Override 130 public void onNext(final RunningJob job) { 131 LOG.log(Level.INFO, "The Job {0} is running", job.getId()); 132 synchronized (DriverMessaging.this) { 133 DriverMessaging.this.status = LauncherStatus.RUNNING; 134 DriverMessaging.this.theJob = Optional.of(job); 135 DriverMessaging.this.lastMessage = "Hello, REEF!"; 136 DriverMessaging.this.theJob.get().send(DriverMessaging.this.lastMessage.getBytes()); 137 } 138 } 139 } 140 141 final class CompletedJobHandler implements EventHandler<CompletedJob> { 142 @Override 143 public void onNext(final CompletedJob job) { 144 LOG.log(Level.INFO, "Job Completed: {0}", job); 145 synchronized (DriverMessaging.this) { 146 DriverMessaging.this.status = LauncherStatus.COMPLETED; 147 DriverMessaging.this.notify(); 148 } 149 } 150 } 151 152 final class FailedJobHandler implements EventHandler<FailedJob> { 153 @Override 154 public void onNext(final FailedJob job) { 155 LOG.log(Level.SEVERE, "Received an error for job " + job.getId(), job.getReason().orElse(null)); 156 synchronized (DriverMessaging.this) { 157 DriverMessaging.this.status = LauncherStatus.FAILED(job.getReason()); 158 DriverMessaging.this.notify(); 159 } 160 } 161 } 162 163 final class RuntimeErrorHandler implements EventHandler<FailedRuntime> { 164 @Override 165 public void onNext(final FailedRuntime error) { 166 LOG.log(Level.SEVERE, "Received a runtime error: " + error, error.getReason().orElse(null)); 167 synchronized (DriverMessaging.this) { 168 DriverMessaging.this.status = LauncherStatus.FAILED(error.getReason()); 169 DriverMessaging.this.notify(); 170 } 171 } 172 } 173}