001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver.context; 020 021import net.jcip.annotations.GuardedBy; 022import net.jcip.annotations.ThreadSafe; 023import org.apache.reef.annotations.audience.DriverSide; 024import org.apache.reef.annotations.audience.Private; 025import org.apache.reef.driver.context.FailedContext; 026import org.apache.reef.driver.restart.DriverRestartManager; 027import org.apache.reef.driver.restart.EvaluatorRestartState; 028import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher; 029import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextMessagePOJO; 030import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextState; 031import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO; 032import org.apache.reef.util.Optional; 033 034import javax.inject.Inject; 035import java.util.*; 036import java.util.logging.Level; 037import java.util.logging.Logger; 038 039/** 040 * Driver-Side representation of all contexts on an Evaluator. 041 */ 042@ThreadSafe 043@DriverSide 044@Private 045public final class ContextRepresenters { 046 private static final Logger LOG = Logger.getLogger(ContextRepresenters.class.getName()); 047 048 private final EvaluatorMessageDispatcher messageDispatcher; 049 private final ContextFactory contextFactory; 050 051 // Mutable fields 052 @GuardedBy("this") 053 private final List<EvaluatorContext> contextStack = new ArrayList<>(); 054 @GuardedBy("this") 055 private final Set<String> contextIds = new HashSet<>(); 056 057 private final DriverRestartManager driverRestartManager; 058 059 @Inject 060 private ContextRepresenters(final EvaluatorMessageDispatcher messageDispatcher, 061 final ContextFactory contextFactory, 062 final DriverRestartManager driverRestartManager) { 063 this.messageDispatcher = messageDispatcher; 064 this.contextFactory = contextFactory; 065 this.driverRestartManager = driverRestartManager; 066 } 067 068 /** 069 * Fetch the context with the given ID. 070 * 071 * @param contextId 072 * @return 073 */ 074 public synchronized EvaluatorContext getContext(final String contextId) { 075 for (final EvaluatorContext context : this.contextStack) { 076 if (context.getId().equals(contextId)) { 077 return context; 078 } 079 } 080 throw new RuntimeException("Unknown evaluator context " + contextId); 081 } 082 083 /** 084 * Create the failed contexts for a FailedEvaluator event. 085 * 086 * @return 087 */ 088 public synchronized List<FailedContext> getFailedContextsForEvaluatorFailure() { 089 final List<FailedContext> failedContextList = new ArrayList<>(); 090 final List<EvaluatorContext> activeContexts = new ArrayList<>(this.contextStack); 091 Collections.reverse(activeContexts); 092 093 for (final EvaluatorContext context : activeContexts) { 094 failedContextList.add(context.getFailedContextForEvaluatorFailure()); 095 } 096 return failedContextList; 097 } 098 099 /** 100 * Process heartbeats from the contexts on an Evaluator. 101 * 102 * @param contextStatusPOJOs 103 * @param notifyClientOnNewActiveContext 104 */ 105 public synchronized void onContextStatusMessages(final Iterable<ContextStatusPOJO> 106 contextStatusPOJOs, 107 final boolean notifyClientOnNewActiveContext) { 108 for (final ContextStatusPOJO contextStatus : contextStatusPOJOs) { 109 this.onContextStatusMessage(contextStatus, notifyClientOnNewActiveContext); 110 } 111 } 112 113 114 /** 115 * Process a heartbeat from a context. 116 * 117 * @param contextStatus 118 * @param notifyClientOnNewActiveContext 119 */ 120 private synchronized void onContextStatusMessage(final ContextStatusPOJO contextStatus, 121 final boolean notifyClientOnNewActiveContext) { 122 123 LOG.log(Level.FINER, "Processing context status message for context {0}", contextStatus.getContextId()); 124 switch (contextStatus.getContextState()) { 125 case READY: 126 this.onContextReady(contextStatus, notifyClientOnNewActiveContext); 127 break; 128 case FAIL: 129 this.onContextFailed(contextStatus); 130 break; 131 case DONE: 132 this.onContextDone(contextStatus); 133 break; 134 default: 135 this.onUnknownContextStatus(contextStatus); 136 break; 137 } 138 LOG.log(Level.FINER, "Done processing context status message for context {0}", contextStatus.getContextId()); 139 140 } 141 142 143 private synchronized void onUnknownContextStatus(final ContextStatusPOJO contextStatus) { 144 LOG.log(Level.WARNING, "Received unexpected context status: {0}", contextStatus); 145 throw new RuntimeException("Received unexpected context status: " + contextStatus.getContextState()); 146 } 147 148 private synchronized void onContextFailed(final ContextStatusPOJO contextStatus) { 149 assert ContextState.FAIL == contextStatus.getContextState(); 150 final String contextID = contextStatus.getContextId(); 151 LOG.log(Level.FINE, "Context {0} failed", contextID); 152 // It could have failed right away. 153 if (this.isUnknownContextId(contextID)) { 154 this.onNewContext(contextStatus, false); 155 } 156 final EvaluatorContext context = getContext(contextID); 157 this.removeContext(context); 158 this.messageDispatcher.onContextFailed(context.getFailedContext(contextStatus)); 159 } 160 161 private synchronized void onContextDone(final ContextStatusPOJO contextStatus) { 162 assert ContextState.DONE == contextStatus.getContextState(); 163 final String contextID = contextStatus.getContextId(); 164 if (isUnknownContextId(contextID)) { 165 throw new RuntimeException("Received DONE for context " + contextID + " which is unknown."); 166 } else { 167 LOG.log(Level.FINE, "Context {0} is DONE.", contextID); 168 final EvaluatorContext context = getContext(contextID); 169 removeContext(context); 170 171 if (context.isRootContext()) { 172 LOG.log(Level.FINE, "Root context {0} closed. Evaluator closed will trigger final shutdown.", contextID); 173 } else { 174 final EvaluatorContext parentContext = this.getContext(context.getParentId().get()); 175 this.messageDispatcher.onContextClose(context.getClosedContext(parentContext)); 176 } 177 } 178 } 179 180 /** 181 * Process a message with status READY from a context. 182 * 183 * @param contextStatus 184 * @param notifyClientOnNewActiveContext whether or not to inform the application when this in fact refers to a new 185 * context. 186 */ 187 private synchronized void onContextReady(final ContextStatusPOJO contextStatus, 188 final boolean notifyClientOnNewActiveContext) { 189 assert ContextState.READY == contextStatus.getContextState(); 190 final String contextID = contextStatus.getContextId(); 191 // This could be the first message we get from that context 192 if (this.isUnknownContextId(contextID)) { 193 this.onNewContext(contextStatus, notifyClientOnNewActiveContext); 194 } 195 196 // Dispatch the messages to the application, if there are any. 197 for (final ContextMessagePOJO 198 contextMessage : contextStatus.getContextMessageList()) { 199 final byte[] theMessage = contextMessage.getMessage(); 200 final String sourceID = contextMessage.getSourceId(); 201 final long sequenceNumber = contextMessage.getSequenceNumber(); 202 this.messageDispatcher.onContextMessage(new ContextMessageImpl(theMessage, contextID, sourceID, sequenceNumber)); 203 } 204 205 } 206 207 /** 208 * Create and add a new context representer. 209 * 210 * @param contextStatus the message to create the context from 211 * @param notifyClientOnNewActiveContext whether or not to fire an event to the user. 212 */ 213 private synchronized void onNewContext(final ContextStatusPOJO contextStatus, 214 final boolean notifyClientOnNewActiveContext) { 215 final String contextID = contextStatus.getContextId(); 216 LOG.log(Level.FINE, "Adding new context {0}.", contextID); 217 218 final Optional<String> parentID = contextStatus.hasParentId() ? 219 Optional.of(contextStatus.getParentId()) : Optional.<String>empty(); 220 final EvaluatorContext context = contextFactory.newContext(contextID, parentID); 221 this.addContext(context); 222 if (notifyClientOnNewActiveContext) { 223 if (driverRestartManager.getEvaluatorRestartState(context.getEvaluatorId()) 224 == EvaluatorRestartState.REREGISTERED) { 225 // if restart, advance restart state and all the restart context active handlers. 226 driverRestartManager.setEvaluatorProcessed(context.getEvaluatorId()); 227 this.messageDispatcher.onDriverRestartContextActive(context); 228 } 229 230 this.messageDispatcher.onContextActive(context); 231 } 232 } 233 234 /** 235 * Add the given context to the data structures. 236 * 237 * @param context 238 */ 239 private synchronized void addContext(final EvaluatorContext context) { 240 this.contextStack.add(context); 241 this.contextIds.add(context.getId()); 242 } 243 244 /** 245 * Remove the given context from the data structures. 246 * 247 * @param context 248 */ 249 private synchronized void removeContext(final EvaluatorContext context) { 250 this.contextStack.remove(context); 251 this.contextIds.remove(context.getId()); 252 } 253 254 /** 255 * @param contextId 256 * @return true if the given context id is unknown so far. 257 */ 258 private synchronized boolean isUnknownContextId(final String contextId) { 259 return !this.contextIds.contains(contextId); 260 } 261 262}