/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.tests.fail.task;

import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorRequest;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.driver.task.RunningTask;
import org.apache.reef.driver.task.TaskConfiguration;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.formats.ConfigurationModule;
import org.apache.reef.tests.library.exceptions.DriverSideFailure;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.time.event.StartTime;

import javax.inject.Inject;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Driver used by the task failure tests.
 * <p>
 * On start it requests a single evaluator. When the evaluator is allocated, it submits
 * a context together with the task selected by the {@link FailTaskName} parameter.
 * Once the task is reported as running, the driver checks the task ID and, depending on
 * the selected task, sends it a message, suspends it, or closes it.
 */
@Unit
public final class Driver {

  private static final Logger LOG = Logger.getLogger(Driver.class.getName());

  /** Name of the task to submit, as injected via {@link FailTaskName}. */
  private final transient String failTaskName;

  /** Used to request the evaluator the task is submitted to. */
  private final transient EvaluatorRequestor requestor;

  /** ID of the submitted task; set when the evaluator is allocated. */
  private transient String taskId;

  /**
   * @param failTaskName name of the task to run; must be one of the names
   *                     handled by {@link AllocatedEvaluatorHandler}.
   * @param requestor    used to request the evaluator on driver start.
   */
  @Inject
  public Driver(final @Parameter(FailTaskName.class) String failTaskName,
                final EvaluatorRequestor requestor) {
    this.failTaskName = failTaskName;
    this.requestor = requestor;
  }

  /**
   * Name of the (failing) task class the driver should submit.
   */
  @NamedParameter(doc = "Full name of the (failing) task class", short_name = "task")
  public static final class FailTaskName implements Name<String> {
  }

  /**
   * Builds the context and task configurations for the selected task
   * and submits both to the newly allocated evaluator.
   */
  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {

    /**
     * @param eval the evaluator to submit the context and task to.
     * @throws DriverSideFailure if the task name is not recognized
     *                           or the configuration cannot be built.
     */
    @Override
    public void onNext(final AllocatedEvaluator eval) {

      try {

        // Remember the ID so that RunningTaskHandler can verify it later.
        taskId = failTaskName + "_" + eval.getId();
        LOG.log(Level.INFO, "Submit task: {0}", taskId);

        // The context shares its identifier with the task.
        final Configuration contextConfig =
            ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER, taskId).build();

        ConfigurationModule taskConfig =
            TaskConfiguration.CONF.set(TaskConfiguration.IDENTIFIER, taskId);

        // Bind the task class and, where applicable, the handler(s) implemented by it.
        switch (failTaskName) {
        case "FailTask":
          taskConfig = taskConfig.set(TaskConfiguration.TASK, FailTask.class);
          break;
        case "FailTaskCall":
          taskConfig = taskConfig.set(TaskConfiguration.TASK, FailTaskCall.class);
          break;
        case "FailTaskMsg":
          taskConfig = taskConfig
              .set(TaskConfiguration.TASK, FailTaskMsg.class)
              .set(TaskConfiguration.ON_MESSAGE, FailTaskMsg.class);
          break;
        case "FailTaskSuspend":
          taskConfig = taskConfig
              .set(TaskConfiguration.TASK, FailTaskSuspend.class)
              .set(TaskConfiguration.ON_SUSPEND, FailTaskSuspend.class);
          break;
        case "FailTaskStart":
          taskConfig = taskConfig
              .set(TaskConfiguration.TASK, FailTaskStart.class)
              .set(TaskConfiguration.ON_TASK_STARTED, FailTaskStart.class);
          break;
        case "FailTaskStop":
          taskConfig = taskConfig
              .set(TaskConfiguration.TASK, FailTaskStop.class)
              .set(TaskConfiguration.ON_TASK_STOP, FailTaskStop.class)
              .set(TaskConfiguration.ON_CLOSE, FailTaskStop.CloseEventHandler.class);
          break;
        case "FailTaskClose":
          taskConfig = taskConfig
              .set(TaskConfiguration.TASK, FailTaskClose.class)
              .set(TaskConfiguration.ON_CLOSE, FailTaskClose.class);
          break;
        default:
          // Fail fast with the offending name instead of going on to build and
          // submit a task configuration that has no task class bound.
          throw new DriverSideFailure("Unknown task name: " + failTaskName);
        }

        eval.submitContextAndTask(contextConfig, taskConfig.build());

      } catch (final BindException ex) {
        LOG.log(Level.WARNING, "Configuration error", ex);
        throw new DriverSideFailure("Configuration error", ex);
      }
    }
  }

  /**
   * Verifies the ID of the running task and triggers the task-specific
   * action (message, suspend, or close).
   */
  final class RunningTaskHandler implements EventHandler<RunningTask> {

    /**
     * @param task the task reported as running.
     * @throws DriverSideFailure if the task ID differs from the ID of the submitted task.
     */
    @Override
    public void onNext(final RunningTask task) {

      LOG.log(Level.INFO, "TaskRuntime: {0} expect {1}",
          new Object[]{task.getId(), taskId});

      if (!taskId.equals(task.getId())) {
        throw new DriverSideFailure("Task ID " + task.getId()
            + " not equal expected ID " + taskId);
      }

      switch (failTaskName) {
      case "FailTaskMsg":
        LOG.log(Level.INFO, "TaskRuntime: Send message: {0}", task);
        task.send(new byte[0]);
        break;
      case "FailTaskSuspend":
        LOG.log(Level.INFO, "TaskRuntime: Suspend: {0}", task);
        task.suspend();
        break;
      case "FailTaskStop":
      case "FailTaskClose":
        LOG.log(Level.INFO, "TaskRuntime: Stop/Close: {0}", task);
        task.close();
        break;
      default:
        // The remaining tasks get no driver-side action once they are running.
        break;
      }
    }
  }

  /**
   * Rejects any ActiveContext event by throwing a {@link DriverSideFailure}.
   */
  final class ActiveContextHandler implements EventHandler<ActiveContext> {

    /**
     * @param context the unexpectedly reported context.
     * @throws DriverSideFailure always.
     */
    @Override
    public void onNext(final ActiveContext context) throws DriverSideFailure {
      throw new DriverSideFailure("Unexpected ActiveContext message: " + context.getId());
    }
  }

  /**
   * Requests the single evaluator used by the test when the driver starts.
   */
  final class StartHandler implements EventHandler<StartTime> {

    /**
     * @param time the driver start event; only logged.
     */
    @Override
    public void onNext(final StartTime time) {
      LOG.log(Level.INFO, "StartTime: {0}", time);
      // One evaluator with a memory setting of 128 (presumably megabytes) and one core.
      Driver.this.requestor.submit(EvaluatorRequest.newBuilder()
          .setNumber(1).setMemory(128).setNumberOfCores(1).build());
    }
  }
}