001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.tests.evaluator.failure; 020 021import org.apache.commons.lang3.Validate; 022import org.apache.reef.driver.context.ContextConfiguration; 023import org.apache.reef.driver.evaluator.*; 024import org.apache.reef.poison.PoisonedConfiguration; 025import org.apache.reef.tang.Tang; 026import org.apache.reef.tang.annotations.Parameter; 027import org.apache.reef.tang.annotations.Unit; 028import org.apache.reef.tests.evaluator.failure.parameters.NumEvaluatorsToFail; 029import org.apache.reef.tests.evaluator.failure.parameters.NumEvaluatorsToSubmit; 030import org.apache.reef.tests.library.exceptions.DriverSideFailure; 031import org.apache.reef.wake.EventHandler; 032import org.apache.reef.wake.time.event.StartTime; 033import org.apache.reef.wake.time.event.StopTime; 034 035import javax.inject.Inject; 036import java.util.concurrent.atomic.AtomicInteger; 037import java.util.logging.Level; 038import java.util.logging.Logger; 039 040/** 041 * Driver for failure test. 042 */ 043@Unit 044public class FailureDriver { 045 046 private final int numEvaluatorsToSubmit; 047 private final int numEvaluatorsToFail; 048 private final AtomicInteger numEvaluatorsLeftToSubmit; 049 private final AtomicInteger numEvaluatorsLeftToClose; 050 private static final Logger LOG = Logger.getLogger(FailureDriver.class.getName()); 051 private final EvaluatorRequestor requestor; 052 053 @Inject 054 public FailureDriver(@Parameter(NumEvaluatorsToSubmit.class) final int numEvaluatorsToSubmit, 055 @Parameter(NumEvaluatorsToFail.class) final int numEvaluatorsToFail, 056 final EvaluatorRequestor requestor) { 057 Validate.isTrue(numEvaluatorsToSubmit > 0, "The number of Evaluators to submit must be greater than 0."); 058 Validate.inclusiveBetween(1, numEvaluatorsToSubmit, numEvaluatorsToFail, 059 "The number of Evaluators to fail must be between 1 and numEvaluatorsToSubmit, inclusive."); 060 061 this.numEvaluatorsToSubmit = numEvaluatorsToSubmit; 062 this.numEvaluatorsToFail = numEvaluatorsToFail; 063 064 this.numEvaluatorsLeftToSubmit = new AtomicInteger(numEvaluatorsToSubmit); 065 066 // We should close numEvaluatorsToSubmit because all failed Evaluators are eventually resubmitted and closed. 067 this.numEvaluatorsLeftToClose = new AtomicInteger(numEvaluatorsToSubmit); 068 069 this.requestor = requestor; 070 LOG.info("Driver instantiated"); 071 } 072 073 /** 074 * Handles the StartTime event: Request as single Evaluator. 075 */ 076 final class StartHandler implements EventHandler<StartTime> { 077 @Override 078 public void onNext(final StartTime startTime) { 079 LOG.log(Level.FINE, "Request {0} Evaluators.", numEvaluatorsToSubmit); 080 FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder() 081 .setNumber(numEvaluatorsToSubmit) 082 .setMemory(64) 083 .setNumberOfCores(1) 084 .build()); 085 } 086 } 087 088 /** 089 * Handles AllocatedEvaluator: Submit a poisoned context. 090 */ 091 final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> { 092 @Override 093 public void onNext(final AllocatedEvaluator allocatedEvaluator) { 094 final String evalId = allocatedEvaluator.getId(); 095 LOG.log(Level.FINE, "Got allocated evaluator: {0}", evalId); 096 if (numEvaluatorsLeftToSubmit.getAndDecrement() > 0) { 097 LOG.log(Level.FINE, "Submitting poisoned context. {0} to go.", numEvaluatorsLeftToSubmit); 098 allocatedEvaluator.submitContext( 099 Tang.Factory.getTang() 100 .newConfigurationBuilder( 101 ContextConfiguration.CONF 102 .set(ContextConfiguration.IDENTIFIER, "Poisoned Context: " + evalId) 103 .build(), 104 PoisonedConfiguration.CONTEXT_CONF 105 .set(PoisonedConfiguration.CRASH_PROBABILITY, "1") 106 .set(PoisonedConfiguration.CRASH_TIMEOUT, "1") 107 .build()) 108 .build()); 109 } else { 110 LOG.log(Level.FINE, "Closing evaluator {0}", evalId); 111 allocatedEvaluator.close(); 112 FailureDriver.this.numEvaluatorsLeftToClose.decrementAndGet(); 113 } 114 } 115 } 116 117 /** 118 * Handles FailedEvaluator: Resubmits the single Evaluator resource request. 119 */ 120 final class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> { 121 @Override 122 public void onNext(final FailedEvaluator failedEvaluator) { 123 LOG.log(Level.FINE, "Got failed evaluator: {0} - re-request", failedEvaluator.getId()); 124 FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder() 125 .setNumber(1) 126 .setMemory(64) 127 .setNumberOfCores(1) 128 .build()); 129 } 130 } 131 132 /** 133 * Checks whether all failed Evaluators were properly resubmitted and restarted. 134 */ 135 final class StopHandler implements EventHandler<StopTime> { 136 @Override 137 public void onNext(final StopTime stopTime) { 138 final int numEvaluatorsToClose = FailureDriver.this.numEvaluatorsLeftToClose.get(); 139 if (numEvaluatorsToClose != 0){ 140 final String message = "Got RuntimeStop Event. Expected to close " + numEvaluatorsToSubmit + " Evaluators " + 141 "but only " + (numEvaluatorsToSubmit - numEvaluatorsToClose) + " Evaluators were closed."; 142 LOG.log(Level.SEVERE, message); 143 throw new DriverSideFailure(message); 144 } 145 } 146 } 147}