001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.watcher; 020 021import org.apache.reef.annotations.Unstable; 022import org.apache.reef.driver.context.ActiveContext; 023import org.apache.reef.driver.context.ClosedContext; 024import org.apache.reef.driver.context.FailedContext; 025import org.apache.reef.driver.evaluator.AllocatedEvaluator; 026import org.apache.reef.driver.evaluator.CompletedEvaluator; 027import org.apache.reef.driver.evaluator.FailedEvaluator; 028import org.apache.reef.driver.task.*; 029import org.apache.reef.io.watcher.param.EventStreams; 030import org.apache.reef.io.watcher.util.WatcherAvroUtil; 031import org.apache.reef.tang.annotations.Parameter; 032import org.apache.reef.tang.annotations.Unit; 033import org.apache.reef.wake.EventHandler; 034import org.apache.reef.wake.time.event.StartTime; 035import org.apache.reef.wake.time.event.StopTime; 036import org.apache.reef.wake.time.runtime.event.RuntimeStart; 037import org.apache.reef.wake.time.runtime.event.RuntimeStop; 038 039import javax.inject.Inject; 040import java.util.Set; 041 042/** 043 * Subscribe events and transfer them as wrapping with corresponding avro classes. 044 */ 045@Unstable 046@Unit 047public final class Watcher { 048 049 private final Set<EventStream> eventStreamSet; 050 051 @Inject 052 private Watcher(@Parameter(EventStreams.class) final Set<EventStream> eventStreamSet) { 053 this.eventStreamSet = eventStreamSet; 054 } 055 056 private void onEvent(final EventType eventType, final String jsonEncodedEvent) { 057 for (final EventStream eventStream : eventStreamSet) { 058 eventStream.onEvent(eventType, jsonEncodedEvent); 059 } 060 } 061 062 public final class DriverRuntimeStartHandler implements EventHandler<RuntimeStart> { 063 064 @Override 065 public void onNext(final RuntimeStart runtimeStart) { 066 onEvent(EventType.RuntimeStart, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroRuntimeStart(runtimeStart))); 067 } 068 } 069 070 public final class DriverStartHandler implements EventHandler<StartTime> { 071 072 @Override 073 public void onNext(final StartTime startTime) { 074 onEvent(EventType.StartTime, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroStartTime(startTime))); 075 } 076 } 077 078 public final class DriverStopHandler implements EventHandler<StopTime> { 079 080 @Override 081 public void onNext(final StopTime stopTime) { 082 onEvent(EventType.StopTime, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroStopTime(stopTime))); 083 } 084 } 085 086 public final class DriverRuntimeStopHandler implements EventHandler<RuntimeStop> { 087 088 @Override 089 public void onNext(final RuntimeStop runtimeStop) { 090 onEvent(EventType.RuntimeStop, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroRuntimeStop(runtimeStop))); 091 } 092 } 093 094 public final class ContextActiveHandler implements EventHandler<ActiveContext> { 095 096 @Override 097 public void onNext(final ActiveContext activeContext) { 098 onEvent(EventType.ActiveContext, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroActiveContext(activeContext))); 099 } 100 } 101 102 public final class ContextClosedHandler implements EventHandler<ClosedContext> { 103 104 @Override 105 public void onNext(final ClosedContext closedContext) { 106 onEvent(EventType.ClosedContext, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroClosedContext(closedContext))); 107 } 108 } 109 110 public final class ContextFailedHandler implements EventHandler<FailedContext> { 111 112 @Override 113 public void onNext(final FailedContext failedContext) { 114 onEvent(EventType.FailedContext, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroFailedContext(failedContext))); 115 } 116 } 117 118 public final class TaskCompletedHandler implements EventHandler<CompletedTask> { 119 120 @Override 121 public void onNext(final CompletedTask completedTask) { 122 onEvent(EventType.CompletedTask, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroCompletedTask(completedTask))); 123 } 124 } 125 126 public final class TaskFailedHandler implements EventHandler<FailedTask> { 127 128 @Override 129 public void onNext(final FailedTask failedTask) { 130 onEvent(EventType.FailedTask, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroFailedTask(failedTask))); 131 } 132 } 133 134 public final class TaskRunningHandler implements EventHandler<RunningTask> { 135 136 @Override 137 public void onNext(final RunningTask runningTask) { 138 onEvent(EventType.RunningTask, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroRunningTask(runningTask))); 139 } 140 } 141 142 public final class TaskMessageHandler implements EventHandler<TaskMessage> { 143 144 @Override 145 public void onNext(final TaskMessage taskMessage) { 146 onEvent(EventType.TaskMessage, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroTaskMessage(taskMessage))); 147 } 148 } 149 150 public final class TaskSuspendedHandler implements EventHandler<SuspendedTask> { 151 152 @Override 153 public void onNext(final SuspendedTask suspendedTask) { 154 onEvent(EventType.SuspendedTask, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroSuspendedTask(suspendedTask))); 155 } 156 } 157 158 public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> { 159 160 @Override 161 public void onNext(final AllocatedEvaluator allocatedEvaluator) { 162 onEvent(EventType.AllocatedEvaluator, 163 WatcherAvroUtil.toString(WatcherAvroUtil.toAvroAllocatedEvaluator(allocatedEvaluator))); 164 } 165 } 166 167 public final class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> { 168 169 @Override 170 public void onNext(final FailedEvaluator failedEvaluator) { 171 onEvent(EventType.FailedEvaluator, 172 WatcherAvroUtil.toString(WatcherAvroUtil.toAvroFailedEvaluator(failedEvaluator))); 173 } 174 } 175 176 public final class EvaluatorCompletedHandler implements EventHandler<CompletedEvaluator> { 177 178 @Override 179 public void onNext(final CompletedEvaluator completedEvaluator) { 180 onEvent(EventType.CompletedEvaluator, 181 WatcherAvroUtil.toString(WatcherAvroUtil.toAvroCompletedEvaluator(completedEvaluator))); 182 } 183 } 184}