001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.examples.data.output; 020 021import org.apache.reef.driver.context.ActiveContext; 022import org.apache.reef.driver.context.ContextConfiguration; 023import org.apache.reef.driver.evaluator.AllocatedEvaluator; 024import org.apache.reef.driver.evaluator.EvaluatorRequestor; 025import org.apache.reef.driver.task.TaskConfiguration; 026import org.apache.reef.io.data.output.OutputService; 027import org.apache.reef.tang.Configuration; 028import org.apache.reef.tang.annotations.Unit; 029import org.apache.reef.wake.EventHandler; 030import org.apache.reef.wake.time.event.StartTime; 031 032import javax.inject.Inject; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.logging.Level; 035import java.util.logging.Logger; 036 037/** 038 * The Driver code for the output service demo app. 039 */ 040@Unit 041public final class OutputServiceDriver { 042 private static final Logger LOG = Logger.getLogger(OutputServiceDriver.class.getName()); 043 044 /** 045 * Evaluator requestor object used to create new evaluator containers. 046 */ 047 private final EvaluatorRequestor requestor; 048 049 /** 050 * Output service object. 051 */ 052 private final OutputService outputService; 053 054 /** 055 * Sub-id for Tasks. 056 * This object grants different IDs to each task 057 * e.g. Task-0, Task-1, and so on. 058 */ 059 private final AtomicInteger taskId = new AtomicInteger(0); 060 061 /** 062 * Job driver constructor - instantiated via TANG. 063 * 064 * @param requestor evaluator requestor object used to create new evaluator containers. 065 * @param outputService output service object. 066 */ 067 @Inject 068 public OutputServiceDriver(final EvaluatorRequestor requestor, 069 final OutputService outputService) { 070 LOG.log(Level.FINE, "Instantiated 'OutputServiceDriver'"); 071 this.requestor = requestor; 072 this.outputService = outputService; 073 } 074 075 /** 076 * Handles the StartTime event: Request three Evaluators. 077 */ 078 public final class StartHandler implements EventHandler<StartTime> { 079 @Override 080 public void onNext(final StartTime startTime) { 081 OutputServiceDriver.this.requestor.newRequest() 082 .setNumber(3) 083 .setMemory(64) 084 .setNumberOfCores(1) 085 .submit(); 086 LOG.log(Level.INFO, "Requested Evaluator."); 087 } 088 } 089 090 /** 091 * Handles AllocatedEvaluator: Submit the output service and a context for it. 092 */ 093 public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> { 094 @Override 095 public void onNext(final AllocatedEvaluator allocatedEvaluator) { 096 LOG.log(Level.INFO, "Submitting Output Service to AllocatedEvaluator: {0}", allocatedEvaluator); 097 final Configuration contextConfiguration = ContextConfiguration.CONF 098 .set(ContextConfiguration.IDENTIFIER, "OutputServiceContext") 099 .build(); 100 allocatedEvaluator.submitContextAndService( 101 contextConfiguration, outputService.getServiceConfiguration()); 102 } 103 } 104 105 /** 106 * Handles ActiveContext: Submit the output service demo task. 107 */ 108 public final class ActiveContextHandler implements EventHandler<ActiveContext> { 109 @Override 110 public void onNext(final ActiveContext activeContext) { 111 LOG.log(Level.INFO, 112 "Submitting OutputServiceREEF task to AllocatedEvaluator: {0}", 113 activeContext.getEvaluatorDescriptor()); 114 final Configuration taskConfiguration = TaskConfiguration.CONF 115 .set(TaskConfiguration.IDENTIFIER, "Task-" + taskId.getAndIncrement()) 116 .set(TaskConfiguration.TASK, OutputServiceTask.class) 117 .build(); 118 activeContext.submitTask(taskConfiguration); 119 } 120 } 121}