001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.data.output; 020 021import org.apache.reef.driver.context.ServiceConfiguration; 022import org.apache.reef.evaluator.context.events.ContextStop; 023import org.apache.reef.tang.Configuration; 024import org.apache.reef.tang.Tang; 025import org.apache.reef.tang.annotations.Name; 026import org.apache.reef.tang.annotations.NamedParameter; 027import org.apache.reef.tang.annotations.Parameter; 028import org.apache.reef.tang.annotations.Unit; 029import org.apache.reef.task.events.TaskStart; 030import org.apache.reef.wake.EventHandler; 031 032import javax.inject.Inject; 033import java.io.IOException; 034import java.util.logging.Level; 035import java.util.logging.Logger; 036 037/** 038 * A service class of the task output service. 039 * The file output service provides an output stream, 040 * through which tasks write their output to a file 041 * without considering the current runtime 042 * and collision with other tasks. 043 */ 044@Unit 045public final class TaskOutputService implements OutputService { 046 private static final Logger LOG = Logger.getLogger(TaskOutputService.class.getName()); 047 048 /** 049 * Output stream provider object through which tasks create output streams. 050 */ 051 private final TaskOutputStreamProvider taskOutputStreamProvider; 052 053 /** 054 * Path of the directory where output files are created. 055 */ 056 private final String outputPath; 057 058 /** 059 * Service constructor - instantiated via TANG. 060 * 061 * @param taskOutputStreamProvider Output stream provider object through which tasks create file output streams. 062 * @param outputPath Path of the directory where output files are created. 063 */ 064 @Inject 065 private TaskOutputService( 066 final TaskOutputStreamProvider taskOutputStreamProvider, 067 @Parameter(OutputPath.class) final String outputPath) { 068 this.taskOutputStreamProvider = taskOutputStreamProvider; 069 this.outputPath = outputPath; 070 } 071 072 /** 073 * Provides a service configuration for the output service. 074 * 075 * @return service configuration. 076 */ 077 @Override 078 public Configuration getServiceConfiguration() { 079 080 final Configuration partialServiceConf = ServiceConfiguration.CONF 081 .set(ServiceConfiguration.SERVICES, taskOutputStreamProvider.getClass()) 082 .set(ServiceConfiguration.ON_CONTEXT_STOP, ContextStopHandler.class) 083 .set(ServiceConfiguration.ON_TASK_STARTED, TaskStartHandler.class) 084 .build(); 085 086 return Tang.Factory.getTang() 087 .newConfigurationBuilder(partialServiceConf) 088 .bindImplementation(OutputStreamProvider.class, taskOutputStreamProvider.getClass()) 089 .bindImplementation(TaskOutputStreamProvider.class, taskOutputStreamProvider.getClass()) 090 .bindNamedParameter(OutputPath.class, outputPath) 091 .build(); 092 } 093 094 /** 095 * Handles the ContextStop event: Close the output stream provider. 096 */ 097 private final class ContextStopHandler implements EventHandler<ContextStop> { 098 @Override 099 public void onNext(final ContextStop contextStop) { 100 LOG.log(Level.INFO, "Context stopped, close the OutputStreamProvider."); 101 try { 102 taskOutputStreamProvider.close(); 103 } catch (final IOException e) { 104 throw new RuntimeException(e); 105 } 106 } 107 } 108 109 /** 110 * Handles the TaskStart event: Set the task id to the output stream provider. 111 */ 112 private final class TaskStartHandler implements EventHandler<TaskStart> { 113 @Override 114 public void onNext(final TaskStart taskStart) { 115 LOG.log(Level.INFO, String.format("Task %s started, create the OutputStreamProvider.", taskStart.getId())); 116 taskOutputStreamProvider.setTaskId(taskStart.getId()); 117 } 118 } 119 120 /** 121 * Path of the directory where output files are created. 122 */ 123 @NamedParameter(doc = "Path of the directory where output files are created") 124 public static final class OutputPath implements Name<String> { 125 } 126}