001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.tests.fail.driver; 020 021import org.apache.reef.driver.context.ActiveContext; 022import org.apache.reef.driver.context.ContextConfiguration; 023import org.apache.reef.driver.evaluator.AllocatedEvaluator; 024import org.apache.reef.driver.evaluator.EvaluatorRequest; 025import org.apache.reef.driver.evaluator.EvaluatorRequestor; 026import org.apache.reef.driver.task.RunningTask; 027import org.apache.reef.driver.task.TaskConfiguration; 028import org.apache.reef.driver.task.TaskMessage; 029import org.apache.reef.tang.annotations.Unit; 030import org.apache.reef.tang.exceptions.BindException; 031import org.apache.reef.wake.EventHandler; 032import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; 033import org.apache.reef.wake.time.Clock; 034import org.apache.reef.wake.time.event.Alarm; 035import org.apache.reef.wake.time.event.StartTime; 036 037import javax.inject.Inject; 038import java.util.Arrays; 039import java.util.logging.Level; 040import java.util.logging.Logger; 041 042@Unit 043public final class FailDriverDelayedMsg { 044 045 private static final Logger LOG = Logger.getLogger(FailDriverDelayedMsg.class.getName()); 046 private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>(); 047 private static final byte[] HELLO_STR = CODEC.encode("MESSAGE::HELLO"); 048 049 private final transient EvaluatorRequestor requestor; 050 private final transient Clock clock; 051 private transient RunningTask task = null; 052 053 @Inject 054 public FailDriverDelayedMsg(final EvaluatorRequestor requestor, final Clock clock) { 055 LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.<init>"); 056 this.requestor = requestor; 057 this.clock = clock; 058 } 059 060 public final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> { 061 @Override 062 public void onNext(final AllocatedEvaluator eval) { 063 LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(AllocatedEvaluator): {0}", eval); 064 try { 065 eval.submitContext(ContextConfiguration.CONF 066 .set(ContextConfiguration.IDENTIFIER, "Context_" + eval.getId()) 067 .build()); 068 } catch (final BindException ex) { 069 LOG.log(Level.WARNING, "Context configuration error", ex); 070 throw new RuntimeException(ex); 071 } 072 } 073 } 074 075 public final class ActiveContextHandler implements EventHandler<ActiveContext> { 076 @Override 077 public void onNext(final ActiveContext context) { 078 LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(ActiveContext): {0}", context); 079 try { 080 context.submitTask(TaskConfiguration.CONF 081 .set(TaskConfiguration.IDENTIFIER, "Task_" + context.getId()) 082 .set(TaskConfiguration.TASK, NoopTask.class) 083 .set(TaskConfiguration.ON_MESSAGE, NoopTask.DriverMessageHandler.class) 084 .set(TaskConfiguration.ON_SUSPEND, NoopTask.TaskSuspendHandler.class) 085 .set(TaskConfiguration.ON_TASK_STOP, NoopTask.TaskStopHandler.class) 086 .set(TaskConfiguration.ON_CLOSE, NoopTask.TaskCloseHandler.class) 087 .set(TaskConfiguration.ON_SEND_MESSAGE, NoopTask.class) 088 .build()); 089 } catch (final BindException ex) { 090 LOG.log(Level.WARNING, "Task configuration error", ex); 091 throw new RuntimeException(ex); 092 } 093 } 094 } 095 096 public final class RunningTaskHandler implements EventHandler<RunningTask> { 097 @Override 098 public void onNext(final RunningTask task) { 099 FailDriverDelayedMsg.this.task = task; 100 LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(TaskRuntime): {0}", task); 101 FailDriverDelayedMsg.this.clock.scheduleAlarm(2000, new EventHandler<Alarm>() { 102 @Override 103 public void onNext(final Alarm time) { 104 LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(Alarm): {0}", time); 105 task.send(HELLO_STR); 106 } 107 }); 108 } 109 } 110 111 public final class TaskMessageHandler implements EventHandler<TaskMessage> { 112 @Override 113 public void onNext(final TaskMessage msg) { 114 LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(TaskMessage): {0}", msg); 115 assert (Arrays.equals(HELLO_STR, msg.get())); 116 FailDriverDelayedMsg.this.task.close(); 117 } 118 } 119 120 public final class StartHandler implements EventHandler<StartTime> { 121 @Override 122 public void onNext(final StartTime time) { 123 LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(StartTime): {0}", time); 124 FailDriverDelayedMsg.this.requestor.submit(EvaluatorRequest.newBuilder() 125 .setNumber(1).setMemory(128).setNumberOfCores(1).build()); 126 } 127 } 128}