This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.data.output;
020
021import org.apache.reef.driver.context.ActiveContext;
022import org.apache.reef.driver.context.ContextConfiguration;
023import org.apache.reef.driver.evaluator.AllocatedEvaluator;
024import org.apache.reef.driver.evaluator.EvaluatorRequestor;
025import org.apache.reef.driver.task.TaskConfiguration;
026import org.apache.reef.io.data.output.OutputService;
027import org.apache.reef.tang.Configuration;
028import org.apache.reef.tang.annotations.Unit;
029import org.apache.reef.wake.EventHandler;
030import org.apache.reef.wake.time.event.StartTime;
031
032import javax.inject.Inject;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.logging.Level;
035import java.util.logging.Logger;
036
037/**
038 * The Driver code for the output service demo app.
039 */
040@Unit
041public final class OutputServiceDriver {
042  private static final Logger LOG = Logger.getLogger(OutputServiceDriver.class.getName());
043
044  /**
045   * Evaluator requestor object used to create new evaluator containers.
046   */
047  private final EvaluatorRequestor requestor;
048
049  /**
050   * Output service object.
051   */
052  private final OutputService outputService;
053
054  /**
055   * Sub-id for Tasks.
056   * This object grants different IDs to each task
057   * e.g. Task-0, Task-1, and so on.
058   */
059  private final AtomicInteger taskId = new AtomicInteger(0);
060
061  /**
062   * Job driver constructor - instantiated via TANG.
063   *
064   * @param requestor evaluator requestor object used to create new evaluator containers.
065   * @param outputService output service object.
066   */
067  @Inject
068  public OutputServiceDriver(final EvaluatorRequestor requestor,
069                             final OutputService outputService) {
070    LOG.log(Level.FINE, "Instantiated 'OutputServiceDriver'");
071    this.requestor = requestor;
072    this.outputService = outputService;
073  }
074
075  /**
076   * Handles the StartTime event: Request three Evaluators.
077   */
078  public final class StartHandler implements EventHandler<StartTime> {
079    @Override
080    public void onNext(final StartTime startTime) {
081      OutputServiceDriver.this.requestor.newRequest()
082          .setNumber(3)
083          .setMemory(64)
084          .setNumberOfCores(1)
085          .submit();
086      LOG.log(Level.INFO, "Requested Evaluator.");
087    }
088  }
089
090  /**
091   * Handles AllocatedEvaluator: Submit the output service and a context for it.
092   */
093  public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
094    @Override
095    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
096      LOG.log(Level.INFO, "Submitting Output Service to AllocatedEvaluator: {0}", allocatedEvaluator);
097      final Configuration contextConfiguration = ContextConfiguration.CONF
098          .set(ContextConfiguration.IDENTIFIER, "OutputServiceContext")
099          .build();
100      allocatedEvaluator.submitContextAndService(
101          contextConfiguration, outputService.getServiceConfiguration());
102    }
103  }
104
105  /**
106   * Handles ActiveContext: Submit the output service demo task.
107   */
108  public final class ActiveContextHandler implements EventHandler<ActiveContext> {
109    @Override
110    public void onNext(final ActiveContext activeContext) {
111      LOG.log(Level.INFO,
112          "Submitting OutputServiceREEF task to AllocatedEvaluator: {0}",
113          activeContext.getEvaluatorDescriptor());
114      final Configuration taskConfiguration = TaskConfiguration.CONF
115          .set(TaskConfiguration.IDENTIFIER, "Task-" + taskId.getAndIncrement())
116          .set(TaskConfiguration.TASK, OutputServiceTask.class)
117          .build();
118      activeContext.submitTask(taskConfiguration);
119    }
120  }
121}