001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver.task; 020 021import com.google.protobuf.ByteString; 022import org.apache.reef.annotations.audience.DriverSide; 023import org.apache.reef.annotations.audience.Private; 024import org.apache.reef.driver.context.ActiveContext; 025import org.apache.reef.driver.task.RunningTask; 026import org.apache.reef.proto.EvaluatorRuntimeProtocol.ContextControlProto; 027import org.apache.reef.proto.EvaluatorRuntimeProtocol.StopTaskProto; 028import org.apache.reef.proto.EvaluatorRuntimeProtocol.SuspendTaskProto; 029import org.apache.reef.runtime.common.driver.context.EvaluatorContext; 030import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager; 031 032import java.util.logging.Level; 033import java.util.logging.Logger; 034 035/** 036 * Implements the RunningTask client interface. It is mainly a helper class 037 * that will package up various client method calls into protocol buffers and 038 * pass them to its respective EvaluatorManager to deliver to the EvaluatorRuntime. 039 */ 040@Private 041@DriverSide 042public final class RunningTaskImpl implements RunningTask { 043 044 private static final Logger LOG = Logger.getLogger(RunningTask.class.getName()); 045 046 private final EvaluatorManager evaluatorManager; 047 private final EvaluatorContext evaluatorContext; 048 private final String taskId; 049 private final TaskRepresenter taskRepresenter; 050 051 public RunningTaskImpl(final EvaluatorManager evaluatorManager, 052 final String taskId, 053 final EvaluatorContext evaluatorContext, 054 final TaskRepresenter taskRepresenter) { 055 LOG.log(Level.FINEST, "INIT: TaskRuntime id[" + taskId + "] on evaluator id[" + evaluatorManager.getId() + "]"); 056 057 this.evaluatorManager = evaluatorManager; 058 this.evaluatorContext = evaluatorContext; 059 this.taskId = taskId; 060 this.taskRepresenter = taskRepresenter; 061 } 062 063 064 @Override 065 public ActiveContext getActiveContext() { 066 return this.evaluatorContext; 067 } 068 069 @Override 070 public String getId() { 071 return this.taskId; 072 } 073 074 @Override 075 public void send(final byte[] message) { 076 LOG.log(Level.FINEST, "MESSAGE: Task id[" + taskId + "] on evaluator id[" + evaluatorManager.getId() + "]"); 077 078 final ContextControlProto contextControlProto = ContextControlProto.newBuilder() 079 .setTaskMessage(ByteString.copyFrom(message)) 080 .build(); 081 082 this.evaluatorManager.sendContextControlMessage(contextControlProto); 083 } 084 085 @Override 086 public void close() { 087 LOG.log(Level.FINEST, "CLOSE: TaskRuntime id[" + taskId + "] on evaluator id[" + evaluatorManager.getId() + "]"); 088 089 if (this.taskRepresenter.isClosable()) { 090 final ContextControlProto contextControlProto = ContextControlProto.newBuilder() 091 .setStopTask(StopTaskProto.newBuilder().build()) 092 .build(); 093 this.evaluatorManager.sendContextControlMessage(contextControlProto); 094 } else { 095 LOG.log(Level.INFO, "Ignoring call to .close() because the task is no longer RUNNING."); 096 } 097 } 098 099 @Override 100 public void close(final byte[] message) { 101 LOG.log(Level.FINEST, "CLOSE: TaskRuntime id[" + taskId + "] on evaluator id[" + evaluatorManager.getId() + 102 "] with message."); 103 if (this.taskRepresenter.isClosable()) { 104 final ContextControlProto contextControlProto = ContextControlProto.newBuilder() 105 .setStopTask(StopTaskProto.newBuilder().build()) 106 .setTaskMessage(ByteString.copyFrom(message)) 107 .build(); 108 this.evaluatorManager.sendContextControlMessage(contextControlProto); 109 } else { 110 LOG.log(Level.INFO, "Ignoring call to .close(byte[] message) because the task is no longer RUNNING " 111 + "(see REEF-1503 for an example of scenario in which this happens)."); 112 } 113 } 114 115 @Override 116 public void suspend(final byte[] message) { 117 LOG.log(Level.FINEST, "SUSPEND: TaskRuntime id[" + taskId + "] on evaluator id[" + evaluatorManager.getId() + 118 "] with message."); 119 120 final ContextControlProto contextControlProto = ContextControlProto.newBuilder() 121 .setSuspendTask(SuspendTaskProto.newBuilder().build()) 122 .setTaskMessage(ByteString.copyFrom(message)) 123 .build(); 124 this.evaluatorManager.sendContextControlMessage(contextControlProto); 125 } 126 127 @Override 128 public void suspend() { 129 LOG.log(Level.FINEST, "SUSPEND: TaskRuntime id[" + taskId + "] on evaluator id[" + evaluatorManager.getId() + "]"); 130 131 final ContextControlProto contextControlProto = ContextControlProto.newBuilder() 132 .setSuspendTask(SuspendTaskProto.newBuilder().build()) 133 .build(); 134 this.evaluatorManager.sendContextControlMessage(contextControlProto); 135 } 136 137 @Override 138 public String toString() { 139 return "RunningTask{taskId='" + taskId + "'}"; 140 } 141 142 public TaskRepresenter getTaskRepresenter() { 143 return taskRepresenter; 144 } 145}