/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.runtime.yarn.driver;

import org.apache.reef.annotations.audience.Private;
import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;

import javax.inject.Inject;

/**
 * Helper that represents the REEF layer to the YARN runtime.
 */
// This is a great place to add a thread boundary, should that need arise.
036@Private 037public final class REEFEventHandlers implements AutoCloseable { 038 private final EventHandler<ResourceAllocationEvent> resourceAllocationHandler; 039 private final EventHandler<ResourceStatusEvent> resourceStatusHandler; 040 private final EventHandler<RuntimeStatusEvent> runtimeStatusHandler; 041 private final EventHandler<NodeDescriptorEvent> nodeDescriptorEventHandler; 042 043 @Inject 044 REEFEventHandlers(@Parameter(RuntimeParameters.NodeDescriptorHandler.class) 045 final EventHandler<NodeDescriptorEvent> nodeDescriptorEventHandler, 046 @Parameter(RuntimeParameters.RuntimeStatusHandler.class) 047 final EventHandler<RuntimeStatusEvent> runtimeStatusProtoEventHandler, 048 @Parameter(RuntimeParameters.ResourceAllocationHandler.class) 049 final EventHandler<ResourceAllocationEvent> resourceAllocationHandler, 050 @Parameter(RuntimeParameters.ResourceStatusHandler.class) 051 final EventHandler<ResourceStatusEvent> resourceStatusHandler) { 052 this.resourceAllocationHandler = resourceAllocationHandler; 053 this.resourceStatusHandler = resourceStatusHandler; 054 this.runtimeStatusHandler = runtimeStatusProtoEventHandler; 055 this.nodeDescriptorEventHandler = nodeDescriptorEventHandler; 056 } 057 058 /** 059 * Inform reef of a node. 060 * 061 * @param nodeDescriptorProto 062 */ 063 void onNodeDescriptor(final NodeDescriptorEvent nodeDescriptorProto) { 064 this.nodeDescriptorEventHandler.onNext(nodeDescriptorProto); 065 } 066 067 /** 068 * Update REEF's view on the runtime status. 069 * 070 * @param runtimeStatusEvent 071 */ 072 @Private 073 public void onRuntimeStatus(final RuntimeStatusEvent runtimeStatusEvent) { 074 this.runtimeStatusHandler.onNext(runtimeStatusEvent); 075 } 076 077 /** 078 * Inform REEF of a fresh resource allocation. 
079 * 080 * @param resourceAllocationEvent 081 */ 082 @Private 083 public void onResourceAllocation(final ResourceAllocationEvent resourceAllocationEvent) { 084 this.resourceAllocationHandler.onNext(resourceAllocationEvent); 085 } 086 087 /** 088 * Update REEF on a change to the status of a resource. 089 * 090 * @param resourceStatusEvent 091 */ 092 void onResourceStatus(final ResourceStatusEvent resourceStatusEvent) { 093 this.resourceStatusHandler.onNext(resourceStatusEvent); 094 } 095 096 @Override 097 public void close() throws Exception { 098 // Empty, but here for a future where we need to close a threadpool 099 } 100}