001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.tests.watcher; 020 021import org.apache.reef.io.watcher.EventStream; 022import org.apache.reef.io.watcher.EventType; 023 024import javax.inject.Inject; 025import java.util.HashMap; 026import java.util.Map; 027import java.util.concurrent.atomic.AtomicInteger; 028 029/** 030 * Count events in the stream. 031 */ 032public final class TestEventStream implements EventStream { 033 034 private Map<EventType, AtomicInteger> eventCounter; 035 036 @Inject 037 private TestEventStream(){ 038 this.eventCounter = new HashMap<>(); 039 for (final EventType type : EventType.values()) { 040 eventCounter.put(type, new AtomicInteger()); 041 } 042 } 043 044 @Override 045 public void onEvent(final EventType type, final String jsonEncodedEvent) { 046 eventCounter.get(type).incrementAndGet(); 047 } 048 049 private void checkEqualTo(final EventType type, final int expectedNum) { 050 final int actualNum = eventCounter.get(type).get(); 051 if (actualNum != expectedNum) { 052 throw new RuntimeException("The expected number of " + type + " is " 053 + expectedNum + " but " + actualNum + " times occurred"); 054 } 055 } 056 057 private void checkGreaterThan(final EventType type, final int num) { 058 final int actualNum = eventCounter.get(type).get(); 059 if (actualNum < num) { 060 throw new RuntimeException("The number of event " + type + " should be greater than " + num 061 + " but " + actualNum + " times occurred"); 062 } 063 } 064 065 /** 066 * This validation is called in WatcherTestDriver#RuntimeStopHandler, so RuntimeStop should not be guaranteed 067 * to be called before this. 068 */ 069 public void validate() { 070 checkEqualTo(EventType.RuntimeStart, 1); 071 checkEqualTo(EventType.StartTime, 1); 072 checkEqualTo(EventType.AllocatedEvaluator, 2); 073 checkEqualTo(EventType.FailedEvaluator, 1); 074 checkEqualTo(EventType.ActiveContext, 2); 075 checkEqualTo(EventType.FailedContext, 1); 076 checkEqualTo(EventType.FailedTask, 1); 077 checkEqualTo(EventType.RunningTask, 2); 078 checkEqualTo(EventType.SuspendedTask, 1); 079 checkGreaterThan(EventType.TaskMessage, 0); 080 checkEqualTo(EventType.CompletedTask, 1); 081 checkEqualTo(EventType.ClosedContext, 1); 082 checkEqualTo(EventType.CompletedEvaluator, 1); 083 checkEqualTo(EventType.StopTime, 1); 084 085 } 086}