This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.javabridge;
020
021import org.apache.avro.io.*;
022import org.apache.avro.specific.SpecificDatumReader;
023import org.apache.avro.specific.SpecificDatumWriter;
024import org.apache.commons.lang3.StringUtils;
025import org.apache.reef.annotations.audience.Interop;
026import org.apache.reef.annotations.audience.Private;
027import org.apache.reef.driver.task.FailedTask;
028import org.apache.reef.javabridge.avro.AvroFailedTask;
029
030import java.io.ByteArrayInputStream;
031import java.io.ByteArrayOutputStream;
032import java.io.IOException;
033import java.nio.ByteBuffer;
034import java.util.logging.Logger;
035
036/**
037 * The Java-CLR bridge object for {@link org.apache.reef.driver.task.FailedTask}.
038 */
039@Private
040@Interop(
041    CppFiles = { "Clr2JavaImpl.h", "FailedTaskClr2Java.cpp" },
042    CsFiles = { "IFailedTaskClr2Java.cs", "FailedTask.cs" })
043public final class FailedTaskBridge extends NativeBridge {
044  private static final Logger LOG = Logger.getLogger(FailedTaskBridge.class.getName());
045
046  private final FailedTask jfailedTask;
047  private final ActiveContextBridge jactiveContext;
048  private final byte[] failedTaskSerializedAvro;
049
050  public FailedTaskBridge(final FailedTask failedTask, final ActiveContextBridgeFactory factory) {
051    this.jfailedTask = failedTask;
052    if (failedTask.getActiveContext().isPresent()) {
053      this.jactiveContext = factory.getActiveContextBridge(failedTask.getActiveContext().get());
054    } else {
055      this.jactiveContext = null;
056    }
057
058    try {
059      this.failedTaskSerializedAvro = generateFailedTaskSerializedAvro();
060    } catch(final Exception e) {
061      throw new RuntimeException(e);
062    }
063  }
064
065  public ActiveContextBridge getActiveContext() {
066    return jactiveContext;
067  }
068
069  public byte[] getFailedTaskSerializedAvro() {
070    return failedTaskSerializedAvro;
071  }
072
073  private byte[] generateFailedTaskSerializedAvro() throws IOException {
074    AvroFailedTask avroFailedTask = null;
075
076    if (jfailedTask.getData() != null && jfailedTask.getData().isPresent()) {
077      // Deserialize what was passed in from C#.
078      try (final ByteArrayInputStream fileInputStream = new ByteArrayInputStream(jfailedTask.getData().get())) {
079        final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
080            AvroFailedTask.getClassSchema(), fileInputStream);
081        final SpecificDatumReader<AvroFailedTask> reader =
082            new SpecificDatumReader<>(AvroFailedTask.class);
083        avroFailedTask = reader.read(null, decoder);
084      }
085    } else {
086      // This may result from a failed Evaluator.
087      avroFailedTask = AvroFailedTask.newBuilder()
088          .setIdentifier(jfailedTask.getId())
089          .setCause(ByteBuffer.wrap(new byte[0]))
090          .setData(ByteBuffer.wrap(new byte[0]))
091          .setMessage("")
092          .build();
093    }
094
095    // Overwrite the message if Java provides a message and C# does not.
096    // Typically the case for failed Evaluators.
097    if (StringUtils.isNoneBlank(jfailedTask.getMessage()) &&
098        StringUtils.isBlank(avroFailedTask.getMessage().toString())) {
099      avroFailedTask.setMessage(jfailedTask.getMessage());
100    }
101
102    final DatumWriter<AvroFailedTask> datumWriter = new SpecificDatumWriter<>(AvroFailedTask.class);
103
104    try (final ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
105      final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(avroFailedTask.getSchema(), outputStream);
106      datumWriter.write(avroFailedTask, encoder);
107      encoder.flush();
108      outputStream.flush();
109      return outputStream.toByteArray();
110    }
111  }
112
113  @Override
114  public void close() {
115  }
116}
117