This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.tests.yarn.failure;
020
021import org.apache.reef.driver.context.ContextConfiguration;
022import org.apache.reef.driver.evaluator.AllocatedEvaluator;
023import org.apache.reef.driver.evaluator.EvaluatorRequest;
024import org.apache.reef.driver.evaluator.EvaluatorRequestor;
025import org.apache.reef.driver.evaluator.FailedEvaluator;
026import org.apache.reef.poison.PoisonedConfiguration;
027import org.apache.reef.tang.Tang;
028import org.apache.reef.tang.annotations.Unit;
029import org.apache.reef.wake.EventHandler;
030import org.apache.reef.wake.time.event.StartTime;
031
032import javax.inject.Inject;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.logging.Level;
035import java.util.logging.Logger;
036
037/**
038 * Driver for failure test.
039 */
040@Unit
041public class FailureDriver {
042
043  private static final int NUM_EVALUATORS = 40;
044  private static final int NUM_FAILURES = 10;
045  private final AtomicInteger toSubmit = new AtomicInteger(NUM_FAILURES);
046  private static final Logger LOG = Logger.getLogger(FailureDriver.class.getName());
047  private final EvaluatorRequestor requestor;
048
049  @Inject
050  public FailureDriver(final EvaluatorRequestor requestor) {
051    this.requestor = requestor;
052    LOG.info("Driver instantiated");
053  }
054
055  /**
056   * Handles the StartTime event: Request as single Evaluator.
057   */
058  final class StartHandler implements EventHandler<StartTime> {
059    @Override
060    public void onNext(final StartTime startTime) {
061      LOG.log(Level.FINE, "Request {0} Evaluators.", NUM_EVALUATORS);
062      FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
063          .setNumber(NUM_EVALUATORS)
064          .setMemory(64)
065          .setNumberOfCores(1)
066          .build());
067    }
068  }
069
070  /**
071   * Handles AllocatedEvaluator: Submit a poisoned context.
072   */
073  final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
074    @Override
075    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
076      final String evalId = allocatedEvaluator.getId();
077      LOG.log(Level.FINE, "Got allocated evaluator: {0}", evalId);
078      if (toSubmit.getAndDecrement() > 0) {
079        LOG.log(Level.FINE, "Submitting poisoned context. {0} to go.", toSubmit);
080        allocatedEvaluator.submitContext(
081            Tang.Factory.getTang()
082                .newConfigurationBuilder(
083                    ContextConfiguration.CONF
084                        .set(ContextConfiguration.IDENTIFIER, "Poisoned Context: " + evalId)
085                        .build(),
086                    PoisonedConfiguration.CONTEXT_CONF
087                        .set(PoisonedConfiguration.CRASH_PROBABILITY, "1")
088                        .set(PoisonedConfiguration.CRASH_TIMEOUT, "1")
089                        .build())
090                .build());
091      } else {
092        LOG.log(Level.FINE, "Closing evaluator {0}", evalId);
093        allocatedEvaluator.close();
094      }
095    }
096  }
097
098  /**
099   * Handles FailedEvaluator: Resubmits the single Evaluator resource request.
100   */
101  final class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> {
102    @Override
103    public void onNext(final FailedEvaluator failedEvaluator) {
104      LOG.log(Level.FINE, "Got failed evaluator: {0} - re-request", failedEvaluator.getId());
105      FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
106          .setNumber(1)
107          .setMemory(64)
108          .setNumberOfCores(1)
109          .build());
110    }
111  }
112}