001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.launch; 020 021import com.google.protobuf.ByteString; 022import org.apache.reef.proto.ReefServiceProtos; 023import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID; 024import org.apache.reef.runtime.common.launch.parameters.LaunchID; 025import org.apache.reef.runtime.common.utils.ExceptionCodec; 026import org.apache.reef.runtime.common.utils.RemoteManager; 027import org.apache.reef.tang.InjectionFuture; 028import org.apache.reef.tang.annotations.Parameter; 029import org.apache.reef.wake.EventHandler; 030 031import javax.inject.Inject; 032import java.util.logging.Level; 033import java.util.logging.Logger; 034 035 036/** 037 * The error handler REEF registers with Wake. 038 */ 039public final class REEFErrorHandler implements EventHandler<Throwable>, AutoCloseable { 040 041 private static final Logger LOG = Logger.getLogger(REEFErrorHandler.class.getName()); 042 private static final String CLASS_NAME = REEFErrorHandler.class.getCanonicalName(); 043 044 // This class is used as the ErrorHandler in the RemoteManager. Hence, we need an InjectionFuture here. 045 private final InjectionFuture<RemoteManager> remoteManager; 046 private final String launchID; 047 private final String errorHandlerRID; 048 private final ExceptionCodec exceptionCodec; 049 050 @Inject 051 REEFErrorHandler( 052 @Parameter(ErrorHandlerRID.class) final String errorHandlerRID, 053 @Parameter(LaunchID.class) final String launchID, 054 final InjectionFuture<RemoteManager> remoteManager, 055 final ExceptionCodec exceptionCodec) { 056 057 this.errorHandlerRID = errorHandlerRID; 058 this.remoteManager = remoteManager; 059 this.launchID = launchID; 060 this.exceptionCodec = exceptionCodec; 061 } 062 063 @Override 064 @SuppressWarnings("checkstyle:illegalcatch") 065 public void onNext(final Throwable ex) { 066 067 LOG.log(Level.SEVERE, "Uncaught exception.", ex); 068 069 if (this.errorHandlerRID.equals(ErrorHandlerRID.NONE)) { 070 LOG.log(Level.SEVERE, "Caught an exception from Wake we cannot send upstream because there is no upstream"); 071 return; 072 } 073 074 try { 075 076 final EventHandler<ReefServiceProtos.RuntimeErrorProto> runtimeErrorHandler = 077 this.remoteManager.get().getHandler(this.errorHandlerRID, ReefServiceProtos.RuntimeErrorProto.class); 078 079 final ReefServiceProtos.RuntimeErrorProto message = 080 ReefServiceProtos.RuntimeErrorProto.newBuilder() 081 .setName("reef") 082 .setIdentifier(this.launchID) 083 .setMessage(ex.getMessage()) 084 .setException(ByteString.copyFrom(this.exceptionCodec.toBytes(ex))) 085 .build(); 086 087 runtimeErrorHandler.onNext(message); 088 LOG.log(Level.INFO, "Successfully sent the error upstream: {0}", ex.toString()); 089 090 } catch (final Throwable t) { 091 LOG.log(Level.SEVERE, "Unable to send the error upstream", t); 092 } 093 } 094 095 @SuppressWarnings("checkstyle:illegalcatch") 096 public void close() { 097 098 LOG.entering(CLASS_NAME, "close"); 099 100 try { 101 this.remoteManager.get().close(); 102 } catch (final Throwable ex) { 103 LOG.log(Level.SEVERE, "Unable to close the remote manager", ex); 104 } 105 106 LOG.exiting(CLASS_NAME, "close"); 107 } 108 109 @Override 110 public String toString() { 111 return String.format( 112 "REEFErrorHandler: { remoteManager:{%s}, launchID:%s, errorHandlerRID:%s }", 113 this.remoteManager.get(), this.launchID, this.errorHandlerRID); 114 } 115}