/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.tests.fail.task;

import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorRequest;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.driver.task.RunningTask;
import org.apache.reef.driver.task.TaskConfiguration;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.formats.ConfigurationModule;
import org.apache.reef.tests.library.exceptions.DriverSideFailure;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.time.event.StartTime;

import javax.inject.Inject;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Universal driver for the test REEF job that fails on different stages of execution.
 * <p>
 * On start the driver requests a single evaluator. When the evaluator is allocated,
 * it submits a context together with the task selected by {@link FailTaskName}.
 * Once that task is reported as running, the driver sends it a message, a suspend
 * request or a close request, depending on the selected task name.
 */
@Unit
public final class Driver {

  private static final Logger LOG = Logger.getLogger(Driver.class.getName());

  /** Value of {@link FailTaskName}; compared against the task names in the handlers' switches. */
  private final transient String failTaskName;

  /** Used by {@link StartHandler} to request the evaluator the task runs on. */
  private final transient EvaluatorRequestor requestor;

  /**
   * ID of the submitted task. Assigned in {@link AllocatedEvaluatorHandler} and
   * compared with the ID of the running task in {@link RunningTaskHandler}.
   * NOTE(review): written by one handler and read by another; if the handlers are
   * invoked on different threads, confirm that the write is visible to the reader.
   */
  private transient String taskId;

  /**
   * Creates the driver.
   *
   * @param failTaskName name of the task to submit, see {@link FailTaskName}.
   * @param requestor    requestor used to ask for the evaluator.
   */
  @Inject
  public Driver(@Parameter(FailTaskName.class) final String failTaskName,
                final EvaluatorRequestor requestor) {
    this.failTaskName = failTaskName;
    this.requestor = requestor;
  }

  /**
   * Name of the task class that selects which (failing) task the driver submits.
   * <p>
   * The handlers of this driver compare the value against the literals
   * {@code "FailTask"}, {@code "FailTaskCall"}, {@code "FailTaskMsg"},
   * {@code "FailTaskSuspend"}, {@code "FailTaskStart"}, {@code "FailTaskStop"}
   * and {@code "FailTaskClose"}.
   * NOTE(review): the parameter doc string says "Full name", while the handlers
   * match unqualified names; confirm which form callers pass.
   */
  @NamedParameter(doc = "Full name of the (failing) task class", short_name = "task")
  public static final class FailTaskName implements Name<String> {
  }

