001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.driver;
020
021import org.apache.reef.driver.evaluator.FailedEvaluator;
022import org.apache.reef.driver.parameters.DriverIdentifier;
023import org.apache.reef.driver.task.FailedTask;
024import org.apache.reef.driver.task.RunningTask;
025import org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver;
026import org.apache.reef.io.network.group.api.driver.Topology;
027import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
028import org.apache.reef.io.network.group.impl.config.parameters.*;
029import org.apache.reef.io.network.group.impl.utils.BroadcastingEventHandler;
030import org.apache.reef.tang.Injector;
031import org.apache.reef.tang.Tang;
032import org.apache.reef.tang.annotations.Name;
033import org.apache.reef.tang.annotations.Parameter;
034import org.apache.reef.tang.exceptions.InjectionException;
035import org.apache.reef.wake.EStage;
036
037import javax.inject.Inject;
038
039/**
040 * A factory used to create CommunicationGroupDriver instance.
041 * Uses Tang to instantiate new object.
042 */
043public final class CommunicationGroupDriverFactory {
044
045  private final Injector injector;
046
047  @Inject
048  private CommunicationGroupDriverFactory(
049      @Parameter(DriverIdentifier.class) final String driverId,
050      @Parameter(GroupCommSenderStage.class) final EStage<GroupCommunicationMessage> senderStage,
051      @Parameter(GroupCommRunningTaskHandler.class)
052          final BroadcastingEventHandler<RunningTask> groupCommRunningTaskHandler,
053      @Parameter(GroupCommFailedTaskHandler.class)
054          final BroadcastingEventHandler<FailedTask> groupCommFailedTaskHandler,
055      @Parameter(GroupCommFailedEvalHandler.class)
056          final BroadcastingEventHandler<FailedEvaluator> groupCommFailedEvaluatorHandler,
057      final GroupCommMessageHandler groupCommMessageHandler) {
058    injector = Tang.Factory.getTang().newInjector();
059    injector.bindVolatileParameter(GroupCommSenderStage.class, senderStage);
060    injector.bindVolatileParameter(DriverIdentifier.class, driverId);
061    injector.bindVolatileParameter(GroupCommRunningTaskHandler.class, groupCommRunningTaskHandler);
062    injector.bindVolatileParameter(GroupCommFailedTaskHandler.class, groupCommFailedTaskHandler);
063    injector.bindVolatileParameter(GroupCommFailedEvalHandler.class, groupCommFailedEvaluatorHandler);
064    injector.bindVolatileInstance(GroupCommMessageHandler.class, groupCommMessageHandler);
065  }
066
067  /**
068   * Instantiates a new CommunicationGroupDriver instance.
069   * @param groupName specified name of the communication group
070   * @param topologyClass topology implementation
071   * @param numberOfTasks minimum number of tasks needed in this group before start
072   * @param customFanOut fanOut for TreeTopology
073   * @return CommunicationGroupDriver instance
074   * @throws InjectionException
075   */
076  public CommunicationGroupDriver getNewInstance(
077      final Class<? extends Name<String>> groupName,
078      final Class<? extends Topology> topologyClass,
079      final int numberOfTasks,
080      final int customFanOut) throws InjectionException {
081
082    final Injector newInjector = injector.forkInjector();
083    newInjector.bindVolatileParameter(CommGroupNameClass.class, groupName);
084    newInjector.bindVolatileParameter(TopologyClass.class, topologyClass);
085    newInjector.bindVolatileParameter(CommGroupNumTask.class, numberOfTasks);
086    newInjector.bindVolatileParameter(TreeTopologyFanOut.class, customFanOut);
087    return newInjector.getInstance(CommunicationGroupDriver.class);
088  }
089}