001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.tests.fail.driver; 020 021import org.apache.reef.driver.context.ActiveContext; 022import org.apache.reef.driver.context.ContextConfiguration; 023import org.apache.reef.driver.evaluator.AllocatedEvaluator; 024import org.apache.reef.driver.evaluator.EvaluatorRequest; 025import org.apache.reef.driver.evaluator.EvaluatorRequestor; 026import org.apache.reef.driver.task.RunningTask; 027import org.apache.reef.driver.task.TaskConfiguration; 028import org.apache.reef.driver.task.TaskMessage; 029import org.apache.reef.tang.annotations.Unit; 030import org.apache.reef.tang.exceptions.BindException; 031import org.apache.reef.wake.EventHandler; 032import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; 033import org.apache.reef.wake.time.Clock; 034import org.apache.reef.wake.time.event.Alarm; 035import org.apache.reef.wake.time.event.StartTime; 036 037import javax.inject.Inject; 038import java.util.Arrays; 039import java.util.logging.Level; 040import java.util.logging.Logger; 041 042/** 043 * Driver which fails due to delayed message. 044 */ 045@Unit 046public final class FailDriverDelayedMsg { 047 048 private static final Logger LOG = Logger.getLogger(FailDriverDelayedMsg.class.getName()); 049 private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>(); 050 private static final byte[] HELLO_STR = CODEC.encode("MESSAGE::HELLO"); 051 052 private final transient EvaluatorRequestor requestor; 053 private final transient Clock clock; 054 private transient RunningTask task = null; 055 056 @Inject 057 public FailDriverDelayedMsg(final EvaluatorRequestor requestor, final Clock clock) { 058 LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.<init>"); 059 this.requestor = requestor; 060 this.clock = clock; 061 } 062 063 /** 064 * Handler for AllocatedEvaluator. 065 */ 066 public final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> { 067 @Override 068 public void onNext(final AllocatedEvaluator eval) { 069 LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(AllocatedEvaluator): {0}", eval); 070 try { 071 eval.submitContext(ContextConfiguration.CONF 072 .set(ContextConfiguration.IDENTIFIER, "Context_" + eval.getId()) 073 .build()); 074 } catch (final BindException ex) { 075 LOG.log(Level.WARNING, "Context configuration error", ex); 076 throw new RuntimeException(ex); 077 } 078 } 079 } 080 081 /** 082 * Handler for ActiveContext. 083 */ 084 public final class ActiveContextHandler implements EventHandler<ActiveContext> { 085 @Override 086 public void onNext(final ActiveContext context) { 087 LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(ActiveContext): {0}", context); 088 try { 089 context.submitTask(TaskConfiguration.CONF 090 .set(TaskConfiguration.IDENTIFIER, "Task_" + context.getId()) 091 .set(TaskConfiguration.TASK, NoopTask.class) 092 .set(TaskConfiguration.ON_MESSAGE, NoopTask.DriverMessageHandler.class) 093 .set(TaskConfiguration.ON_SUSPEND, NoopTask.TaskSuspendHandler.class) 094 .set(TaskConfiguration.ON_TASK_STOP, NoopTask.TaskStopHandler.class) 095 .set(TaskConfiguration.ON_CLOSE, NoopTask.TaskCloseHandler.class) 096 .set(TaskConfiguration.ON_SEND_MESSAGE, NoopTask.class) 097 .build()); 098 } catch (final BindException ex) { 099 LOG.log(Level.WARNING, "Task configuration error", ex); 100 throw new RuntimeException(ex); 101 } 102 } 103 } 104 105 /** 106 * Handler for RunningTask. 107 */ 108 public final class RunningTaskHandler implements EventHandler<RunningTask> { 109 @Override 110 @SuppressWarnings("checkstyle:hiddenfield") 111 public void onNext(final RunningTask task) { 112 FailDriverDelayedMsg.this.task = task; 113 LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(TaskRuntime): {0}", task); 114 FailDriverDelayedMsg.this.clock.scheduleAlarm(2000, new EventHandler<Alarm>() { 115 @Override 116 public void onNext(final Alarm time) { 117 LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(Alarm): {0}", time); 118 task.send(HELLO_STR); 119 } 120 }); 121 } 122 } 123 124 /** 125 * Handler for TaskMessage. 126 */ 127 public final class TaskMessageHandler implements EventHandler<TaskMessage> { 128 @Override 129 public void onNext(final TaskMessage msg) { 130 LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(TaskMessage): {0}", msg); 131 assert Arrays.equals(HELLO_STR, msg.get()); 132 FailDriverDelayedMsg.this.task.close(); 133 } 134 } 135 136 /** 137 * Handler for StartTime. 138 */ 139 public final class StartHandler implements EventHandler<StartTime> { 140 @Override 141 public void onNext(final StartTime time) { 142 LOG.log(Level.INFO, "ENTER: FailDriverDelayedMsg.send(StartTime): {0}", time); 143 FailDriverDelayedMsg.this.requestor.submit(EvaluatorRequest.newBuilder() 144 .setNumber(1).setMemory(128).setNumberOfCores(1).build()); 145 } 146 } 147}