This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.driver;
020
021import org.apache.reef.driver.evaluator.FailedEvaluator;
022import org.apache.reef.driver.parameters.EvaluatorDispatcherThreads;
023import org.apache.reef.driver.parameters.ServiceEvaluatorFailedHandlers;
024import org.apache.reef.driver.parameters.ServiceTaskFailedHandlers;
025import org.apache.reef.driver.parameters.TaskRunningHandlers;
026import org.apache.reef.driver.task.FailedTask;
027import org.apache.reef.driver.task.RunningTask;
028import org.apache.reef.io.network.group.api.driver.GroupCommServiceDriver;
029import org.apache.reef.io.network.group.impl.config.parameters.TreeTopologyFanOut;
030import org.apache.reef.tang.Configuration;
031import org.apache.reef.tang.JavaConfigurationBuilder;
032import org.apache.reef.tang.Tang;
033import org.apache.reef.tang.annotations.Unit;
034import org.apache.reef.tang.formats.AvroConfigurationSerializer;
035import org.apache.reef.tang.formats.ConfigurationSerializer;
036import org.apache.reef.wake.EventHandler;
037
038import javax.inject.Inject;
039import java.util.logging.Logger;
040
041/**
042 * The Group Communication Service.
043 */
044@SuppressWarnings("checkstyle:hideutilityclassconstructor")
045@Unit
046public class GroupCommService {
047
048  private static final Logger LOG = Logger.getLogger(GroupCommService.class.getName());
049  private static final ConfigurationSerializer CONF_SER = new AvroConfigurationSerializer();
050
051  private final GroupCommServiceDriver groupCommDriver;
052
053  @Inject
054  public GroupCommService(final GroupCommServiceDriver groupCommDriver) {
055    this.groupCommDriver = groupCommDriver;
056  }
057
058  public static Configuration getConfiguration() {
059    LOG.entering("GroupCommService", "getConfiguration");
060    final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder();
061    jcb.bindSetEntry(TaskRunningHandlers.class, RunningTaskHandler.class);
062    jcb.bindSetEntry(ServiceTaskFailedHandlers.class, FailedTaskHandler.class);
063    jcb.bindSetEntry(ServiceEvaluatorFailedHandlers.class, FailedEvaluatorHandler.class);
064    jcb.bindNamedParameter(EvaluatorDispatcherThreads.class, "1");
065    final Configuration retVal = jcb.build();
066    LOG.exiting("GroupCommService", "getConfiguration", CONF_SER.toString(retVal));
067    return retVal;
068  }
069
070  public static Configuration getConfiguration(final int fanOut) {
071    LOG.entering("GroupCommService", "getConfiguration", fanOut);
072    final Configuration baseConf = getConfiguration();
073    final Configuration retConf = Tang.Factory.getTang().newConfigurationBuilder(baseConf)
074        .bindNamedParameter(TreeTopologyFanOut.class, Integer.toString(fanOut)).build();
075    LOG.exiting("GroupCommService", "getConfiguration", CONF_SER.toString(retConf));
076    return retConf;
077  }
078
079  public class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
080
081    @Override
082    public void onNext(final FailedEvaluator failedEvaluator) {
083      LOG.entering("GroupCommService.FailedEvaluatorHandler", "onNext", failedEvaluator.getId());
084      groupCommDriver.getGroupCommFailedEvaluatorStage().onNext(failedEvaluator);
085      LOG.exiting("GroupCommService.FailedEvaluatorHandler", "onNext", failedEvaluator.getId());
086    }
087
088  }
089
090  public class RunningTaskHandler implements EventHandler<RunningTask> {
091
092    @Override
093    public void onNext(final RunningTask runningTask) {
094      LOG.entering("GroupCommService.RunningTaskHandler", "onNext", runningTask.getId());
095      groupCommDriver.getGroupCommRunningTaskStage().onNext(runningTask);
096      LOG.exiting("GroupCommService.RunningTaskHandler", "onNext", runningTask.getId());
097    }
098
099  }
100
101  public class FailedTaskHandler implements EventHandler<FailedTask> {
102
103    @Override
104    public void onNext(final FailedTask failedTask) {
105      LOG.entering("GroupCommService.FailedTaskHandler", "onNext", failedTask.getId());
106      groupCommDriver.getGroupCommFailedTaskStage().onNext(failedTask);
107      LOG.exiting("GroupCommService.FailedTaskHandler", "onNext", failedTask.getId());
108    }
109
110  }
111
112}