This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.tests.yarn.failure;
020
021import org.apache.reef.driver.context.ContextConfiguration;
022import org.apache.reef.driver.evaluator.AllocatedEvaluator;
023import org.apache.reef.driver.evaluator.EvaluatorRequest;
024import org.apache.reef.driver.evaluator.EvaluatorRequestor;
025import org.apache.reef.driver.evaluator.FailedEvaluator;
026import org.apache.reef.poison.PoisonedConfiguration;
027import org.apache.reef.tang.Tang;
028import org.apache.reef.tang.annotations.Unit;
029import org.apache.reef.wake.EventHandler;
030import org.apache.reef.wake.time.event.StartTime;
031
032import javax.inject.Inject;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.logging.Level;
035import java.util.logging.Logger;
036
037@Unit
038public class FailureDriver {
039
040  private static final int NUM_EVALUATORS = 40;
041  private static final int NUM_FAILURES = 10;
042  private final AtomicInteger toSubmit = new AtomicInteger(NUM_FAILURES);
043  private static final Logger LOG = Logger.getLogger(FailureDriver.class.getName());
044  private final EvaluatorRequestor requestor;
045
046  @Inject
047  public FailureDriver(final EvaluatorRequestor requestor) {
048    this.requestor = requestor;
049    LOG.info("Driver instantiated");
050  }
051
052  /**
053   * Handles the StartTime event: Request as single Evaluator.
054   */
055  final class StartHandler implements EventHandler<StartTime> {
056    @Override
057    public void onNext(final StartTime startTime) {
058      LOG.log(Level.FINE, "Request {0} Evaluators.", NUM_EVALUATORS);
059      FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
060          .setNumber(NUM_EVALUATORS)
061          .setMemory(64)
062          .setNumberOfCores(1)
063          .build());
064    }
065  }
066
067  /**
068   * Handles AllocatedEvaluator: Submit a poisoned context.
069   */
070  final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
071    @Override
072    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
073      final String evalId = allocatedEvaluator.getId();
074      LOG.log(Level.FINE, "Got allocated evaluator: {0}", evalId);
075      if (toSubmit.getAndDecrement() > 0) {
076        LOG.log(Level.FINE, "Submitting poisoned context. {0} to go.", toSubmit);
077        allocatedEvaluator.submitContext(
078            Tang.Factory.getTang()
079                .newConfigurationBuilder(
080                    ContextConfiguration.CONF
081                        .set(ContextConfiguration.IDENTIFIER, "Poisoned Context: " + evalId)
082                        .build(),
083                    PoisonedConfiguration.CONTEXT_CONF
084                        .set(PoisonedConfiguration.CRASH_PROBABILITY, "1")
085                        .set(PoisonedConfiguration.CRASH_TIMEOUT, "1")
086                        .build())
087                .build());
088      } else {
089        LOG.log(Level.FINE, "Closing evaluator {0}", evalId);
090        allocatedEvaluator.close();
091      }
092    }
093  }
094
095  /**
096   * Handles FailedEvaluator: Resubmits the single Evaluator resource request.
097   */
098  final class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> {
099    @Override
100    public void onNext(final FailedEvaluator failedEvaluator) {
101      LOG.log(Level.FINE, "Got failed evaluator: {0} - re-request", failedEvaluator.getId());
102      FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
103          .setNumber(1)
104          .setMemory(64)
105          .setNumberOfCores(1)
106          .build());
107    }
108  }
109}