001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver.context; 020 021import net.jcip.annotations.GuardedBy; 022import net.jcip.annotations.ThreadSafe; 023import org.apache.reef.annotations.audience.DriverSide; 024import org.apache.reef.annotations.audience.Private; 025import org.apache.reef.driver.context.FailedContext; 026import org.apache.reef.proto.ReefServiceProtos; 027import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher; 028import org.apache.reef.util.Optional; 029 030import javax.inject.Inject; 031import java.util.*; 032import java.util.logging.Level; 033import java.util.logging.Logger; 034 035/** 036 * Driver-Side representation of all contexts on an Evaluator. 037 */ 038@ThreadSafe 039@DriverSide 040@Private 041public final class ContextRepresenters { 042 private static final Logger LOG = Logger.getLogger(ContextRepresenters.class.getName()); 043 044 private final EvaluatorMessageDispatcher messageDispatcher; 045 private final ContextFactory contextFactory; 046 047 // Mutable fields 048 @GuardedBy("this") 049 private final List<EvaluatorContext> contextStack = new ArrayList<>(); 050 @GuardedBy("this") 051 private final Set<String> contextIds = new HashSet<>(); 052 053 @Inject 054 private ContextRepresenters(final EvaluatorMessageDispatcher messageDispatcher, 055 final ContextFactory contextFactory) { 056 this.messageDispatcher = messageDispatcher; 057 this.contextFactory = contextFactory; 058 } 059 060 /** 061 * Fetch the context with the given ID. 062 * 063 * @param contextId 064 * @return 065 */ 066 public synchronized EvaluatorContext getContext(final String contextId) { 067 for (final EvaluatorContext context : this.contextStack) { 068 if (context.getId().equals(contextId)) return context; 069 } 070 throw new RuntimeException("Unknown evaluator context " + contextId); 071 } 072 073 /** 074 * Create the failed contexts for a FailedEvaluator event. 075 * 076 * @return 077 */ 078 public synchronized List<FailedContext> getFailedContextsForEvaluatorFailure() { 079 final List<FailedContext> failedContextList = new ArrayList<>(); 080 final List<EvaluatorContext> activeContexts = new ArrayList<>(this.contextStack); 081 Collections.reverse(activeContexts); 082 083 for (final EvaluatorContext context : activeContexts) { 084 failedContextList.add(context.getFailedContextForEvaluatorFailure()); 085 } 086 return failedContextList; 087 } 088 089 /** 090 * Process heartbeats from the contexts on an Evaluator. 091 * 092 * @param contextStatusProto 093 * @param notifyClientOnNewActiveContext 094 */ 095 public synchronized void onContextStatusMessages(final Iterable<ReefServiceProtos.ContextStatusProto> contextStatusProtos, 096 final boolean notifyClientOnNewActiveContext) { 097 for (final ReefServiceProtos.ContextStatusProto contextStatusProto : contextStatusProtos) { 098 this.onContextStatusMessage(contextStatusProto, notifyClientOnNewActiveContext); 099 } 100 } 101 102 103 /** 104 * Process a heartbeat from a context 105 * 106 * @param contextStatusProto 107 * @param notifyClientOnNewActiveContext 108 */ 109 private synchronized void onContextStatusMessage(final ReefServiceProtos.ContextStatusProto contextStatusProto, 110 final boolean notifyClientOnNewActiveContext) { 111 112 LOG.log(Level.FINER, "Processing context status message for context {0}", contextStatusProto.getContextId()); 113 switch (contextStatusProto.getContextState()) { 114 case READY: 115 this.onContextReady(contextStatusProto, notifyClientOnNewActiveContext); 116 break; 117 case FAIL: 118 this.onContextFailed(contextStatusProto); 119 break; 120 case DONE: 121 this.onContextDone(contextStatusProto); 122 break; 123 default: 124 this.onUnknownContextStatus(contextStatusProto); 125 break; 126 } 127 LOG.log(Level.FINER, "Done processing context status message for context {0}", contextStatusProto.getContextId()); 128 129 } 130 131 132 private synchronized void onUnknownContextStatus(final ReefServiceProtos.ContextStatusProto contextStatusProto) { 133 LOG.log(Level.WARNING, "Received unexpected context status: {0}", contextStatusProto); 134 throw new RuntimeException("Received unexpected context status: " + contextStatusProto.getContextState()); 135 } 136 137 private synchronized void onContextFailed(final ReefServiceProtos.ContextStatusProto contextStatusProto) { 138 assert (ReefServiceProtos.ContextStatusProto.State.FAIL == contextStatusProto.getContextState()); 139 final String contextID = contextStatusProto.getContextId(); 140 LOG.log(Level.FINE, "Context {0} failed", contextID); 141 // It could have failed right away. 142 if (this.isUnknownContextId(contextID)) { 143 this.onNewContext(contextStatusProto, false); 144 } 145 final EvaluatorContext context = getContext(contextID); 146 this.removeContext(context); 147 this.messageDispatcher.onContextFailed(context.getFailedContext(contextStatusProto)); 148 } 149 150 private synchronized void onContextDone(final ReefServiceProtos.ContextStatusProto contextStatusProto) { 151 assert (ReefServiceProtos.ContextStatusProto.State.DONE == contextStatusProto.getContextState()); 152 final String contextID = contextStatusProto.getContextId(); 153 if (isUnknownContextId(contextID)) { 154 throw new RuntimeException("Received DONE for context " + contextID + " which is unknown."); 155 } else { 156 LOG.log(Level.FINE, "Context {0} is DONE.", contextID); 157 final EvaluatorContext context = getContext(contextID); 158 removeContext(context); 159 160 if (context.isRootContext()) { 161 LOG.log(Level.FINE, "Root context {0} closed. Evaluator closed will trigger final shutdown.", contextID); 162 } else { 163 final EvaluatorContext parentContext = this.getContext(context.getParentId().get()); 164 this.messageDispatcher.onContextClose(context.getClosedContext(parentContext)); 165 } 166 } 167 } 168 169 /** 170 * Process a message with status READY from a context. 171 * 172 * @param contextStatusProto 173 * @param notifyClientOnNewActiveContext whether or not to inform the application when this in fact refers to a new 174 * context. 175 */ 176 private synchronized void onContextReady(final ReefServiceProtos.ContextStatusProto contextStatusProto, 177 final boolean notifyClientOnNewActiveContext) { 178 assert (ReefServiceProtos.ContextStatusProto.State.READY == contextStatusProto.getContextState()); 179 final String contextID = contextStatusProto.getContextId(); 180 // This could be the first message we get from that context 181 if (this.isUnknownContextId(contextID)) { 182 this.onNewContext(contextStatusProto, notifyClientOnNewActiveContext); 183 } 184 185 // Dispatch the messages to the application, if there are any. 186 for (final ReefServiceProtos.ContextStatusProto.ContextMessageProto contextMessageProto : contextStatusProto.getContextMessageList()) { 187 final byte[] theMessage = contextMessageProto.getMessage().toByteArray(); 188 final String sourceID = contextMessageProto.getSourceId(); 189 this.messageDispatcher.onContextMessage(new ContextMessageImpl(theMessage, contextID, sourceID)); 190 } 191 192 } 193 194 /** 195 * Create and add a new context representer. 196 * 197 * @param contextStatusProto the message to create the context from 198 * @param notifyClientOnNewActiveContext whether or not to fire an event to the user. 199 */ 200 private synchronized void onNewContext(final ReefServiceProtos.ContextStatusProto contextStatusProto, 201 final boolean notifyClientOnNewActiveContext) { 202 final String contextID = contextStatusProto.getContextId(); 203 LOG.log(Level.FINE, "Adding new context {0}.", contextID); 204 205 final Optional<String> parentID = contextStatusProto.hasParentId() ? 206 Optional.of(contextStatusProto.getParentId()) : Optional.<String>empty(); 207 final EvaluatorContext context = contextFactory.newContext(contextID, parentID); 208 this.addContext(context); 209 if (contextStatusProto.getRecovery()) { 210 // when we get a recovered active context, always notify application 211 this.messageDispatcher.OnDriverRestartContextActive(context); 212 } else { 213 if (notifyClientOnNewActiveContext) { 214 this.messageDispatcher.onContextActive(context); 215 } 216 } 217 } 218 219 /** 220 * Add the given context to the data structures. 221 * 222 * @param context 223 */ 224 private synchronized void addContext(final EvaluatorContext context) { 225 this.contextStack.add(context); 226 this.contextIds.add(context.getId()); 227 } 228 229 /** 230 * Remove the given context from the data structures. 231 * 232 * @param context 233 */ 234 private synchronized void removeContext(final EvaluatorContext context) { 235 this.contextStack.remove(context); 236 this.contextIds.remove(context.getId()); 237 } 238 239 /** 240 * @param contextId 241 * @return true if the given context id is unknown so far. 242 */ 243 private synchronized boolean isUnknownContextId(final String contextId) { 244 return !this.contextIds.contains(contextId); 245 } 246 247}