001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.tests.messaging.driver; 020 021import org.apache.reef.client.*; 022import org.apache.reef.tang.Configuration; 023import org.apache.reef.tang.Tang; 024import org.apache.reef.tang.annotations.Unit; 025import org.apache.reef.tang.exceptions.BindException; 026import org.apache.reef.tang.exceptions.InjectionException; 027import org.apache.reef.util.EnvironmentUtils; 028import org.apache.reef.util.Optional; 029import org.apache.reef.wake.EventHandler; 030 031import javax.inject.Inject; 032import java.nio.charset.StandardCharsets; 033import java.util.logging.Level; 034import java.util.logging.Logger; 035 036/** 037 * Runner for DriverMessagingTest. 038 */ 039@Unit 040public final class DriverMessaging { 041 042 private static final Logger LOG = Logger.getLogger(DriverMessaging.class.getName()); 043 044 private final REEF reef; 045 046 private String lastMessage = null; 047 private Optional<RunningJob> theJob = Optional.empty(); 048 private LauncherStatus status = LauncherStatus.INIT; 049 050 @Inject 051 private DriverMessaging(final REEF reef) { 052 this.reef = reef; 053 } 054 055 public static LauncherStatus run(final Configuration runtimeConfiguration, 056 final int launcherTimeout) throws BindException, InjectionException { 057 058 final Configuration clientConfiguration = ClientConfiguration.CONF 059 .set(ClientConfiguration.ON_JOB_RUNNING, DriverMessaging.RunningJobHandler.class) 060 .set(ClientConfiguration.ON_JOB_MESSAGE, DriverMessaging.JobMessageHandler.class) 061 .set(ClientConfiguration.ON_JOB_COMPLETED, DriverMessaging.CompletedJobHandler.class) 062 .set(ClientConfiguration.ON_JOB_FAILED, DriverMessaging.FailedJobHandler.class) 063 .set(ClientConfiguration.ON_RUNTIME_ERROR, DriverMessaging.RuntimeErrorHandler.class) 064 .build(); 065 066 return Tang.Factory.getTang() 067 .newInjector(runtimeConfiguration, clientConfiguration) 068 .getInstance(DriverMessaging.class).run(launcherTimeout, 1000); 069 } 070 071 public synchronized void close() { 072 if (this.status.isRunning()) { 073 this.status = LauncherStatus.FORCE_CLOSED; 074 } 075 if (this.theJob.isPresent()) { 076 this.theJob.get().close(); 077 } 078 this.notify(); 079 } 080 081 private LauncherStatus run(final long jobTimeout, final long statusTimeout) { 082 083 final long startTime = System.currentTimeMillis(); 084 LOG.log(Level.INFO, "Submitting REEF Job"); 085 086 final Configuration driverConfig = DriverConfiguration.CONF 087 .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(this.getClass())) 088 .set(DriverConfiguration.DRIVER_IDENTIFIER, "DriverMessagingTest") 089 .set(DriverConfiguration.ON_DRIVER_STARTED, DriverMessagingDriver.StartHandler.class) 090 .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DriverMessagingDriver.AllocatedEvaluatorHandler.class) 091 .set(DriverConfiguration.ON_CLIENT_MESSAGE, DriverMessagingDriver.ClientMessageHandler.class) 092 .build(); 093 094 095 this.reef.submit(driverConfig); 096 097 synchronized (this) { 098 while (!this.status.isDone()) { 099 LOG.log(Level.INFO, "Waiting for REEF job to finish."); 100 try { 101 this.wait(statusTimeout); 102 } catch (final InterruptedException ex) { 103 LOG.log(Level.FINER, "Waiting for REEF job interrupted.", ex); 104 } 105 if (System.currentTimeMillis() - startTime >= jobTimeout) { 106 LOG.log(Level.INFO, "Waiting for REEF job timed out after {0} sec.", 107 (System.currentTimeMillis() - startTime) / 1000); 108 break; 109 } 110 } 111 } 112 113 this.reef.close(); 114 synchronized (this) { 115 return this.status; 116 } 117 } 118 119 final class JobMessageHandler implements EventHandler<JobMessage> { 120 @Override 121 public void onNext(final JobMessage message) { 122 final String msg = new String(message.get(), StandardCharsets.UTF_8); 123 synchronized (DriverMessaging.this) { 124 if (!msg.equals(DriverMessaging.this.lastMessage)) { 125 LOG.log(Level.SEVERE, "Expected {0} but got {1}", 126 new Object[]{DriverMessaging.this.lastMessage, msg}); 127 DriverMessaging.this.status = LauncherStatus.FAILED; 128 DriverMessaging.this.notify(); 129 } 130 } 131 } 132 } 133 134 final class RunningJobHandler implements EventHandler<RunningJob> { 135 @Override 136 public void onNext(final RunningJob job) { 137 LOG.log(Level.INFO, "The Job {0} is running", job.getId()); 138 synchronized (DriverMessaging.this) { 139 DriverMessaging.this.status = LauncherStatus.RUNNING; 140 DriverMessaging.this.theJob = Optional.of(job); 141 DriverMessaging.this.lastMessage = "Hello, REEF!"; 142 DriverMessaging.this.theJob.get().send(DriverMessaging.this.lastMessage.getBytes(StandardCharsets.UTF_8)); 143 } 144 } 145 } 146 147 final class CompletedJobHandler implements EventHandler<CompletedJob> { 148 @Override 149 public void onNext(final CompletedJob job) { 150 LOG.log(Level.INFO, "Job Completed: {0}", job); 151 synchronized (DriverMessaging.this) { 152 DriverMessaging.this.status = LauncherStatus.COMPLETED; 153 DriverMessaging.this.notify(); 154 } 155 } 156 } 157 158 final class FailedJobHandler implements EventHandler<FailedJob> { 159 @Override 160 public void onNext(final FailedJob job) { 161 LOG.log(Level.SEVERE, "Received an error for job " + job.getId(), job.getReason().orElse(null)); 162 synchronized (DriverMessaging.this) { 163 DriverMessaging.this.status = LauncherStatus.failed(job.getReason()); 164 DriverMessaging.this.notify(); 165 } 166 } 167 } 168 169 final class RuntimeErrorHandler implements EventHandler<FailedRuntime> { 170 @Override 171 public void onNext(final FailedRuntime error) { 172 LOG.log(Level.SEVERE, "Received a runtime error: " + error, error.getReason().orElse(null)); 173 synchronized (DriverMessaging.this) { 174 DriverMessaging.this.status = LauncherStatus.failed(error.getReason()); 175 DriverMessaging.this.notify(); 176 } 177 } 178 } 179}