This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.remote.impl;
020
021import org.apache.reef.wake.EventHandler;
022import org.apache.reef.wake.remote.RemoteIdentifier;
023import org.apache.reef.wake.remote.exception.RemoteRuntimeException;
024
025import java.util.logging.Level;
026import java.util.logging.Logger;
027
028/**
029 * Proxy of the event handler that runs remotely
030 *
031 * @param <T> type
032 */
033public class ProxyEventHandler<T> implements EventHandler<T> {
034  private static final Logger LOG = Logger.getLogger(ProxyEventHandler.class.getName());
035
036  private final SocketRemoteIdentifier myId;
037  private final SocketRemoteIdentifier remoteId;
038  private final String remoteSinkName;
039  private final EventHandler<RemoteEvent<T>> handler;
040  private final RemoteSeqNumGenerator seqGen;
041
042  /**
043   * Constructs a proxy event handler
044   *
045   * @param myId           my identifier
046   * @param remoteId       the remote identifier
047   * @param remoteSinkName the remote sink name
048   * @param reStage        the sender stage
049   * @throws RemoteRuntimeException
050   */
051  public ProxyEventHandler(RemoteIdentifier myId, RemoteIdentifier remoteId, String remoteSinkName, EventHandler<RemoteEvent<T>> handler, RemoteSeqNumGenerator seqGen) {
052    LOG.log(Level.FINE, "ProxyEventHandler myId: {0} remoteId: {1} remoteSink: {2} handler: {3}", new Object[]{myId, remoteId, remoteSinkName, handler});
053    if (!(myId instanceof SocketRemoteIdentifier && remoteId instanceof SocketRemoteIdentifier)) {
054      throw new RemoteRuntimeException("Unsupported remote identifier type");
055    }
056
057    this.myId = (SocketRemoteIdentifier) myId;
058    this.remoteId = (SocketRemoteIdentifier) remoteId;
059    this.remoteSinkName = remoteSinkName;
060    this.handler = handler;
061    this.seqGen = seqGen;
062  }
063
064  /**
065   * Sends the event to the event handler running remotely
066   *
067   * @param event the event
068   */
069  @Override
070  public void onNext(T event) {
071    if (LOG.isLoggable(Level.FINE))
072      LOG.log(Level.FINE, "remoteid: {0}\n{1}", new Object[]{remoteId.getSocketAddress(), event.toString()});
073    handler.onNext(new RemoteEvent<T>(myId.getSocketAddress(), remoteId.getSocketAddress(), "", remoteSinkName,
074        seqGen.getNextSeq(remoteId.getSocketAddress()), event));
075  }
076
077  /**
078   * Returns a string representation of the object
079   *
080   * @return a string representation of the object
081   */
082  public String toString() {
083    StringBuilder builder = new StringBuilder();
084    builder.append(this.getClass().getName());
085    builder.append(" remote_id=");
086    builder.append(remoteId.toString());
087    return builder.toString();
088  }
089}