This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.tests.watcher;
020
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.task.HeartBeatTriggerManager;
import org.apache.reef.task.Task;
import org.apache.reef.task.TaskMessage;
import org.apache.reef.task.TaskMessageSource;
import org.apache.reef.task.events.SuspendEvent;
import org.apache.reef.util.Optional;
import org.apache.reef.wake.EventHandler;

import javax.inject.Inject;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
033
034/**
035 * Task for WatcherTest.
036 */
037@Unit
038public final class WatcherTestTask implements Task, TaskMessageSource {
039
040  private final TaskMessage taskMessage;
041  private final HeartBeatTriggerManager heartBeatTriggerManager;
042  private final boolean isTaskSuspended;
043  private boolean isRunning;
044
045  @Inject
046  private WatcherTestTask(final HeartBeatTriggerManager heartBeatTriggerManager,
047                          @Parameter(IsTaskSuspended.class) final boolean isTaskSuspended) {
048    this.taskMessage = TaskMessage.from("MESSAGE_SOURCE", "MESSAGE".getBytes(Charset.forName("UTF-8")));
049    this.heartBeatTriggerManager = heartBeatTriggerManager;
050    this.isTaskSuspended = isTaskSuspended;
051    this.isRunning = true;
052  }
053
054  @Override
055  public byte[] call(final byte[] memento) throws Exception {
056    if (isTaskSuspended) {
057      synchronized (this) {
058        while (isRunning) {
059          wait();
060        }
061      }
062    } else {
063      heartBeatTriggerManager.triggerHeartBeat();
064    }
065
066    return null;
067  }
068
069  @Override
070  public Optional<TaskMessage> getMessage() {
071    return Optional.of(taskMessage);
072  }
073
074  /**
075   * Handler for SuspendEvent.
076   */
077  public final class TaskSuspendedHandler implements EventHandler<SuspendEvent> {
078
079    @Override
080    public void onNext(final SuspendEvent value) {
081      synchronized (WatcherTestTask.this) {
082        isRunning = false;
083        WatcherTestTask.this.notify();
084      }
085    }
086  }
087}