001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.tests.watcher; 020 021import org.apache.reef.driver.context.ActiveContext; 022import org.apache.reef.driver.context.ContextConfiguration; 023import org.apache.reef.driver.context.FailedContext; 024import org.apache.reef.driver.evaluator.AllocatedEvaluator; 025import org.apache.reef.driver.evaluator.EvaluatorRequest; 026import org.apache.reef.driver.evaluator.EvaluatorRequestor; 027import org.apache.reef.driver.evaluator.FailedEvaluator; 028import org.apache.reef.driver.task.FailedTask; 029import org.apache.reef.driver.task.RunningTask; 030import org.apache.reef.driver.task.SuspendedTask; 031import org.apache.reef.driver.task.TaskConfiguration; 032import org.apache.reef.tang.Configuration; 033import org.apache.reef.tang.Tang; 034import org.apache.reef.tang.annotations.Unit; 035import org.apache.reef.wake.EventHandler; 036import org.apache.reef.wake.time.event.StartTime; 037import org.apache.reef.wake.time.runtime.event.RuntimeStop; 038 039import javax.inject.Inject; 040import java.util.concurrent.atomic.AtomicBoolean; 041 042/** 043 * Driver for WatcherTest. 044 */ 045@Unit 046public final class WatcherTestDriver { 047 048 private static final String ROOT_CONTEXT_ID = "ROOT_CONTEXT"; 049 private static final String FIRST_CONTEXT_ID = "FIRST_CONTEXT"; 050 051 private final EvaluatorRequestor evaluatorRequestor; 052 private final TestEventStream testEventStream; 053 054 /** 055 * The first evaluator will be failed to generate FailedEvaluator. 056 */ 057 private final AtomicBoolean isFirstEvaluator; 058 059 /** 060 * The first task will be suspended to generate SuspendedTask. 061 */ 062 private final AtomicBoolean isFirstTask; 063 064 @Inject 065 private WatcherTestDriver(final EvaluatorRequestor evaluatorRequestor, 066 final TestEventStream testEventStream) { 067 this.evaluatorRequestor = evaluatorRequestor; 068 this.testEventStream = testEventStream; 069 this.isFirstEvaluator = new AtomicBoolean(true); 070 this.isFirstTask = new AtomicBoolean(true); 071 } 072 073 /** 074 * Handler for StartTime. 075 */ 076 public final class DriverStartedHandler implements EventHandler<StartTime> { 077 078 @Override 079 public void onNext(final StartTime value) { 080 evaluatorRequestor.submit(EvaluatorRequest 081 .newBuilder() 082 .setMemory(64) 083 .setNumberOfCores(1) 084 .setNumber(2) 085 .build()); 086 } 087 } 088 089 /** 090 * Handler for AllocatedEvaluator. 091 */ 092 public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> { 093 094 @Override 095 public void onNext(final AllocatedEvaluator allocatedEvaluator) { 096 if (isFirstEvaluator.compareAndSet(true, false)) { 097 allocatedEvaluator.submitContext(getFailedContextConfiguration()); 098 } else { 099 allocatedEvaluator.submitContext(ContextConfiguration.CONF 100 .set(ContextConfiguration.IDENTIFIER, ROOT_CONTEXT_ID) 101 .build()); 102 } 103 } 104 } 105 106 /** 107 * Handler for FailedEvaluator. 108 */ 109 public final class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> { 110 111 @Override 112 public void onNext(final FailedEvaluator failedEvaluator) { 113 // no-op 114 } 115 } 116 117 /** 118 * Handler for ActiveContext. 119 */ 120 public final class ContextActivatedHandler implements EventHandler<ActiveContext> { 121 122 @Override 123 public void onNext(final ActiveContext activeContext) { 124 if (activeContext.getId().equals(ROOT_CONTEXT_ID)) { 125 activeContext.submitContext(ContextConfiguration.CONF 126 .set(ContextConfiguration.IDENTIFIER, FIRST_CONTEXT_ID) 127 .build()); 128 } else if (activeContext.getId().equals(FIRST_CONTEXT_ID)) { 129 activeContext.submitContext(getFailedContextConfiguration()); 130 } 131 } 132 } 133 134 /** 135 * Handler for FailedContext. 136 */ 137 public final class ContextFailedHandler implements EventHandler<FailedContext> { 138 139 @Override 140 public void onNext(final FailedContext failedContext) { 141 failedContext.getParentContext().get().submitTask(getFailedTaskConfiguration()); 142 } 143 } 144 145 /** 146 * Handler for RunningTask. 147 */ 148 public final class TaskRunningHandler implements EventHandler<RunningTask> { 149 150 @Override 151 public void onNext(final RunningTask runningTask) { 152 if (isFirstTask.compareAndSet(true, false)) { 153 runningTask.suspend(); 154 } 155 } 156 } 157 158 /** 159 * Handler for FailedTask. 160 */ 161 public final class TaskFailedHandler implements EventHandler<FailedTask> { 162 163 @Override 164 public void onNext(final FailedTask failedTask) { 165 failedTask.getActiveContext().get().submitTask(getTaskConfiguration(true)); 166 } 167 } 168 169 /** 170 * Handler for SuspendedTask. 171 */ 172 public final class TaskSuspendedHandler implements EventHandler<SuspendedTask> { 173 174 @Override 175 public void onNext(final SuspendedTask value) { 176 value.getActiveContext().submitTask(getTaskConfiguration(false)); 177 } 178 } 179 180 /** 181 * Handler for RuntimeStop. 182 */ 183 public final class RuntimeStopHandler implements EventHandler<RuntimeStop> { 184 185 @Override 186 public void onNext(final RuntimeStop runtimeStop) { 187 testEventStream.validate(); 188 } 189 } 190 191 private Configuration getTaskConfiguration(final boolean isTaskSuspended) { 192 final Configuration taskConf = TaskConfiguration.CONF 193 .set(TaskConfiguration.TASK, WatcherTestTask.class) 194 .set(TaskConfiguration.IDENTIFIER, "TASK") 195 .set(TaskConfiguration.ON_SEND_MESSAGE, WatcherTestTask.class) 196 .set(TaskConfiguration.ON_SUSPEND, WatcherTestTask.TaskSuspendedHandler.class) 197 .build(); 198 199 return Tang.Factory.getTang().newConfigurationBuilder(taskConf) 200 .bindNamedParameter(IsTaskSuspended.class, String.valueOf(isTaskSuspended)) 201 .build(); 202 } 203 204 private Configuration getFailedTaskConfiguration() { 205 return TaskConfiguration.CONF 206 .set(TaskConfiguration.TASK, WatcherTestTask.class) 207 .set(TaskConfiguration.IDENTIFIER, "FAILED_TASK") 208 .set(TaskConfiguration.ON_TASK_STARTED, FailedTaskStartHandler.class) 209 .build(); 210 } 211 212 private Configuration getFailedContextConfiguration() { 213 return ContextConfiguration.CONF 214 .set(ContextConfiguration.IDENTIFIER, "FAILED_CONTEXT") 215 .set(ContextConfiguration.ON_CONTEXT_STARTED, FailedContextHandler.class) 216 .build(); 217 } 218}