This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.context;
020
021import com.google.protobuf.ByteString;
022import org.apache.reef.annotations.audience.DriverSide;
023import org.apache.reef.annotations.audience.Private;
024import org.apache.reef.driver.context.ActiveContext;
025import org.apache.reef.driver.context.ClosedContext;
026import org.apache.reef.driver.context.FailedContext;
027import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
028import org.apache.reef.proto.EvaluatorRuntimeProtocol;
029import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
030import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextState;
031import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO;
032import org.apache.reef.runtime.common.utils.ExceptionCodec;
033import org.apache.reef.tang.Configuration;
034import org.apache.reef.tang.formats.ConfigurationSerializer;
035import org.apache.reef.util.Optional;
036
037import java.util.logging.Level;
038import java.util.logging.Logger;
039
040/**
041 * Driver-side representation of a Context on an Evaluator.
042 */
043@DriverSide
044@Private
045public final class EvaluatorContext implements ActiveContext {
046
047  private static final Logger LOG = Logger.getLogger(EvaluatorContext.class.getName());
048
049  private final String contextIdentifier;
050  private final String evaluatorIdentifier;
051  private final EvaluatorDescriptor evaluatorDescriptor;
052
053  private final Optional<String> parentID;
054  private final ConfigurationSerializer configurationSerializer;
055  private final ContextControlHandler contextControlHandler;
056  private final ExceptionCodec exceptionCodec;
057  private final ContextRepresenters contextRepresenters;
058
059  private boolean isClosed = false;
060
061  public EvaluatorContext(final String contextIdentifier,
062                          final String evaluatorIdentifier,
063                          final EvaluatorDescriptor evaluatorDescriptor,
064                          final Optional<String> parentID,
065                          final ConfigurationSerializer configurationSerializer,
066                          final ContextControlHandler contextControlHandler,
067                          final EvaluatorMessageDispatcher messageDispatcher,
068                          final ExceptionCodec exceptionCodec,
069                          final ContextRepresenters contextRepresenters) {
070
071    this.contextIdentifier = contextIdentifier;
072    this.evaluatorIdentifier = evaluatorIdentifier;
073    this.evaluatorDescriptor = evaluatorDescriptor;
074    this.parentID = parentID;
075    this.configurationSerializer = configurationSerializer;
076    this.contextControlHandler = contextControlHandler;
077    this.exceptionCodec = exceptionCodec;
078    this.contextRepresenters = contextRepresenters;
079
080    LOG.log(Level.FINE, "Instantiated 'EvaluatorContext'");
081  }
082
083  @Override
084  public synchronized void close() {
085
086    if (this.isClosed) {
087      throw new RuntimeException("Active context already closed");
088    }
089
090    LOG.log(Level.FINEST, "Submit close context: RunningEvaluator id[{0}] for context id[{1}]",
091        new Object[]{getEvaluatorId(), getId()});
092
093    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
094        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
095            .setRemoveContext(
096                EvaluatorRuntimeProtocol.RemoveContextProto.newBuilder()
097                    .setContextId(getId())
098                    .build())
099            .build();
100
101    this.contextControlHandler.send(contextControlProto);
102    this.isClosed = true;
103  }
104
105  @Override
106  public synchronized void sendMessage(final byte[] message) {
107
108    if (this.isClosed) {
109      throw new RuntimeException("Active context already closed");
110    }
111
112    LOG.log(Level.FINEST, "Send message: RunningEvaluator id[{0}] for context id[{1}]",
113        new Object[]{getEvaluatorId(), getId()});
114
115    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
116        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
117            .setContextMessage(EvaluatorRuntimeProtocol.ContextMessageProto.newBuilder()
118                .setContextId(this.contextIdentifier)
119                .setMessage(ByteString.copyFrom(message))
120                .build())
121            .build();
122
123    this.contextControlHandler.send(contextControlProto);
124  }
125
126  @Override
127  public synchronized void submitTask(final Configuration taskConf) {
128    submitTask(this.configurationSerializer.toString(taskConf));
129  }
130
131  public synchronized void submitTask(final String taskConf) {
132    if (this.isClosed) {
133      throw new RuntimeException("Active context already closed");
134    }
135
136    LOG.log(Level.FINEST, "Submit task: RunningEvaluator id[{0}] for context id[{1}]",
137        new Object[]{getEvaluatorId(), getId()});
138
139    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
140        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
141            .setStartTask(
142                EvaluatorRuntimeProtocol.StartTaskProto.newBuilder()
143                    .setContextId(this.contextIdentifier)
144                    .setConfiguration(taskConf)
145                    .build())
146            .build();
147
148    this.contextControlHandler.send(contextControlProto);
149  }
150
151  @Override
152  public synchronized void submitContext(final Configuration contextConfiguration) {
153    submitContext(this.configurationSerializer.toString(contextConfiguration));
154  }
155
156  public synchronized void submitContext(final String contextConf) {
157    if (this.isClosed) {
158      throw new RuntimeException("Active context already closed");
159    }
160
161    LOG.log(Level.FINEST, "Submit context: RunningEvaluator id[{0}] for context id[{1}]",
162        new Object[]{getEvaluatorId(), getId()});
163
164    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
165        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
166            .setAddContext(
167                EvaluatorRuntimeProtocol.AddContextProto.newBuilder()
168                    .setParentContextId(getId())
169                    .setContextConfiguration(contextConf)
170                    .build())
171            .build();
172
173    this.contextControlHandler.send(contextControlProto);
174  }
175
176  @Override
177  public synchronized void submitContextAndService(
178      final Configuration contextConfiguration, final Configuration serviceConfiguration) {
179
180    if (this.isClosed) {
181      throw new RuntimeException("Active context already closed");
182    }
183
184    LOG.log(Level.FINEST, "Submit new context: RunningEvaluator id[{0}] for context id[{1}]",
185        new Object[]{getEvaluatorId(), getId()});
186
187    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
188        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
189            .setAddContext(
190                EvaluatorRuntimeProtocol.AddContextProto.newBuilder()
191                    .setParentContextId(getId())
192                    .setContextConfiguration(this.configurationSerializer.toString(contextConfiguration))
193                    .setServiceConfiguration(this.configurationSerializer.toString(serviceConfiguration))
194                    .build())
195            .build();
196
197    this.contextControlHandler.send(contextControlProto);
198  }
199
200  @Override
201  public String getEvaluatorId() {
202    return this.evaluatorIdentifier;
203  }
204
205  @Override
206  public Optional<String> getParentId() {
207    return this.parentID;
208  }
209
210  @Override
211  public EvaluatorDescriptor getEvaluatorDescriptor() {
212    return this.evaluatorDescriptor;
213  }
214
215  @Override
216  public String getId() {
217    return this.contextIdentifier;
218  }
219
220  @Override
221  public String toString() {
222    return "EvaluatorContext{" +
223        "contextIdentifier='" + this.contextIdentifier + '\'' +
224        ", evaluatorIdentifier='" + this.evaluatorIdentifier + '\'' +
225        ", parentID=" + this.parentID + '}';
226  }
227
228  public synchronized ClosedContext getClosedContext(final ActiveContext parentContext) {
229    return new ClosedContextImpl(
230        parentContext, this.getId(), this.getEvaluatorId(), this.getEvaluatorDescriptor());
231  }
232
233  /**
234   * @return a FailedContext for the case of an EvaluatorFailure.
235   */
236  public synchronized FailedContext getFailedContextForEvaluatorFailure() {
237
238    final String id = this.getId();
239    final Optional<String> description = Optional.empty();
240    final Optional<byte[]> data = Optional.empty();
241    final Optional<Throwable> cause = Optional.empty();
242    final String message = "Evaluator Failure";
243
244    final Optional<ActiveContext> parentContext = getParentId().isPresent() ?
245        Optional.<ActiveContext>of(this.contextRepresenters.getContext(getParentId().get())) :
246        Optional.<ActiveContext>empty();
247
248    final String evaluatorID = getEvaluatorId();
249
250    return new FailedContextImpl(
251        id, message, description, cause, data, parentContext, this.evaluatorDescriptor, evaluatorID);
252  }
253
254  public synchronized FailedContext getFailedContext(
255      final ContextStatusPOJO contextStatus) {
256
257    assert ContextState.FAIL == contextStatus.getContextState();
258
259    final String id = this.getId();
260    final Optional<String> description = Optional.empty();
261
262    final Optional<byte[]> data = contextStatus.hasError() ?
263        Optional.of(contextStatus.getError()) :
264        Optional.<byte[]>empty();
265
266    final Optional<Throwable> cause = data.isPresent() ?
267        this.exceptionCodec.fromBytes(data) :
268        Optional.<Throwable>empty();
269
270    final String message = cause.isPresent() ? cause.get().getMessage() : "No message given";
271
272    final Optional<ActiveContext> parentContext = getParentId().isPresent() ?
273        Optional.<ActiveContext>of(this.contextRepresenters.getContext(getParentId().get())) :
274        Optional.<ActiveContext>empty();
275
276    final String evaluatorID = getEvaluatorId();
277
278    return new FailedContextImpl(
279        id, message, description, cause, data, parentContext, this.evaluatorDescriptor, evaluatorID);
280  }
281
282  public synchronized boolean isRootContext() {
283    return !this.parentID.isPresent();
284  }
285}