This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.watcher;
020
021import org.apache.reef.annotations.Unstable;
022import org.apache.reef.driver.context.ActiveContext;
023import org.apache.reef.driver.context.ClosedContext;
024import org.apache.reef.driver.context.FailedContext;
025import org.apache.reef.driver.evaluator.AllocatedEvaluator;
026import org.apache.reef.driver.evaluator.CompletedEvaluator;
027import org.apache.reef.driver.evaluator.FailedEvaluator;
028import org.apache.reef.driver.task.*;
029import org.apache.reef.io.watcher.param.EventStreams;
030import org.apache.reef.io.watcher.util.WatcherAvroUtil;
031import org.apache.reef.tang.annotations.Parameter;
032import org.apache.reef.tang.annotations.Unit;
033import org.apache.reef.wake.EventHandler;
034import org.apache.reef.wake.time.event.StartTime;
035import org.apache.reef.wake.time.event.StopTime;
036import org.apache.reef.wake.time.runtime.event.RuntimeStart;
037import org.apache.reef.wake.time.runtime.event.RuntimeStop;
038
039import javax.inject.Inject;
040import java.util.Set;
041
042/**
043 * Subscribe events and transfer them as wrapping with corresponding avro classes.
044 */
045@Unstable
046@Unit
047public final class Watcher {
048
049  private final Set<EventStream> eventStreamSet;
050
051  @Inject
052  private Watcher(@Parameter(EventStreams.class) final Set<EventStream> eventStreamSet) {
053    this.eventStreamSet = eventStreamSet;
054  }
055
056  private void onEvent(final EventType eventType, final String jsonEncodedEvent) {
057    for (final EventStream eventStream : eventStreamSet) {
058      eventStream.onEvent(eventType, jsonEncodedEvent);
059    }
060  }
061
062  public final class DriverRuntimeStartHandler implements EventHandler<RuntimeStart> {
063
064    @Override
065    public void onNext(final RuntimeStart runtimeStart) {
066      onEvent(EventType.RuntimeStart, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroRuntimeStart(runtimeStart)));
067    }
068  }
069
070  public final class DriverStartHandler implements EventHandler<StartTime> {
071
072    @Override
073    public void onNext(final StartTime startTime) {
074      onEvent(EventType.StartTime, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroStartTime(startTime)));
075    }
076  }
077
078  public final class DriverStopHandler implements EventHandler<StopTime> {
079
080    @Override
081    public void onNext(final StopTime stopTime) {
082      onEvent(EventType.StopTime, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroStopTime(stopTime)));
083    }
084  }
085
086  public final class DriverRuntimeStopHandler implements EventHandler<RuntimeStop> {
087
088    @Override
089    public void onNext(final RuntimeStop runtimeStop) {
090      onEvent(EventType.RuntimeStop, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroRuntimeStop(runtimeStop)));
091    }
092  }
093
094  public final class ContextActiveHandler implements EventHandler<ActiveContext> {
095
096    @Override
097    public void onNext(final ActiveContext activeContext) {
098      onEvent(EventType.ActiveContext, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroActiveContext(activeContext)));
099    }
100  }
101
102  public final class ContextClosedHandler implements EventHandler<ClosedContext> {
103
104    @Override
105    public void onNext(final ClosedContext closedContext) {
106      onEvent(EventType.ClosedContext, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroClosedContext(closedContext)));
107    }
108  }
109
110  public final class ContextFailedHandler implements EventHandler<FailedContext> {
111
112    @Override
113    public void onNext(final FailedContext failedContext) {
114      onEvent(EventType.FailedContext, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroFailedContext(failedContext)));
115    }
116  }
117
118  public final class TaskCompletedHandler implements EventHandler<CompletedTask> {
119
120    @Override
121    public void onNext(final CompletedTask completedTask) {
122      onEvent(EventType.CompletedTask, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroCompletedTask(completedTask)));
123    }
124  }
125
126  public final class TaskFailedHandler implements EventHandler<FailedTask> {
127
128    @Override
129    public void onNext(final FailedTask failedTask) {
130      onEvent(EventType.FailedTask, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroFailedTask(failedTask)));
131    }
132  }
133
134  public final class TaskRunningHandler implements EventHandler<RunningTask> {
135
136    @Override
137    public void onNext(final RunningTask runningTask) {
138      onEvent(EventType.RunningTask, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroRunningTask(runningTask)));
139    }
140  }
141
142  public final class TaskMessageHandler implements EventHandler<TaskMessage> {
143
144    @Override
145    public void onNext(final TaskMessage taskMessage) {
146      onEvent(EventType.TaskMessage, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroTaskMessage(taskMessage)));
147    }
148  }
149
150  public final class TaskSuspendedHandler implements EventHandler<SuspendedTask> {
151
152    @Override
153    public void onNext(final SuspendedTask suspendedTask) {
154      onEvent(EventType.SuspendedTask, WatcherAvroUtil.toString(WatcherAvroUtil.toAvroSuspendedTask(suspendedTask)));
155    }
156  }
157
158  public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
159
160    @Override
161    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
162      onEvent(EventType.AllocatedEvaluator,
163          WatcherAvroUtil.toString(WatcherAvroUtil.toAvroAllocatedEvaluator(allocatedEvaluator)));
164    }
165  }
166
167  public final class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> {
168
169    @Override
170    public void onNext(final FailedEvaluator failedEvaluator) {
171      onEvent(EventType.FailedEvaluator,
172          WatcherAvroUtil.toString(WatcherAvroUtil.toAvroFailedEvaluator(failedEvaluator)));
173    }
174  }
175
176  public final class EvaluatorCompletedHandler implements EventHandler<CompletedEvaluator> {
177
178    @Override
179    public void onNext(final CompletedEvaluator completedEvaluator) {
180      onEvent(EventType.CompletedEvaluator,
181          WatcherAvroUtil.toString(WatcherAvroUtil.toAvroCompletedEvaluator(completedEvaluator)));
182    }
183  }
184}