001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.launch; 020 021import com.google.protobuf.ByteString; 022import org.apache.reef.proto.ReefServiceProtos; 023import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID; 024import org.apache.reef.runtime.common.launch.parameters.LaunchID; 025import org.apache.reef.runtime.common.utils.ExceptionCodec; 026import org.apache.reef.runtime.common.utils.RemoteManager; 027import org.apache.reef.tang.InjectionFuture; 028import org.apache.reef.tang.annotations.Parameter; 029import org.apache.reef.wake.EventHandler; 030 031import javax.inject.Inject; 032import java.util.logging.Level; 033import java.util.logging.Logger; 034 035 036/** 037 * The error handler REEF registers with Wake. 038 */ 039public final class REEFErrorHandler implements EventHandler<Throwable>, AutoCloseable { 040 041 private static final Logger LOG = Logger.getLogger(REEFErrorHandler.class.getName()); 042 043 // This class is used as the ErrorHandler in the RemoteManager. Hence, we need an InjectionFuture here. 044 private final InjectionFuture<RemoteManager> remoteManager; 045 private final String launchID; 046 private final String errorHandlerRID; 047 private final ExceptionCodec exceptionCodec; 048 049 @Inject 050 REEFErrorHandler(final InjectionFuture<RemoteManager> remoteManager, 051 @Parameter(ErrorHandlerRID.class) final String errorHandlerRID, 052 @Parameter(LaunchID.class) final String launchID, 053 final ExceptionCodec exceptionCodec) { 054 this.errorHandlerRID = errorHandlerRID; 055 this.remoteManager = remoteManager; 056 this.launchID = launchID; 057 this.exceptionCodec = exceptionCodec; 058 } 059 060 @Override 061 @SuppressWarnings("checkstyle:illegalcatch") 062 public void onNext(final Throwable e) { 063 LOG.log(Level.SEVERE, "Uncaught exception.", e); 064 if (!this.errorHandlerRID.equals(ErrorHandlerRID.NONE)) { 065 final EventHandler<ReefServiceProtos.RuntimeErrorProto> runtimeErrorHandler = this.remoteManager.get() 066 .getHandler(errorHandlerRID, ReefServiceProtos.RuntimeErrorProto.class); 067 final ReefServiceProtos.RuntimeErrorProto message = ReefServiceProtos.RuntimeErrorProto.newBuilder() 068 .setName("reef") 069 .setIdentifier(launchID) 070 .setMessage(e.getMessage()) 071 .setException(ByteString.copyFrom(this.exceptionCodec.toBytes(e))) 072 .build(); 073 try { 074 runtimeErrorHandler.onNext(message); 075 } catch (final Throwable t) { 076 LOG.log(Level.SEVERE, "Unable to send the error upstream", t); 077 } 078 } else { 079 LOG.log(Level.SEVERE, "Caught an exception from Wake we cannot send upstream because there is no upstream"); 080 } 081 } 082 083 @SuppressWarnings("checkstyle:illegalcatch") 084 public void close() { 085 try { 086 this.remoteManager.get().close(); 087 } catch (final Throwable ex) { 088 LOG.log(Level.SEVERE, "Unable to close the remote manager", ex); 089 } 090 } 091 092 @Override 093 public String toString() { 094 return "REEFErrorHandler{" + 095 "remoteManager=" + remoteManager + 096 ", launchID='" + launchID + '\'' + 097 ", errorHandlerRID='" + errorHandlerRID + '\'' + 098 '}'; 099 } 100}