001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.tests.yarn.failure; 020 021import org.apache.reef.driver.context.ContextConfiguration; 022import org.apache.reef.driver.evaluator.AllocatedEvaluator; 023import org.apache.reef.driver.evaluator.EvaluatorRequest; 024import org.apache.reef.driver.evaluator.EvaluatorRequestor; 025import org.apache.reef.driver.evaluator.FailedEvaluator; 026import org.apache.reef.poison.PoisonedConfiguration; 027import org.apache.reef.tang.Tang; 028import org.apache.reef.tang.annotations.Unit; 029import org.apache.reef.wake.EventHandler; 030import org.apache.reef.wake.time.event.StartTime; 031 032import javax.inject.Inject; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.logging.Level; 035import java.util.logging.Logger; 036 037@Unit 038public class FailureDriver { 039 040 private static final int NUM_EVALUATORS = 40; 041 private static final int NUM_FAILURES = 10; 042 private final AtomicInteger toSubmit = new AtomicInteger(NUM_FAILURES); 043 private static final Logger LOG = Logger.getLogger(FailureDriver.class.getName()); 044 private final EvaluatorRequestor requestor; 045 046 @Inject 047 public FailureDriver(final EvaluatorRequestor requestor) { 048 this.requestor = requestor; 049 LOG.info("Driver instantiated"); 050 } 051 052 /** 053 * Handles the StartTime event: Request as single Evaluator. 054 */ 055 final class StartHandler implements EventHandler<StartTime> { 056 @Override 057 public void onNext(final StartTime startTime) { 058 LOG.log(Level.FINE, "Request {0} Evaluators.", NUM_EVALUATORS); 059 FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder() 060 .setNumber(NUM_EVALUATORS) 061 .setMemory(64) 062 .setNumberOfCores(1) 063 .build()); 064 } 065 } 066 067 /** 068 * Handles AllocatedEvaluator: Submit a poisoned context. 069 */ 070 final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> { 071 @Override 072 public void onNext(final AllocatedEvaluator allocatedEvaluator) { 073 final String evalId = allocatedEvaluator.getId(); 074 LOG.log(Level.FINE, "Got allocated evaluator: {0}", evalId); 075 if (toSubmit.getAndDecrement() > 0) { 076 LOG.log(Level.FINE, "Submitting poisoned context. {0} to go.", toSubmit); 077 allocatedEvaluator.submitContext( 078 Tang.Factory.getTang() 079 .newConfigurationBuilder( 080 ContextConfiguration.CONF 081 .set(ContextConfiguration.IDENTIFIER, "Poisoned Context: " + evalId) 082 .build(), 083 PoisonedConfiguration.CONTEXT_CONF 084 .set(PoisonedConfiguration.CRASH_PROBABILITY, "1") 085 .set(PoisonedConfiguration.CRASH_TIMEOUT, "1") 086 .build()) 087 .build()); 088 } else { 089 LOG.log(Level.FINE, "Closing evaluator {0}", evalId); 090 allocatedEvaluator.close(); 091 } 092 } 093 } 094 095 /** 096 * Handles FailedEvaluator: Resubmits the single Evaluator resource request. 097 */ 098 final class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> { 099 @Override 100 public void onNext(final FailedEvaluator failedEvaluator) { 101 LOG.log(Level.FINE, "Got failed evaluator: {0} - re-request", failedEvaluator.getId()); 102 FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder() 103 .setNumber(1) 104 .setMemory(64) 105 .setNumberOfCores(1) 106 .build()); 107 } 108 } 109}