This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.context;
020
021import com.google.protobuf.ByteString;
022import org.apache.reef.annotations.audience.DriverSide;
023import org.apache.reef.annotations.audience.Private;
024import org.apache.reef.driver.context.ActiveContext;
025import org.apache.reef.driver.context.ClosedContext;
026import org.apache.reef.driver.context.FailedContext;
027import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
028import org.apache.reef.proto.EvaluatorRuntimeProtocol;
029import org.apache.reef.proto.ReefServiceProtos;
030import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
031import org.apache.reef.runtime.common.utils.ExceptionCodec;
032import org.apache.reef.tang.Configuration;
033import org.apache.reef.tang.formats.ConfigurationSerializer;
034import org.apache.reef.util.Optional;
035
036import java.util.logging.Level;
037import java.util.logging.Logger;
038
039/**
040 * Driver-side representation of a Context on an Evaluator.
041 */
042@DriverSide
043@Private
044public final class EvaluatorContext implements ActiveContext {
045
046  private final static Logger LOG = Logger.getLogger(EvaluatorContext.class.getName());
047
048  private final String contextIdentifier;
049  private final String evaluatorIdentifier;
050  private final EvaluatorDescriptor evaluatorDescriptor;
051
052  private final Optional<String> parentID;
053  private final ConfigurationSerializer configurationSerializer;
054  private final ContextControlHandler contextControlHandler;
055  private final ExceptionCodec exceptionCodec;
056  private final ContextRepresenters contextRepresenters;
057
058  private boolean isClosed = false;
059
060  public EvaluatorContext(final String contextIdentifier,
061                          final String evaluatorIdentifier,
062                          final EvaluatorDescriptor evaluatorDescriptor,
063                          final Optional<String> parentID,
064                          final ConfigurationSerializer configurationSerializer,
065                          final ContextControlHandler contextControlHandler,
066                          final EvaluatorMessageDispatcher messageDispatcher,
067                          final ExceptionCodec exceptionCodec,
068                          final ContextRepresenters contextRepresenters) {
069
070    this.contextIdentifier = contextIdentifier;
071    this.evaluatorIdentifier = evaluatorIdentifier;
072    this.evaluatorDescriptor = evaluatorDescriptor;
073    this.parentID = parentID;
074    this.configurationSerializer = configurationSerializer;
075    this.contextControlHandler = contextControlHandler;
076    this.exceptionCodec = exceptionCodec;
077    this.contextRepresenters = contextRepresenters;
078
079    LOG.log(Level.FINE, "Instantiated 'EvaluatorContext'");
080  }
081
082  @Override
083  public synchronized void close() {
084
085    if (this.isClosed) {
086      throw new RuntimeException("Active context already closed");
087    }
088
089    LOG.log(Level.FINEST, "Submit close context: RunningEvaluator id[{0}] for context id[{1}]",
090        new Object[]{getEvaluatorId(), getId()});
091
092    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
093        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
094            .setRemoveContext(
095                EvaluatorRuntimeProtocol.RemoveContextProto.newBuilder()
096                    .setContextId(getId())
097                    .build())
098            .build();
099
100    this.contextControlHandler.send(contextControlProto);
101    this.isClosed = true;
102  }
103
104  @Override
105  public synchronized void sendMessage(final byte[] message) {
106
107    if (this.isClosed) {
108      throw new RuntimeException("Active context already closed");
109    }
110
111    LOG.log(Level.FINEST, "Send message: RunningEvaluator id[{0}] for context id[{1}]",
112        new Object[]{getEvaluatorId(), getId()});
113
114    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
115        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
116            .setContextMessage(EvaluatorRuntimeProtocol.ContextMessageProto.newBuilder()
117                .setContextId(this.contextIdentifier)
118                .setMessage(ByteString.copyFrom(message))
119                .build())
120            .build();
121
122    this.contextControlHandler.send(contextControlProto);
123  }
124
125  @Override
126  public synchronized void submitTask(final Configuration taskConf) {
127
128    if (this.isClosed) {
129      throw new RuntimeException("Active context already closed");
130    }
131
132    LOG.log(Level.FINEST, "Submit task: RunningEvaluator id[{0}] for context id[{1}]",
133        new Object[]{getEvaluatorId(), getId()});
134
135    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
136        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
137            .setStartTask(
138                EvaluatorRuntimeProtocol.StartTaskProto.newBuilder()
139                    .setContextId(this.contextIdentifier)
140                    .setConfiguration(this.configurationSerializer.toString(taskConf))
141                    .build())
142            .build();
143
144    this.contextControlHandler.send(contextControlProto);
145  }
146
147  @Override
148  public synchronized void submitContext(final Configuration contextConfiguration) {
149
150    if (this.isClosed) {
151      throw new RuntimeException("Active context already closed");
152    }
153
154    LOG.log(Level.FINEST, "Submit new context: RunningEvaluator id[{0}] for context id[{1}]",
155        new Object[]{getEvaluatorId(), getId()});
156
157    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
158        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
159            .setAddContext(
160                EvaluatorRuntimeProtocol.AddContextProto.newBuilder()
161                    .setParentContextId(getId())
162                    .setContextConfiguration(this.configurationSerializer.toString(contextConfiguration))
163                    .build())
164            .build();
165
166    this.contextControlHandler.send(contextControlProto);
167  }
168
169  @Override
170  public synchronized void submitContextAndService(
171      final Configuration contextConfiguration, final Configuration serviceConfiguration) {
172
173    if (this.isClosed) {
174      throw new RuntimeException("Active context already closed");
175    }
176
177    LOG.log(Level.FINEST, "Submit new context: RunningEvaluator id[{0}] for context id[{1}]",
178        new Object[]{getEvaluatorId(), getId()});
179
180    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
181        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
182            .setAddContext(
183                EvaluatorRuntimeProtocol.AddContextProto.newBuilder()
184                    .setParentContextId(getId())
185                    .setContextConfiguration(this.configurationSerializer.toString(contextConfiguration))
186                    .setServiceConfiguration(this.configurationSerializer.toString(serviceConfiguration))
187                    .build())
188            .build();
189
190    this.contextControlHandler.send(contextControlProto);
191  }
192
193  @Override
194  public String getEvaluatorId() {
195    return this.evaluatorIdentifier;
196  }
197
198  @Override
199  public Optional<String> getParentId() {
200    return this.parentID;
201  }
202
203  @Override
204  public EvaluatorDescriptor getEvaluatorDescriptor() {
205    return this.evaluatorDescriptor;
206  }
207
208  @Override
209  public String getId() {
210    return this.contextIdentifier;
211  }
212
213  @Override
214  public String toString() {
215    return "EvaluatorContext{" +
216        "contextIdentifier='" + this.contextIdentifier + '\'' +
217        ", evaluatorIdentifier='" + this.evaluatorIdentifier + '\'' +
218        ", parentID=" + this.parentID + '}';
219  }
220
221  public synchronized final ClosedContext getClosedContext(final ActiveContext parentContext) {
222    return new ClosedContextImpl(
223        parentContext, this.getId(), this.getEvaluatorId(), this.getEvaluatorDescriptor());
224  }
225
226  /**
227   * @return a FailedContext for the case of an EvaluatorFailure.
228   */
229  public synchronized FailedContext getFailedContextForEvaluatorFailure() {
230
231    final String id = this.getId();
232    final Optional<String> description = Optional.empty();
233    final Optional<byte[]> data = Optional.empty();
234    final Optional<Throwable> cause = Optional.empty();
235    final String message = "Evaluator Failure";
236
237    final Optional<ActiveContext> parentContext = getParentId().isPresent() ?
238        Optional.<ActiveContext>of(this.contextRepresenters.getContext(getParentId().get())) :
239        Optional.<ActiveContext>empty();
240
241    final EvaluatorDescriptor evaluatorDescriptor = getEvaluatorDescriptor();
242    final String evaluatorID = getEvaluatorId();
243
244    return new FailedContextImpl(
245        id, message, description, cause, data, parentContext, evaluatorDescriptor, evaluatorID);
246  }
247
248  public synchronized FailedContext getFailedContext(
249      final ReefServiceProtos.ContextStatusProto contextStatusProto) {
250
251    assert (ReefServiceProtos.ContextStatusProto.State.FAIL == contextStatusProto.getContextState());
252
253    final String id = this.getId();
254    final Optional<String> description = Optional.empty();
255
256    final Optional<byte[]> data = contextStatusProto.hasError() ?
257        Optional.of(contextStatusProto.getError().toByteArray()) :
258        Optional.<byte[]>empty();
259
260    final Optional<Throwable> cause = data.isPresent() ?
261        this.exceptionCodec.fromBytes(data) :
262        Optional.<Throwable>empty();
263
264    final String message = cause.isPresent() ? cause.get().getMessage() : "No message given";
265
266    final Optional<ActiveContext> parentContext = getParentId().isPresent() ?
267        Optional.<ActiveContext>of(this.contextRepresenters.getContext(getParentId().get())) :
268        Optional.<ActiveContext>empty();
269
270    final EvaluatorDescriptor evaluatorDescriptor = getEvaluatorDescriptor();
271    final String evaluatorID = getEvaluatorId();
272
273    return new FailedContextImpl(
274        id, message, description, cause, data, parentContext, evaluatorDescriptor, evaluatorID);
275  }
276
277  public synchronized boolean isRootContext() {
278    return !this.parentID.isPresent();
279  }
280}