This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.tests.statepassing;
020
021import org.apache.reef.driver.client.JobMessageObserver;
022import org.apache.reef.driver.context.ActiveContext;
023import org.apache.reef.driver.context.ContextConfiguration;
024import org.apache.reef.driver.context.ServiceConfiguration;
025import org.apache.reef.driver.evaluator.AllocatedEvaluator;
026import org.apache.reef.driver.task.CompletedTask;
027import org.apache.reef.driver.task.TaskConfiguration;
028import org.apache.reef.tang.Configuration;
029import org.apache.reef.tang.annotations.Unit;
030import org.apache.reef.tang.exceptions.BindException;
031import org.apache.reef.wake.EventHandler;
032
033import javax.inject.Inject;
034import java.util.logging.Level;
035import java.util.logging.Logger;
036
037/**
038 * Driver for StatePassingTest.
039 */
040@Unit
041public class StatePassingDriver {
042
043  private static final Logger LOG = Logger.getLogger(StatePassingDriver.class.getName());
044
045  private static final int PASSES = 2;
046  private final JobMessageObserver client;
047  private int pass = 0;
048
049  @Inject
050  public StatePassingDriver(final JobMessageObserver client) {
051    this.client = client;
052  }
053
054  private static boolean allEqual(final byte value, final byte[] bytes) {
055    for (int i = 0; i < bytes.length; ++i) {
056      if (bytes[i] != value) {
057        return false;
058      }
059    }
060    return true;
061  }
062
063  private void nextPass(final ActiveContext activeContext) {
064    try {
065      activeContext.submitTask(TaskConfiguration.CONF
066          .set(TaskConfiguration.IDENTIFIER, "StatePassing-" + pass)
067          .set(TaskConfiguration.TASK, StatePassingTask.class)
068          .build());
069      ++pass;
070    } catch (final BindException e) {
071      throw new RuntimeException(e);
072    }
073  }
074
075  final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
076    @Override
077    public void onNext(final AllocatedEvaluator eb) {
078      try {
079        final Configuration contextConfiguration = ContextConfiguration.CONF
080            .set(ContextConfiguration.IDENTIFIER, "StatePassingContext")
081            .build();
082
083        final Configuration serviceConfiguration = ServiceConfiguration.CONF
084            .set(ServiceConfiguration.SERVICES, Counter.class)
085            .build();
086
087        eb.submitContextAndService(contextConfiguration, serviceConfiguration);
088      } catch (final BindException e) {
089        throw new RuntimeException(e);
090      }
091    }
092  }
093
094  final class ContextActiveHandler implements EventHandler<ActiveContext> {
095    @Override
096    public void onNext(final ActiveContext activeContext) {
097      nextPass(activeContext);
098    }
099  }
100
101  final class TaskCompletedHandler implements EventHandler<CompletedTask> {
102    @Override
103    public void onNext(final CompletedTask completed) {
104      LOG.log(Level.INFO, "Received a completed task: " + completed);
105      final byte[] message = completed.get();
106
107      if (message.length != pass) {
108        final String msg = "Expected message of length " + pass + ", but got message of length " + message.length;
109        final RuntimeException ex = new RuntimeException(msg);
110        throw ex;
111      }
112      if (!allEqual((byte) 1, message)) {
113        final RuntimeException ex = new RuntimeException("Did not get the right message");
114        throw ex;
115      }
116
117      if (pass < PASSES) {
118        LOG.log(Level.INFO, "Submitting the next Task");
119        nextPass(completed.getActiveContext());
120      } else {
121        LOG.log(Level.INFO, "Done");
122        completed.getActiveContext().close();
123      }
124    }
125  }
126}