001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.network.group.impl.driver; 020 021import org.apache.reef.driver.evaluator.FailedEvaluator; 022import org.apache.reef.driver.parameters.DriverIdentifier; 023import org.apache.reef.driver.task.FailedTask; 024import org.apache.reef.driver.task.RunningTask; 025import org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver; 026import org.apache.reef.io.network.group.api.driver.Topology; 027import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; 028import org.apache.reef.io.network.group.impl.config.parameters.*; 029import org.apache.reef.io.network.group.impl.utils.BroadcastingEventHandler; 030import org.apache.reef.tang.Injector; 031import org.apache.reef.tang.Tang; 032import org.apache.reef.tang.annotations.Name; 033import org.apache.reef.tang.annotations.Parameter; 034import org.apache.reef.tang.exceptions.InjectionException; 035import org.apache.reef.wake.EStage; 036 037import javax.inject.Inject; 038 039/** 040 * A factory used to create CommunicationGroupDriver instance. 041 * Uses Tang to instantiate new object. 042 */ 043public final class CommunicationGroupDriverFactory { 044 045 private final Injector injector; 046 047 @Inject 048 private CommunicationGroupDriverFactory( 049 @Parameter(DriverIdentifier.class) final String driverId, 050 @Parameter(GroupCommSenderStage.class) final EStage<GroupCommunicationMessage> senderStage, 051 @Parameter(GroupCommRunningTaskHandler.class) 052 final BroadcastingEventHandler<RunningTask> groupCommRunningTaskHandler, 053 @Parameter(GroupCommFailedTaskHandler.class) 054 final BroadcastingEventHandler<FailedTask> groupCommFailedTaskHandler, 055 @Parameter(GroupCommFailedEvalHandler.class) 056 final BroadcastingEventHandler<FailedEvaluator> groupCommFailedEvaluatorHandler, 057 final GroupCommMessageHandler groupCommMessageHandler) { 058 injector = Tang.Factory.getTang().newInjector(); 059 injector.bindVolatileParameter(GroupCommSenderStage.class, senderStage); 060 injector.bindVolatileParameter(DriverIdentifier.class, driverId); 061 injector.bindVolatileParameter(GroupCommRunningTaskHandler.class, groupCommRunningTaskHandler); 062 injector.bindVolatileParameter(GroupCommFailedTaskHandler.class, groupCommFailedTaskHandler); 063 injector.bindVolatileParameter(GroupCommFailedEvalHandler.class, groupCommFailedEvaluatorHandler); 064 injector.bindVolatileInstance(GroupCommMessageHandler.class, groupCommMessageHandler); 065 } 066 067 /** 068 * Instantiates a new CommunicationGroupDriver instance. 069 * @param groupName specified name of the communication group 070 * @param topologyClass topology implementation 071 * @param numberOfTasks minimum number of tasks needed in this group before start 072 * @param customFanOut fanOut for TreeTopology 073 * @return CommunicationGroupDriver instance 074 * @throws InjectionException 075 */ 076 public CommunicationGroupDriver getNewInstance( 077 final Class<? extends Name<String>> groupName, 078 final Class<? extends Topology> topologyClass, 079 final int numberOfTasks, 080 final int customFanOut) throws InjectionException { 081 082 final Injector newInjector = injector.forkInjector(); 083 newInjector.bindVolatileParameter(CommGroupNameClass.class, groupName); 084 newInjector.bindVolatileParameter(TopologyClass.class, topologyClass); 085 newInjector.bindVolatileParameter(CommGroupNumTask.class, numberOfTasks); 086 newInjector.bindVolatileParameter(TreeTopologyFanOut.class, customFanOut); 087 return newInjector.getInstance(CommunicationGroupDriver.class); 088 } 089}