This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.tests.statepassing;
020
021import org.apache.reef.driver.client.JobMessageObserver;
022import org.apache.reef.driver.context.ActiveContext;
023import org.apache.reef.driver.context.ContextConfiguration;
024import org.apache.reef.driver.context.ServiceConfiguration;
025import org.apache.reef.driver.evaluator.AllocatedEvaluator;
026import org.apache.reef.driver.task.CompletedTask;
027import org.apache.reef.driver.task.TaskConfiguration;
028import org.apache.reef.tang.Configuration;
029import org.apache.reef.tang.JavaConfigurationBuilder;
030import org.apache.reef.tang.Tang;
031import org.apache.reef.tang.annotations.Unit;
032import org.apache.reef.tang.exceptions.BindException;
033import org.apache.reef.wake.EventHandler;
034
035import javax.inject.Inject;
036import java.util.logging.Level;
037import java.util.logging.Logger;
038
039@Unit
040public class StatePassingDriver {
041
042  private static final Logger LOG = Logger.getLogger(StatePassingDriver.class.getName());
043
044  private static final int PASSES = 2;
045  private final JobMessageObserver client;
046  private int pass = 0;
047
048  @Inject
049  public StatePassingDriver(final JobMessageObserver client) {
050    this.client = client;
051  }
052
053  private static boolean allEqual(final byte value, final byte[] bytes) {
054    for (int i = 0; i < bytes.length; ++i) {
055      if (bytes[i] != value) {
056        return false;
057      }
058    }
059    return true;
060  }
061
062  private void nextPass(final ActiveContext activeContext) {
063    try {
064      activeContext.submitTask(TaskConfiguration.CONF
065          .set(TaskConfiguration.IDENTIFIER, "StatePassing-" + pass)
066          .set(TaskConfiguration.TASK, StatePassingTask.class)
067          .build());
068      ++pass;
069    } catch (final BindException e) {
070      throw new RuntimeException(e);
071    }
072  }
073
074  final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
075    @Override
076    public void onNext(final AllocatedEvaluator eb) {
077      final JavaConfigurationBuilder b = Tang.Factory.getTang().newConfigurationBuilder();
078      try {
079        final Configuration contextConfiguration = ContextConfiguration.CONF
080            .set(ContextConfiguration.IDENTIFIER, "StatePassingContext")
081            .build();
082
083        final Configuration serviceConfiguration = ServiceConfiguration.CONF
084            .set(ServiceConfiguration.SERVICES, Counter.class)
085            .build();
086
087        eb.submitContextAndService(contextConfiguration, serviceConfiguration);
088      } catch (final BindException e) {
089        throw new RuntimeException(e);
090      }
091    }
092  }
093
094  final class ContextActiveHandler implements EventHandler<ActiveContext> {
095    @Override
096    public void onNext(final ActiveContext activeContext) {
097      nextPass(activeContext);
098    }
099  }
100
101  final class TaskCompletedHandler implements EventHandler<CompletedTask> {
102    @Override
103    public void onNext(final CompletedTask completed) {
104      LOG.log(Level.INFO, "Received a completed task: " + completed);
105      final byte[] message = completed.get();
106
107      if (message.length != pass) {
108        final String msg = "Expected message of length " + pass + ", but got message of length " + message.length;
109        final RuntimeException ex = new RuntimeException(msg);
110        throw ex;
111      }
112      if (!allEqual((byte) 1, message)) {
113        final RuntimeException ex = new RuntimeException("Did not get the right message");
114        throw ex;
115      }
116
117      if (pass < PASSES) {
118        LOG.log(Level.INFO, "Submitting the next Task");
119        nextPass(completed.getActiveContext());
120      } else {
121        LOG.log(Level.INFO, "Done");
122        completed.getActiveContext().close();
123      }
124    }
125  }
126}