This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.tests.evaluator.failure;
020
021import org.apache.commons.lang3.Validate;
022import org.apache.reef.driver.context.ContextConfiguration;
023import org.apache.reef.driver.evaluator.*;
024import org.apache.reef.poison.PoisonedConfiguration;
025import org.apache.reef.tang.Tang;
026import org.apache.reef.tang.annotations.Parameter;
027import org.apache.reef.tang.annotations.Unit;
028import org.apache.reef.tests.evaluator.failure.parameters.NumEvaluatorsToFail;
029import org.apache.reef.tests.evaluator.failure.parameters.NumEvaluatorsToSubmit;
030import org.apache.reef.tests.library.exceptions.DriverSideFailure;
031import org.apache.reef.wake.EventHandler;
032import org.apache.reef.wake.time.event.StartTime;
033import org.apache.reef.wake.time.event.StopTime;
034
035import javax.inject.Inject;
036import java.util.concurrent.atomic.AtomicInteger;
037import java.util.logging.Level;
038import java.util.logging.Logger;
039
040/**
041 * Driver for failure test.
042 */
043@Unit
044public class FailureDriver {
045
046  private final int numEvaluatorsToSubmit;
047  private final int numEvaluatorsToFail;
048  private final AtomicInteger numEvaluatorsLeftToSubmit;
049  private final AtomicInteger numEvaluatorsLeftToClose;
050  private static final Logger LOG = Logger.getLogger(FailureDriver.class.getName());
051  private final EvaluatorRequestor requestor;
052
053  @Inject
054  public FailureDriver(@Parameter(NumEvaluatorsToSubmit.class) final int numEvaluatorsToSubmit,
055                       @Parameter(NumEvaluatorsToFail.class) final int numEvaluatorsToFail,
056                       final EvaluatorRequestor requestor) {
057    Validate.isTrue(numEvaluatorsToSubmit > 0, "The number of Evaluators to submit must be greater than 0.");
058    Validate.inclusiveBetween(1, numEvaluatorsToSubmit, numEvaluatorsToFail,
059        "The number of Evaluators to fail must be between 1 and numEvaluatorsToSubmit, inclusive.");
060
061    this.numEvaluatorsToSubmit = numEvaluatorsToSubmit;
062    this.numEvaluatorsToFail = numEvaluatorsToFail;
063
064    this.numEvaluatorsLeftToSubmit = new AtomicInteger(numEvaluatorsToSubmit);
065
066    // We should close numEvaluatorsToSubmit because all failed Evaluators are eventually resubmitted and closed.
067    this.numEvaluatorsLeftToClose = new AtomicInteger(numEvaluatorsToSubmit);
068
069    this.requestor = requestor;
070    LOG.info("Driver instantiated");
071  }
072
073  /**
074   * Handles the StartTime event: Request as single Evaluator.
075   */
076  final class StartHandler implements EventHandler<StartTime> {
077    @Override
078    public void onNext(final StartTime startTime) {
079      LOG.log(Level.FINE, "Request {0} Evaluators.", numEvaluatorsToSubmit);
080      FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
081          .setNumber(numEvaluatorsToSubmit)
082          .setMemory(64)
083          .setNumberOfCores(1)
084          .build());
085    }
086  }
087
088  /**
089   * Handles AllocatedEvaluator: Submit a poisoned context.
090   */
091  final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
092    @Override
093    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
094      final String evalId = allocatedEvaluator.getId();
095      LOG.log(Level.FINE, "Got allocated evaluator: {0}", evalId);
096      if (numEvaluatorsLeftToSubmit.getAndDecrement() > 0) {
097        LOG.log(Level.FINE, "Submitting poisoned context. {0} to go.", numEvaluatorsLeftToSubmit);
098        allocatedEvaluator.submitContext(
099            Tang.Factory.getTang()
100                .newConfigurationBuilder(
101                    ContextConfiguration.CONF
102                        .set(ContextConfiguration.IDENTIFIER, "Poisoned Context: " + evalId)
103                        .build(),
104                    PoisonedConfiguration.CONTEXT_CONF
105                        .set(PoisonedConfiguration.CRASH_PROBABILITY, "1")
106                        .set(PoisonedConfiguration.CRASH_TIMEOUT, "1")
107                        .build())
108                .build());
109      } else {
110        LOG.log(Level.FINE, "Closing evaluator {0}", evalId);
111        allocatedEvaluator.close();
112        FailureDriver.this.numEvaluatorsLeftToClose.decrementAndGet();
113      }
114    }
115  }
116
117  /**
118   * Handles FailedEvaluator: Resubmits the single Evaluator resource request.
119   */
120  final class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> {
121    @Override
122    public void onNext(final FailedEvaluator failedEvaluator) {
123      LOG.log(Level.FINE, "Got failed evaluator: {0} - re-request", failedEvaluator.getId());
124      FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
125          .setNumber(1)
126          .setMemory(64)
127          .setNumberOfCores(1)
128          .build());
129    }
130  }
131
132  /**
133   * Checks whether all failed Evaluators were properly resubmitted and restarted.
134   */
135  final class StopHandler implements EventHandler<StopTime> {
136    @Override
137    public void onNext(final StopTime stopTime) {
138      final int numEvaluatorsToClose = FailureDriver.this.numEvaluatorsLeftToClose.get();
139      if (numEvaluatorsToClose != 0){
140        final String message = "Got RuntimeStop Event. Expected to close " + numEvaluatorsToSubmit + " Evaluators " +
141            "but only " + (numEvaluatorsToSubmit - numEvaluatorsToClose) + " Evaluators were closed.";
142        LOG.log(Level.SEVERE, message);
143        throw new DriverSideFailure(message);
144      }
145    }
146  }
147}