001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.vortex.common; 020 021import org.apache.reef.annotations.Unstable; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.io.serialization.Codec; 024import org.apache.reef.vortex.api.VortexFunction; 025 026/** 027 * Request to execute a tasklet. 028 */ 029@Unstable 030@Private 031public final class TaskletExecutionRequest<TInput, TOutput> implements VortexRequest { 032 private final int taskletId; 033 private final VortexFunction<TInput, TOutput> userFunction; 034 private final TInput input; 035 036 /** 037 * @return the type of this VortexRequest. 038 */ 039 @Override 040 public RequestType getType() { 041 return RequestType.ExecuteTasklet; 042 } 043 044 /** 045 * Request from Vortex Master to Vortex Worker to execute a tasklet. 046 */ 047 public TaskletExecutionRequest(final int taskletId, 048 final VortexFunction<TInput, TOutput> userFunction, 049 final TInput input) { 050 this.taskletId = taskletId; 051 this.userFunction = userFunction; 052 this.input = input; 053 } 054 055 /** 056 * Execute the function using the input. 057 * @return Output of the function in a serialized form. 058 */ 059 public byte[] execute() throws Exception { 060 final TOutput output = userFunction.call(input); 061 final Codec<TOutput> codec = userFunction.getOutputCodec(); 062 // TODO[REEF-1113]: Handle serialization failure separately in Vortex 063 return codec.encode(output); 064 } 065 066 /** 067 * @return the ID of the VortexTasklet associated with this VortexRequest. 068 */ 069 public int getTaskletId() { 070 return taskletId; 071 } 072 073 /** 074 * Get function of the tasklet. 075 */ 076 public VortexFunction getFunction() { 077 return userFunction; 078 } 079 080 /** 081 * Get input of the tasklet. 082 */ 083 public TInput getInput() { 084 return input; 085 } 086}