001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.tests.yarn.failure; 020 021import org.apache.reef.driver.context.ContextConfiguration; 022import org.apache.reef.driver.evaluator.AllocatedEvaluator; 023import org.apache.reef.driver.evaluator.EvaluatorRequest; 024import org.apache.reef.driver.evaluator.EvaluatorRequestor; 025import org.apache.reef.driver.evaluator.FailedEvaluator; 026import org.apache.reef.poison.PoisonedConfiguration; 027import org.apache.reef.tang.Tang; 028import org.apache.reef.tang.annotations.Unit; 029import org.apache.reef.wake.EventHandler; 030import org.apache.reef.wake.time.event.StartTime; 031 032import javax.inject.Inject; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.logging.Level; 035import java.util.logging.Logger; 036 037/** 038 * Driver for failure test. 039 */ 040@Unit 041public class FailureDriver { 042 043 private static final int NUM_EVALUATORS = 40; 044 private static final int NUM_FAILURES = 10; 045 private final AtomicInteger toSubmit = new AtomicInteger(NUM_FAILURES); 046 private static final Logger LOG = Logger.getLogger(FailureDriver.class.getName()); 047 private final EvaluatorRequestor requestor; 048 049 @Inject 050 public FailureDriver(final EvaluatorRequestor requestor) { 051 this.requestor = requestor; 052 LOG.info("Driver instantiated"); 053 } 054 055 /** 056 * Handles the StartTime event: Request as single Evaluator. 057 */ 058 final class StartHandler implements EventHandler<StartTime> { 059 @Override 060 public void onNext(final StartTime startTime) { 061 LOG.log(Level.FINE, "Request {0} Evaluators.", NUM_EVALUATORS); 062 FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder() 063 .setNumber(NUM_EVALUATORS) 064 .setMemory(64) 065 .setNumberOfCores(1) 066 .build()); 067 } 068 } 069 070 /** 071 * Handles AllocatedEvaluator: Submit a poisoned context. 072 */ 073 final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> { 074 @Override 075 public void onNext(final AllocatedEvaluator allocatedEvaluator) { 076 final String evalId = allocatedEvaluator.getId(); 077 LOG.log(Level.FINE, "Got allocated evaluator: {0}", evalId); 078 if (toSubmit.getAndDecrement() > 0) { 079 LOG.log(Level.FINE, "Submitting poisoned context. {0} to go.", toSubmit); 080 allocatedEvaluator.submitContext( 081 Tang.Factory.getTang() 082 .newConfigurationBuilder( 083 ContextConfiguration.CONF 084 .set(ContextConfiguration.IDENTIFIER, "Poisoned Context: " + evalId) 085 .build(), 086 PoisonedConfiguration.CONTEXT_CONF 087 .set(PoisonedConfiguration.CRASH_PROBABILITY, "1") 088 .set(PoisonedConfiguration.CRASH_TIMEOUT, "1") 089 .build()) 090 .build()); 091 } else { 092 LOG.log(Level.FINE, "Closing evaluator {0}", evalId); 093 allocatedEvaluator.close(); 094 } 095 } 096 } 097 098 /** 099 * Handles FailedEvaluator: Resubmits the single Evaluator resource request. 100 */ 101 final class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> { 102 @Override 103 public void onNext(final FailedEvaluator failedEvaluator) { 104 LOG.log(Level.FINE, "Got failed evaluator: {0} - re-request", failedEvaluator.getId()); 105 FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder() 106 .setNumber(1) 107 .setMemory(64) 108 .setNumberOfCores(1) 109 .build()); 110 } 111 } 112}