001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.webserver; 020 021import org.apache.reef.driver.catalog.NodeDescriptor; 022import org.apache.reef.driver.context.ActiveContext; 023import org.apache.reef.driver.evaluator.AllocatedEvaluator; 024import org.apache.reef.driver.evaluator.EvaluatorDescriptor; 025import org.apache.reef.driver.restart.DriverRestarted; 026import org.apache.reef.driver.task.RunningTask; 027import org.apache.reef.runtime.common.driver.DriverStatusManager; 028import org.apache.reef.runtime.common.utils.RemoteManager; 029import org.apache.reef.tang.annotations.Unit; 030import org.apache.reef.wake.EventHandler; 031import org.apache.reef.wake.time.event.StartTime; 032import org.apache.reef.wake.time.event.StopTime; 033 034import javax.inject.Inject; 035import java.text.Format; 036import java.text.SimpleDateFormat; 037import java.util.*; 038import java.util.logging.Level; 039import java.util.logging.Logger; 040 041/** 042 * Reef Event Manager that manages Reef states. 043 */ 044@Unit 045public final class ReefEventStateManager { 046 /** 047 * Standard Java logger. 048 */ 049 private static final Logger LOG = Logger.getLogger(ReefEventStateManager.class.getName()); 050 051 /** 052 * Date format. 053 */ 054 private static final Format FORMAT = new SimpleDateFormat("yyyy MM dd HH:mm:ss"); 055 056 /** 057 * Map of evaluators. 058 */ 059 private final Map<String, EvaluatorDescriptor> evaluators = new HashMap<>(); 060 061 /** 062 * Map from context ID to running evaluator context. 063 */ 064 private final Map<String, ActiveContext> contexts = new HashMap<>(); 065 066 private final List<AvroReefServiceInfo> serviceInfoList = new ArrayList<>(); 067 068 /** 069 * Remote manager in driver the carries information such as driver endpoint identifier. 070 */ 071 private final RemoteManager remoteManager; 072 073 /** 074 * Driver Status Manager that controls the driver status. 075 */ 076 private final DriverStatusManager driverStatusManager; 077 078 /** 079 * Evaluator start time. 080 */ 081 private StartTime startTime; 082 083 /** 084 * Evaluator stop time. 085 */ 086 private StopTime stopTime; 087 088 /** 089 * ReefEventStateManager that keeps the states of Reef components. 090 */ 091 @Inject 092 public ReefEventStateManager(final RemoteManager remoteManager, final DriverStatusManager driverStatusManager) { 093 this.remoteManager = remoteManager; 094 this.driverStatusManager = driverStatusManager; 095 } 096 097 /** 098 * get start time. 099 * 100 * @return 101 */ 102 public String getStartTime() { 103 if (startTime != null) { 104 return convertTime(startTime.getTimeStamp()); 105 } 106 return null; 107 } 108 109 /** 110 * get stop time. 111 * 112 * @return 113 */ 114 public String getStopTime() { 115 if (stopTime != null) { 116 return convertTime(stopTime.getTimeStamp()); 117 } 118 return null; 119 } 120 121 /** 122 * convert time from long to formatted string. 123 * 124 * @param time 125 * @return 126 */ 127 private String convertTime(final long time) { 128 final Date date = new Date(time); 129 return FORMAT.format(date); 130 } 131 132 /** 133 * get evaluator map. 134 * 135 * @return 136 */ 137 public Map<String, EvaluatorDescriptor> getEvaluators() { 138 return evaluators; 139 } 140 141 /** 142 * get driver endpoint identifier. 143 */ 144 public String getDriverEndpointIdentifier() { 145 return remoteManager.getMyIdentifier(); 146 } 147 148 public List<AvroReefServiceInfo> getServicesInfo() { 149 return this.serviceInfoList; 150 } 151 152 public void registerServiceInfo(final AvroReefServiceInfo serviceInfo) { 153 synchronized (this.serviceInfoList) { 154 serviceInfoList.add(serviceInfo); 155 LOG.log(Level.INFO, "Registered Service [{0}] with Info [{1}]", 156 new Object[]{serviceInfo.getServiceName(), serviceInfo.getServiceInfo()}); 157 } 158 } 159 160 /** 161 * get a map of contexts. 162 * 163 * @return 164 */ 165 public Map<String, ActiveContext> getContexts() { 166 return contexts; 167 } 168 169 /** 170 * pus a entry to evaluators. 171 * 172 * @param key 173 * @param value 174 */ 175 public void put(final String key, final EvaluatorDescriptor value) { 176 evaluators.put(key, value); 177 } 178 179 /** 180 * get a value from evaluators by key. 181 * 182 * @param key 183 * @return 184 */ 185 public EvaluatorDescriptor get(final String key) { 186 return evaluators.get(key); 187 } 188 189 /** 190 * getEvaluatorDescriptor. 191 * 192 * @param evaluatorId 193 * @return 194 */ 195 public EvaluatorDescriptor getEvaluatorDescriptor(final String evaluatorId) { 196 return evaluators.get(evaluatorId); 197 } 198 199 /** 200 * get Evaluator NodeDescriptor. 201 * 202 * @param evaluatorId 203 * @return 204 */ 205 public NodeDescriptor getEvaluatorNodeDescriptor(final String evaluatorId) { 206 return evaluators.get(evaluatorId).getNodeDescriptor(); 207 } 208 209 /** 210 * Kill driver by calling onComplete() . This method is called when client wants to kill the driver and evaluators. 211 */ 212 public void onClientKill() { 213 driverStatusManager.onComplete(); 214 } 215 216 /** 217 * Job Driver is ready and the clock is set up. 218 */ 219 public final class StartStateHandler implements EventHandler<StartTime> { 220 @Override 221 @SuppressWarnings("checkstyle:hiddenfield") 222 public void onNext(final StartTime startTime) { 223 LOG.log(Level.INFO, 224 "StartStateHandler: Driver started with endpoint identifier [{0}] and StartTime [{1}]", 225 new Object[]{ReefEventStateManager.this.remoteManager.getMyIdentifier(), startTime}); 226 ReefEventStateManager.this.startTime = startTime; 227 } 228 } 229 230 /** 231 * Job Driver has been restarted. 232 */ 233 public final class DriverRestartHandler implements EventHandler<DriverRestarted> { 234 @Override 235 @SuppressWarnings("checkstyle:hiddenfield") 236 public void onNext(final DriverRestarted restartTime) { 237 LOG.log(Level.INFO, "DriverRestartHandler called. StartTime: {0}", restartTime); 238 } 239 } 240 241 /** 242 * Job driver stopped, log the stop time. 243 */ 244 public final class StopStateHandler implements EventHandler<StopTime> { 245 @Override 246 @SuppressWarnings("checkstyle:hiddenfield") 247 public void onNext(final StopTime stopTime) { 248 LOG.log(Level.INFO, "StopStateHandler called. StopTime: {0}", stopTime); 249 ReefEventStateManager.this.stopTime = stopTime; 250 } 251 } 252 253 /** 254 * Receive notification that an Evaluator had been allocated. 255 */ 256 public final class AllocatedEvaluatorStateHandler implements EventHandler<AllocatedEvaluator> { 257 @Override 258 public void onNext(final AllocatedEvaluator eval) { 259 synchronized (ReefEventStateManager.this) { 260 ReefEventStateManager.this.put(eval.getId(), eval.getEvaluatorDescriptor()); 261 } 262 } 263 } 264 265 /** 266 * Receive event when task is running. 267 */ 268 public final class TaskRunningStateHandler implements EventHandler<RunningTask> { 269 @Override 270 public void onNext(final RunningTask runningTask) { 271 LOG.log(Level.INFO, "Running task {0} received.", runningTask.getId()); 272 } 273 } 274 275 /** 276 * Receive event during driver restart that a task is running in previous evaluator. 277 */ 278 public final class DriverRestartTaskRunningStateHandler implements EventHandler<RunningTask> { 279 @Override 280 public void onNext(final RunningTask runningTask) { 281 LOG.log(Level.INFO, "Running task {0} received during driver restart.", runningTask.getId()); 282 } 283 } 284 285 /** 286 * Receive notification that a new Context is available. 287 */ 288 public final class ActiveContextStateHandler implements EventHandler<ActiveContext> { 289 @Override 290 public void onNext(final ActiveContext context) { 291 synchronized (ReefEventStateManager.this) { 292 LOG.log(Level.INFO, "Active Context {0} received and handled in state handler", context); 293 contexts.put(context.getId(), context); 294 } 295 } 296 } 297 298 /** 299 * Receive notification that a new Context is available. 300 */ 301 public final class DriverRestartActiveContextStateHandler implements EventHandler<ActiveContext> { 302 @Override 303 public void onNext(final ActiveContext context) { 304 synchronized (ReefEventStateManager.this) { 305 LOG.log(Level.INFO, "Active Context {0} received and handled in state handler during driver restart.", context); 306 evaluators.put(context.getEvaluatorId(), context.getEvaluatorDescriptor()); 307 contexts.put(context.getId(), context); 308 } 309 } 310 } 311 312 /** 313 * Receive notification from the client. 314 */ 315 public final class ClientMessageStateHandler implements EventHandler<byte[]> { 316 @Override 317 public void onNext(final byte[] message) { 318 synchronized (ReefEventStateManager.this) { 319 LOG.log(Level.INFO, "ClientMessageStateHandler OnNext called"); 320 } 321 } 322 } 323}