/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.webserver;

import org.apache.reef.driver.catalog.NodeDescriptor;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
import org.apache.reef.driver.task.RunningTask;
import org.apache.reef.runtime.common.driver.DriverStatusManager;
import org.apache.reef.runtime.common.utils.RemoteManager;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.time.event.StartTime;
import org.apache.reef.wake.time.event.StopTime;

import javax.inject.Inject;
import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Keeps track of REEF driver-side state: the start/stop times, the evaluators
 * that have been allocated, the active contexts and the registered service infos.
 * <p>
 * The state is updated by the event handlers declared as inner classes of this
 * class and is read back through the public accessors.
 * <p>
 * Thread-safety: the handlers and the {@code put}/{@code get} style accessors
 * guard the evaluator and context maps with this object's monitor. The
 * {@link #getEvaluators()}, {@link #getContexts()} and {@link #getServicesInfo()}
 * accessors hand out the live, mutable collections; see their documentation for
 * what a caller has to do to iterate them safely.
 */
@Unit
public final class ReefEventStateManager {
  /**
   * Standard Java logger.
   */
  private static final Logger LOG = Logger.getLogger(ReefEventStateManager.class.getName());

  /**
   * Pattern used to render time stamps. Only the pattern is shared: SimpleDateFormat
   * instances are not thread-safe, so a formatter is created per conversion.
   */
  private static final String DATE_FORMAT_PATTERN = "yyyy MM dd HH:mm:ss";

  /**
   * Map from evaluator ID to the descriptor of that evaluator.
   */
  private final Map<String, EvaluatorDescriptor> evaluators = new HashMap<>();

  /**
   * Map from context ID to running evaluator context.
   */
  private final Map<String, ActiveContext> contexts = new HashMap<>();

  /**
   * Service infos registered through {@link #registerServiceInfo(AvroReefServiceInfo)}.
   */
  private final List<AvroReefServiceInfo> serviceInfoList = new ArrayList<>();

  /**
   * Remote manager in the driver that carries information such as the driver endpoint identifier.
   */
  private final RemoteManager remoteManager;

  /**
   * Driver Status Manager that controls the driver status.
   */
  private final DriverStatusManager driverStatusManager;

  /**
   * Start event recorded by {@link StartStateHandler}; null until that handler has run.
   * Volatile because it is written by the handler and read by the accessors without a lock.
   */
  private volatile StartTime startTime;

  /**
   * Stop event recorded by {@link StopStateHandler}; null until that handler has run.
   * Volatile because it is written by the handler and read by the accessors without a lock.
   */
  private volatile StopTime stopTime;

  /**
   * ReefEventStateManager that keeps the states of Reef components.
   *
   * @param remoteManager       source of the driver endpoint identifier
   * @param driverStatusManager used to complete the driver on a client kill request
   */
  @Inject
  public ReefEventStateManager(final RemoteManager remoteManager, final DriverStatusManager driverStatusManager) {
    this.remoteManager = remoteManager;
    this.driverStatusManager = driverStatusManager;
  }

  /**
   * Get the start time.
   *
   * @return the recorded start time formatted as "yyyy MM dd HH:mm:ss",
   * or null if no start event has been received yet
   */
  public String getStartTime() {
    // Read the volatile field once so the null check and the use see the same value.
    final StartTime recorded = this.startTime;
    return recorded == null ? null : convertTime(recorded.getTimeStamp());
  }

  /**
   * Get the stop time.
   *
   * @return the recorded stop time formatted as "yyyy MM dd HH:mm:ss",
   * or null if no stop event has been received yet
   */
  public String getStopTime() {
    // Read the volatile field once so the null check and the use see the same value.
    final StopTime recorded = this.stopTime;
    return recorded == null ? null : convertTime(recorded.getTimeStamp());
  }

  /**
   * Convert a time stamp from long to a formatted string.
   *
   * @param time the time stamp, interpreted the way {@link Date#Date(long)} does
   * @return the time rendered with {@link #DATE_FORMAT_PATTERN}
   */
  private String convertTime(final long time) {
    // A fresh formatter per call: a shared SimpleDateFormat may produce corrupted
    // output when used from several threads at once.
    final Format formatter = new SimpleDateFormat(DATE_FORMAT_PATTERN);
    return formatter.format(new Date(time));
  }

  /**
   * Get the evaluator map.
   * <p>
   * The returned map is the live internal map, not a copy. The handlers of this
   * class modify it while holding this manager's monitor, so a caller that iterates
   * it should do so inside {@code synchronized (manager) { ... }}.
   *
   * @return the map from evaluator ID to evaluator descriptor
   */
  public Map<String, EvaluatorDescriptor> getEvaluators() {
    return evaluators;
  }

  /**
   * Get the driver endpoint identifier.
   *
   * @return the identifier reported by the remote manager
   */
  public String getDriverEndpointIdentifier() {
    return remoteManager.getMyIdentifier();
  }

  /**
   * Get the registered service infos.
   * <p>
   * The returned list is the live internal list, not a copy. Registration locks the
   * list itself, so a caller that iterates it should synchronize on the returned list.
   *
   * @return the list of service infos registered so far
   */
  public List<AvroReefServiceInfo> getServicesInfo() {
    return this.serviceInfoList;
  }

  /**
   * Register a service info so that it is reported by {@link #getServicesInfo()}.
   *
   * @param serviceInfo the service info to add
   */
  public void registerServiceInfo(final AvroReefServiceInfo serviceInfo) {
    synchronized (this.serviceInfoList) {
      serviceInfoList.add(serviceInfo);
    }
    // Logged after releasing the lock so that log handlers never run while it is held.
    LOG.log(Level.INFO, "Registered Service [{0}] with Info [{1}]",
        new Object[]{serviceInfo.getServiceName(), serviceInfo.getServiceInfo()});
  }

  /**
   * Get the map of contexts.
   * <p>
   * The returned map is the live internal map, not a copy. The handlers of this
   * class modify it while holding this manager's monitor, so a caller that iterates
   * it should do so inside {@code synchronized (manager) { ... }}.
   *
   * @return the map from context ID to active context
   */
  public Map<String, ActiveContext> getContexts() {
    return contexts;
  }

  /**
   * Put an entry into the evaluator map.
   *
   * @param key   the evaluator ID
   * @param value the descriptor of that evaluator
   */
  public synchronized void put(final String key, final EvaluatorDescriptor value) {
    evaluators.put(key, value);
  }

  /**
   * Get a value from the evaluator map by key.
   *
   * @param key the evaluator ID
   * @return the descriptor stored for that ID, or null if there is none
   */
  public synchronized EvaluatorDescriptor get(final String key) {
    return evaluators.get(key);
  }

  /**
   * Get the descriptor of an evaluator.
   *
   * @param evaluatorId the evaluator ID
   * @return the descriptor stored for that ID, or null if there is none
   */
  public synchronized EvaluatorDescriptor getEvaluatorDescriptor(final String evaluatorId) {
    return evaluators.get(evaluatorId);
  }

  /**
   * Get the node descriptor of an evaluator.
   *
   * @param evaluatorId the evaluator ID
   * @return the node descriptor of that evaluator, or null if no descriptor is
   * stored for the given ID
   */
  public NodeDescriptor getEvaluatorNodeDescriptor(final String evaluatorId) {
    final EvaluatorDescriptor descriptor = getEvaluatorDescriptor(evaluatorId);
    // An unknown evaluator ID yields null instead of a NullPointerException.
    return descriptor == null ? null : descriptor.getNodeDescriptor();
  }

  /**
   * Kill driver by calling onComplete() . This method is called when client wants to kill the driver and evaluators.
   */
  public void OnClientKill() {
    driverStatusManager.onComplete();
  }

  /**
   * Job Driver is ready and the clock is set up: logs and records the start time.
   */
  public final class StartStateHandler implements EventHandler<StartTime> {
    @Override
    public void onNext(final StartTime startTime) {
      LOG.log(Level.INFO,
          "StartStateHandler: Driver started with endpoint identifier [{0}] and StartTime [{1}]",
          new Object[]{ReefEventStateManager.this.remoteManager.getMyIdentifier(), startTime});
      ReefEventStateManager.this.startTime = startTime;
    }
  }

  /**
   * Job driver stopped: logs and records the stop time.
   */
  public final class StopStateHandler implements EventHandler<StopTime> {
    @Override
    public void onNext(final StopTime stopTime) {
      LOG.log(Level.INFO, "StopStateHandler called. StopTime: {0}", stopTime);
      ReefEventStateManager.this.stopTime = stopTime;
    }
  }

  /**
   * Receive notification that an Evaluator had been allocated and remember its descriptor.
   */
  public final class AllocatedEvaluatorStateHandler implements EventHandler<AllocatedEvaluator> {
    @Override
    public void onNext(final AllocatedEvaluator eval) {
      synchronized (ReefEventStateManager.this) {
        ReefEventStateManager.this.put(eval.getId(), eval.getEvaluatorDescriptor());
      }
    }
  }

  /**
   * Receive event when task is running. Only logs the task ID.
   */
  public final class TaskRunningStateHandler implements EventHandler<RunningTask> {
    @Override
    public void onNext(final RunningTask runningTask) {
      LOG.log(Level.INFO, "Running task {0} received.", runningTask.getId());
    }
  }

  /**
   * Receive event during driver restart that a task is running in previous evaluator.
   * Only logs the task ID.
   */
  public final class DriverRestartTaskRunningStateHandler implements EventHandler<RunningTask> {
    @Override
    public void onNext(final RunningTask runningTask) {
      LOG.log(Level.INFO, "Running task {0} received during driver restart.", runningTask.getId());
    }
  }

  /**
   * Receive notification that a new Context is available and remember it by its ID.
   */
  public final class ActiveContextStateHandler implements EventHandler<ActiveContext> {
    @Override
    public void onNext(final ActiveContext context) {
      synchronized (ReefEventStateManager.this) {
        LOG.log(Level.INFO, "Active Context {0} received and handled in state handler", context);
        contexts.put(context.getId(), context);
      }
    }
  }

  /**
   * Receive notification during driver restart that a Context is available.
   * Besides the context itself, the descriptor of its evaluator is recorded as well.
   * (The misspelled class name is kept because it is part of the public interface.)
   */
  public final class DrivrRestartActiveContextStateHandler implements EventHandler<ActiveContext> {
    @Override
    public void onNext(final ActiveContext context) {
      synchronized (ReefEventStateManager.this) {
        LOG.log(Level.INFO, "Active Context {0} received and handled in state handler during driver restart.", context);
        evaluators.put(context.getEvaluatorId(), context.getEvaluatorDescriptor());
        contexts.put(context.getId(), context);
      }
    }
  }

  /**
   * Receive notification from the client. Only logs that a message arrived;
   * the message content is not inspected.
   */
  public final class ClientMessageStateHandler implements EventHandler<byte[]> {
    @Override
    public void onNext(final byte[] message) {
      synchronized (ReefEventStateManager.this) {
        LOG.log(Level.INFO, "ClientMessageStateHandler OnNext called");
      }
    }
  }
}