001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.javabridge; 020 021import org.apache.avro.io.*; 022import org.apache.avro.specific.SpecificDatumReader; 023import org.apache.avro.specific.SpecificDatumWriter; 024import org.apache.commons.lang3.StringUtils; 025import org.apache.reef.annotations.audience.Interop; 026import org.apache.reef.annotations.audience.Private; 027import org.apache.reef.driver.task.FailedTask; 028import org.apache.reef.javabridge.avro.AvroFailedTask; 029 030import java.io.ByteArrayInputStream; 031import java.io.ByteArrayOutputStream; 032import java.io.IOException; 033import java.nio.ByteBuffer; 034import java.util.logging.Logger; 035 036/** 037 * The Java-CLR bridge object for {@link org.apache.reef.driver.task.FailedTask}. 038 */ 039@Private 040@Interop( 041 CppFiles = { "Clr2JavaImpl.h", "FailedTaskClr2Java.cpp" }, 042 CsFiles = { "IFailedTaskClr2Java.cs", "FailedTask.cs" }) 043public final class FailedTaskBridge extends NativeBridge { 044 private static final Logger LOG = Logger.getLogger(FailedTaskBridge.class.getName()); 045 046 private final FailedTask jfailedTask; 047 private final ActiveContextBridge jactiveContext; 048 private final byte[] failedTaskSerializedAvro; 049 050 public FailedTaskBridge(final FailedTask failedTask, final ActiveContextBridgeFactory factory) { 051 this.jfailedTask = failedTask; 052 if (failedTask.getActiveContext().isPresent()) { 053 this.jactiveContext = factory.getActiveContextBridge(failedTask.getActiveContext().get()); 054 } else { 055 this.jactiveContext = null; 056 } 057 058 try { 059 this.failedTaskSerializedAvro = generateFailedTaskSerializedAvro(); 060 } catch(final Exception e) { 061 throw new RuntimeException(e); 062 } 063 } 064 065 public ActiveContextBridge getActiveContext() { 066 return jactiveContext; 067 } 068 069 public byte[] getFailedTaskSerializedAvro() { 070 return failedTaskSerializedAvro; 071 } 072 073 private byte[] generateFailedTaskSerializedAvro() throws IOException { 074 AvroFailedTask avroFailedTask = null; 075 076 if (jfailedTask.getData() != null && jfailedTask.getData().isPresent()) { 077 // Deserialize what was passed in from C#. 078 try (final ByteArrayInputStream fileInputStream = new ByteArrayInputStream(jfailedTask.getData().get())) { 079 final JsonDecoder decoder = DecoderFactory.get().jsonDecoder( 080 AvroFailedTask.getClassSchema(), fileInputStream); 081 final SpecificDatumReader<AvroFailedTask> reader = 082 new SpecificDatumReader<>(AvroFailedTask.class); 083 avroFailedTask = reader.read(null, decoder); 084 } 085 } else { 086 // This may result from a failed Evaluator. 087 avroFailedTask = AvroFailedTask.newBuilder() 088 .setIdentifier(jfailedTask.getId()) 089 .setCause(ByteBuffer.wrap(new byte[0])) 090 .setData(ByteBuffer.wrap(new byte[0])) 091 .setMessage("") 092 .build(); 093 } 094 095 // Overwrite the message if Java provides a message and C# does not. 096 // Typically the case for failed Evaluators. 097 if (StringUtils.isNoneBlank(jfailedTask.getMessage()) && 098 StringUtils.isBlank(avroFailedTask.getMessage().toString())) { 099 avroFailedTask.setMessage(jfailedTask.getMessage()); 100 } 101 102 final DatumWriter<AvroFailedTask> datumWriter = new SpecificDatumWriter<>(AvroFailedTask.class); 103 104 try (final ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { 105 final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(avroFailedTask.getSchema(), outputStream); 106 datumWriter.write(avroFailedTask, encoder); 107 encoder.flush(); 108 outputStream.flush(); 109 return outputStream.toByteArray(); 110 } 111 } 112 113 @Override 114 public void close() { 115 } 116} 117