001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.context;
020
021import com.google.protobuf.ByteString;
022import org.apache.reef.annotations.audience.DriverSide;
023import org.apache.reef.annotations.audience.Private;
024import org.apache.reef.driver.context.ActiveContext;
025import org.apache.reef.driver.context.ClosedContext;
026import org.apache.reef.driver.context.FailedContext;
027import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
028import org.apache.reef.proto.EvaluatorRuntimeProtocol;
029import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
030import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextState;
031import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO;
032import org.apache.reef.runtime.common.utils.ExceptionCodec;
033import org.apache.reef.tang.Configuration;
034import org.apache.reef.tang.formats.ConfigurationSerializer;
035import org.apache.reef.util.Optional;
036
037import java.util.logging.Level;
038import java.util.logging.Logger;
039
040/**
041 * Driver-side representation of a Context on an Evaluator.
042 */
043@DriverSide
044@Private
045public final class EvaluatorContext implements ActiveContext {
046
047  private static final Logger LOG = Logger.getLogger(EvaluatorContext.class.getName());
048
049  private final String contextIdentifier;
050  private final String evaluatorIdentifier;
051  private final EvaluatorDescriptor evaluatorDescriptor;
052
053  private final Optional<String> parentID;
054  private final ConfigurationSerializer configurationSerializer;
055  private final ContextControlHandler contextControlHandler;
056  private final ExceptionCodec exceptionCodec;
057  private final ContextRepresenters contextRepresenters;
058
059  private boolean isClosed = false;
060
061  public EvaluatorContext(final String contextIdentifier,
062                          final String evaluatorIdentifier,
063                          final EvaluatorDescriptor evaluatorDescriptor,
064                          final Optional<String> parentID,
065                          final ConfigurationSerializer configurationSerializer,
066                          final ContextControlHandler contextControlHandler,
067                          final EvaluatorMessageDispatcher messageDispatcher,
068                          final ExceptionCodec exceptionCodec,
069                          final ContextRepresenters contextRepresenters) {
070
071    this.contextIdentifier = contextIdentifier;
072    this.evaluatorIdentifier = evaluatorIdentifier;
073    this.evaluatorDescriptor = evaluatorDescriptor;
074    this.parentID = parentID;
075    this.configurationSerializer = configurationSerializer;
076    this.contextControlHandler = contextControlHandler;
077    this.exceptionCodec = exceptionCodec;
078    this.contextRepresenters = contextRepresenters;
079
080    LOG.log(Level.FINE, "Instantiated 'EvaluatorContext'");
081  }
082
083  @Override
084  public synchronized void close() {
085
086    if (this.isClosed) {
087      throw new RuntimeException("Active context already closed");
088    }
089
090    LOG.log(Level.FINEST, "Submit close context: RunningEvaluator id[{0}] for context id[{1}]",
091        new Object[]{getEvaluatorId(), getId()});
092
093    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
094        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
095            .setRemoveContext(
096                EvaluatorRuntimeProtocol.RemoveContextProto.newBuilder()
097                    .setContextId(getId())
098                    .build())
099            .build();
100
101    this.contextControlHandler.send(contextControlProto);
102    this.isClosed = true;
103  }
104
105  @Override
106  public synchronized void sendMessage(final byte[] message) {
107
108    if (this.isClosed) {
109      throw new RuntimeException("Active context already closed");
110    }
111
112    LOG.log(Level.FINEST, "Send message: RunningEvaluator id[{0}] for context id[{1}]",
113        new Object[]{getEvaluatorId(), getId()});
114
115    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
116        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
117            .setContextMessage(EvaluatorRuntimeProtocol.ContextMessageProto.newBuilder()
118                .setContextId(this.contextIdentifier)
119                .setMessage(ByteString.copyFrom(message))
120                .build())
121            .build();
122
123    this.contextControlHandler.send(contextControlProto);
124  }
125
126  @Override
127  public synchronized void submitTask(final Configuration taskConf) {
128    submitTask(this.configurationSerializer.toString(taskConf));
129  }
130
131  public synchronized void submitTask(final String taskConf) {
132    if (this.isClosed) {
133      throw new RuntimeException("Active context already closed");
134    }
135
136    LOG.log(Level.FINEST, "Submit task: RunningEvaluator id[{0}] for context id[{1}]",
137        new Object[]{getEvaluatorId(), getId()});
138
139    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
140        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
141            .setStartTask(
142                EvaluatorRuntimeProtocol.StartTaskProto.newBuilder()
143                    .setContextId(this.contextIdentifier)
144                    .setConfiguration(taskConf)
145                    .build())
146            .build();
147
148    this.contextControlHandler.send(contextControlProto);
149  }
150
151  @Override
152  public synchronized void submitContext(final Configuration contextConfiguration) {
153    submitContext(this.configurationSerializer.toString(contextConfiguration));
154  }
155
156  public synchronized void submitContext(final String contextConf) {
157    submitContextAndService(contextConf, Optional.<String>empty());
158  }
159
160  @Override
161  public synchronized void submitContextAndService(
162      final Configuration contextConfiguration, final Configuration serviceConfiguration) {
163    submitContextAndService(
164        this.configurationSerializer.toString(contextConfiguration),
165        this.configurationSerializer.toString(serviceConfiguration));
166  }
167
168  public synchronized void submitContextAndService(final String contextConf, final String serviceConf) {
169    submitContextAndService(contextConf, Optional.ofNullable(serviceConf));
170  }
171
172  public synchronized void submitContextAndService(final String contextConf, final Optional<String> serviceConf) {
173    if (this.isClosed) {
174      throw new RuntimeException("Active context already closed");
175    }
176
177    EvaluatorRuntimeProtocol.AddContextProto.Builder contextBuilder =
178        EvaluatorRuntimeProtocol.AddContextProto.newBuilder()
179            .setParentContextId(getId()).setContextConfiguration(contextConf);
180
181    if (serviceConf.isPresent()) {
182      contextBuilder = contextBuilder.setServiceConfiguration(serviceConf.get());
183    }
184
185    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
186        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
187            .setAddContext(contextBuilder.build())
188            .build();
189
190    this.contextControlHandler.send(contextControlProto);
191  }
192
193  @Override
194  public String getEvaluatorId() {
195    return this.evaluatorIdentifier;
196  }
197
198  @Override
199  public Optional<String> getParentId() {
200    return this.parentID;
201  }
202
203  @Override
204  public EvaluatorDescriptor getEvaluatorDescriptor() {
205    return this.evaluatorDescriptor;
206  }
207
208  @Override
209  public String getId() {
210    return this.contextIdentifier;
211  }
212
213  @Override
214  public String toString() {
215    return "EvaluatorContext{" +
216        "contextIdentifier='" + this.contextIdentifier + '\'' +
217        ", evaluatorIdentifier='" + this.evaluatorIdentifier + '\'' +
218        ", parentID=" + this.parentID + '}';
219  }
220
221  public synchronized ClosedContext getClosedContext(final ActiveContext parentContext) {
222    return new ClosedContextImpl(
223        parentContext, this.getId(), this.getEvaluatorId(), this.getEvaluatorDescriptor());
224  }
225
226  /**
227   * @return a FailedContext for the case of an EvaluatorFailure.
228   */
229  public synchronized FailedContext getFailedContextForEvaluatorFailure() {
230
231    final String id = this.getId();
232    final Optional<String> description = Optional.empty();
233    final Optional<byte[]> data = Optional.empty();
234    final Optional<Throwable> cause = Optional.empty();
235    final String message = "Evaluator Failure";
236
237    final Optional<ActiveContext> parentContext = getParentId().isPresent() ?
238        Optional.<ActiveContext>of(this.contextRepresenters.getContext(getParentId().get())) :
239        Optional.<ActiveContext>empty();
240
241    final String evaluatorID = getEvaluatorId();
242
243    return new FailedContextImpl(
244        id, message, description, cause, data, parentContext, this.evaluatorDescriptor, evaluatorID);
245  }
246
247  public synchronized FailedContext getFailedContext(
248      final ContextStatusPOJO contextStatus) {
249
250    assert ContextState.FAIL == contextStatus.getContextState();
251
252    final String id = this.getId();
253    final Optional<String> description = Optional.empty();
254
255    final Optional<byte[]> data = contextStatus.hasError() ?
256        Optional.of(contextStatus.getError()) :
257        Optional.<byte[]>empty();
258
259    final Optional<Throwable> cause = data.isPresent() ?
260        this.exceptionCodec.fromBytes(data) :
261        Optional.<Throwable>empty();
262
263    final String message = cause.isPresent() ? cause.get().getMessage() : "No message given";
264
265    final Optional<ActiveContext> parentContext = getParentId().isPresent() ?
266        Optional.<ActiveContext>of(this.contextRepresenters.getContext(getParentId().get())) :
267        Optional.<ActiveContext>empty();
268
269    final String evaluatorID = getEvaluatorId();
270
271    return new FailedContextImpl(
272        id, message, description, cause, data, parentContext, this.evaluatorDescriptor, evaluatorID);
273  }
274
275  public synchronized boolean isRootContext() {
276    return !this.parentID.isPresent();
277  }
278}