/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.io.network.group.impl.driver;

import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.driver.parameters.EvaluatorDispatcherThreads;
import org.apache.reef.driver.parameters.ServiceEvaluatorFailedHandlers;
import org.apache.reef.driver.parameters.ServiceTaskFailedHandlers;
import org.apache.reef.driver.parameters.TaskRunningHandlers;
import org.apache.reef.driver.task.FailedTask;
import org.apache.reef.driver.task.RunningTask;
import org.apache.reef.io.network.group.api.driver.GroupCommServiceDriver;
import org.apache.reef.io.network.group.impl.config.parameters.TreeTopologyFanOut;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.tang.formats.AvroConfigurationSerializer;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.wake.EventHandler;

import javax.inject.Inject;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * The Group Communication Service.
 *
 * <p>Provides the configuration that registers this service's event handlers,
 * and the handlers themselves, each of which forwards its event to the
 * corresponding stage exposed by the injected {@link GroupCommServiceDriver}.
 *
 * <p>The handlers are inner (non-static) classes of this {@code @Unit} class and
 * read the enclosing instance's driver reference.
 */
@SuppressWarnings("checkstyle:hideutilityclassconstructor")
@Unit
public class GroupCommService {

  private static final Logger LOG = Logger.getLogger(GroupCommService.class.getName());
  private static final ConfigurationSerializer CONF_SER = new AvroConfigurationSerializer();

  private final GroupCommServiceDriver groupCommDriver;

  /**
   * Creates the service around the given driver.
   *
   * @param groupCommDriver driver whose stages receive the events seen by the handlers
   */
  @Inject
  public GroupCommService(final GroupCommServiceDriver groupCommDriver) {
    this.groupCommDriver = groupCommDriver;
  }

  /**
   * Builds the base configuration of the service.
   *
   * <p>Adds {@link RunningTaskHandler} to {@link TaskRunningHandlers},
   * {@link FailedTaskHandler} to {@link ServiceTaskFailedHandlers} and
   * {@link FailedEvaluatorHandler} to {@link ServiceEvaluatorFailedHandlers},
   * and binds {@link EvaluatorDispatcherThreads} to {@code "1"}.
   *
   * @return the built configuration
   */
  public static Configuration getConfiguration() {
    LOG.entering("GroupCommService", "getConfiguration");
    final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder();
    jcb.bindSetEntry(TaskRunningHandlers.class, RunningTaskHandler.class);
    jcb.bindSetEntry(ServiceTaskFailedHandlers.class, FailedTaskHandler.class);
    jcb.bindSetEntry(ServiceEvaluatorFailedHandlers.class, FailedEvaluatorHandler.class);
    jcb.bindNamedParameter(EvaluatorDispatcherThreads.class, "1");
    final Configuration retVal = jcb.build();
    // Serializing the configuration is only useful for the trace record, so skip
    // it entirely unless FINER (the level used by Logger.exiting) is enabled.
    if (LOG.isLoggable(Level.FINER)) {
      LOG.exiting("GroupCommService", "getConfiguration", CONF_SER.toString(retVal));
    }
    return retVal;
  }

  /**
   * Builds the base configuration (see {@link #getConfiguration()}) and
   * additionally binds {@link TreeTopologyFanOut} to the given value.
   *
   * @param fanOut value bound to {@link TreeTopologyFanOut}
   * @return the built configuration
   */
  public static Configuration getConfiguration(final int fanOut) {
    LOG.entering("GroupCommService", "getConfiguration", fanOut);
    final Configuration baseConf = getConfiguration();
    final Configuration retConf = Tang.Factory.getTang().newConfigurationBuilder(baseConf)
        .bindNamedParameter(TreeTopologyFanOut.class, Integer.toString(fanOut))
        .build();
    // Same guard as above: avoid serializing when the trace record would be dropped.
    if (LOG.isLoggable(Level.FINER)) {
      LOG.exiting("GroupCommService", "getConfiguration", CONF_SER.toString(retConf));
    }
    return retConf;
  }

  /**
   * Forwards each {@link FailedEvaluator} event to the driver's failed-evaluator stage.
   */
  public class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {

    @Override
    public void onNext(final FailedEvaluator failedEvaluator) {
      LOG.entering("GroupCommService.FailedEvaluatorHandler", "onNext", failedEvaluator.getId());
      groupCommDriver.getGroupCommFailedEvaluatorStage().onNext(failedEvaluator);
      LOG.exiting("GroupCommService.FailedEvaluatorHandler", "onNext", failedEvaluator.getId());
    }

  }

  /**
   * Forwards each {@link RunningTask} event to the driver's running-task stage.
   */
  public class RunningTaskHandler implements EventHandler<RunningTask> {

    @Override
    public void onNext(final RunningTask runningTask) {
      LOG.entering("GroupCommService.RunningTaskHandler", "onNext", runningTask.getId());
      groupCommDriver.getGroupCommRunningTaskStage().onNext(runningTask);
      LOG.exiting("GroupCommService.RunningTaskHandler", "onNext", runningTask.getId());
    }

  }

  /**
   * Forwards each {@link FailedTask} event to the driver's failed-task stage.
   */
  public class FailedTaskHandler implements EventHandler<FailedTask> {

    @Override
    public void onNext(final FailedTask failedTask) {
      LOG.entering("GroupCommService.FailedTaskHandler", "onNext", failedTask.getId());
      groupCommDriver.getGroupCommFailedTaskStage().onNext(failedTask);
      LOG.exiting("GroupCommService.FailedTaskHandler", "onNext", failedTask.getId());
    }

  }

}