  /**
   * Submits a context and the selected task to the newly allocated evaluator.
   */
  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
    /**
     * Builds the context and task configurations and submits both to the evaluator.
     * The task ID is the task name followed by {@code "_"} and the evaluator ID;
     * the same string is used as the context identifier.
     *
     * @param eval the allocated evaluator to submit the context and task to.
     * @throws DriverSideFailure if building a configuration raises a {@link BindException}.
     */
    @Override
    public void onNext(final AllocatedEvaluator eval) {

      try {

        // Remember the ID so that RunningTaskHandler can verify it later.
        taskId = failTaskName + "_" + eval.getId();
        LOG.log(Level.INFO, "Submit task: {0}", taskId);

        final Configuration contextConfig =
            ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER, taskId).build();

        ConfigurationModule taskConfig =
            TaskConfiguration.CONF.set(TaskConfiguration.IDENTIFIER, taskId);

        // Bind the task class and, where applicable, register the same class
        // as the handler for the event it is paired with.
        switch (failTaskName) {
        case "FailTask":
          taskConfig = taskConfig.set(TaskConfiguration.TASK, FailTask.class);
          break;
        case "FailTaskCall":
          taskConfig = taskConfig.set(TaskConfiguration.TASK, FailTaskCall.class);
          break;
        case "FailTaskMsg":
          taskConfig = taskConfig
              .set(TaskConfiguration.TASK, FailTaskMsg.class)
              .set(TaskConfiguration.ON_MESSAGE, FailTaskMsg.class);
          break;
        case "FailTaskSuspend":
          taskConfig = taskConfig
              .set(TaskConfiguration.TASK, FailTaskSuspend.class)
              .set(TaskConfiguration.ON_SUSPEND, FailTaskSuspend.class);
          break;
        case "FailTaskStart":
          taskConfig = taskConfig
              .set(TaskConfiguration.TASK, FailTaskStart.class)
              .set(TaskConfiguration.ON_TASK_STARTED, FailTaskStart.class);
          break;
        case "FailTaskStop":
          // Unlike the other cases, the close handler is a separate nested class.
          taskConfig = taskConfig
              .set(TaskConfiguration.TASK, FailTaskStop.class)
              .set(TaskConfiguration.ON_TASK_STOP, FailTaskStop.class)
              .set(TaskConfiguration.ON_CLOSE, FailTaskStop.CloseEventHandler.class);
          break;
        case "FailTaskClose":
          taskConfig = taskConfig
              .set(TaskConfiguration.TASK, FailTaskClose.class)
              .set(TaskConfiguration.ON_CLOSE, FailTaskClose.class);
          break;
        default:
          // Unknown name: no task class is bound here.
          // NOTE(review): build() below is presumably expected to reject the
          // incomplete configuration with a BindException; confirm.
          break;
        }

        eval.submitContextAndTask(contextConfig, taskConfig.build());

      } catch (final BindException ex) {
        LOG.log(Level.WARNING, "Configuration error", ex);
        throw new DriverSideFailure("Configuration error", ex);
      }
    }
  }

  /**
   * Verifies the ID of the running task and sends it the request
   * that corresponds to the selected task name.
   */
  final class RunningTaskHandler implements EventHandler<RunningTask> {
    /**
     * Checks that the running task is the one submitted by this driver, then
     * sends it a message, a suspend request or a close request.
     *
     * @param task the task that has been reported as running.
     * @throws DriverSideFailure if the task ID differs from the submitted one.
     */
    @Override
    public void onNext(final RunningTask task) {

      LOG.log(Level.INFO, "TaskRuntime: {0} expect {1}",
          new Object[]{task.getId(), taskId});

      if (!taskId.equals(task.getId())) {
        throw new DriverSideFailure("Task ID " + task.getId()
            + " not equal expected ID " + taskId);
      }

      switch (failTaskName) {
      case "FailTaskMsg":
        LOG.log(Level.INFO, "TaskRuntime: Send message: {0}", task);
        task.send(new byte[0]);
        break;
      case "FailTaskSuspend":
        LOG.log(Level.INFO, "TaskRuntime: Suspend: {0}", task);
        task.suspend();
        break;
      case "FailTaskStop":
      case "FailTaskClose":
        LOG.log(Level.INFO, "TaskRuntime: Stop/Close: {0}", task);
        task.close();
        break;
      default:
        // All other task names: the driver sends nothing to the task.
        break;
      }
    }
  }

  /**
   * Treats any ActiveContext event as an error.
   */
  final class ActiveContextHandler implements EventHandler<ActiveContext> {
    /**
     * Always fails with a message that includes the context ID.
     *
     * @param context the context that was reported as active.
     * @throws DriverSideFailure always.
     */
    @Override
    public void onNext(final ActiveContext context) throws DriverSideFailure {
      throw new DriverSideFailure("Unexpected ActiveContext message: " + context.getId());
    }
  }

  /**
   * Requests the evaluator when the driver starts.
   */
  final class StartHandler implements EventHandler<StartTime> {
    /**
     * Submits a request for one evaluator with one core and a memory setting of 128
     * (presumably megabytes; confirm against EvaluatorRequest).
     *
     * @param time the driver start event; only logged.
     */
    @Override
    public void onNext(final StartTime time) {
      LOG.log(Level.INFO, "StartTime: {0}", time);
      Driver.this.requestor.submit(EvaluatorRequest.newBuilder()
          .setNumber(1).setMemory(128).setNumberOfCores(1).build());
    }
  }
}