This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.remote.impl;
020
021import org.apache.reef.wake.EventHandler;
022import org.apache.reef.wake.remote.RemoteIdentifier;
023import org.apache.reef.wake.remote.exception.RemoteRuntimeException;
024
025import java.util.logging.Level;
026import java.util.logging.Logger;
027
028/**
029 * Proxy of the event handler that runs remotely.
030 *
031 * @param <T> type
032 */
033public class ProxyEventHandler<T> implements EventHandler<T> {
034  private static final Logger LOG = Logger.getLogger(ProxyEventHandler.class.getName());
035
036  private final SocketRemoteIdentifier myId;
037  private final SocketRemoteIdentifier remoteId;
038  private final String remoteSinkName;
039  private final EventHandler<RemoteEvent<T>> handler;
040  private final RemoteSeqNumGenerator seqGen;
041
042  /**
043   * Constructs a proxy event handler.
044   *
045   * @param myId           my identifier
046   * @param remoteId       the remote identifier
047   * @param remoteSinkName the remote sink name
048   * @throws RemoteRuntimeException
049   */
050  public ProxyEventHandler(final RemoteIdentifier myId, final RemoteIdentifier remoteId, final String remoteSinkName,
051                           final EventHandler<RemoteEvent<T>> handler, final RemoteSeqNumGenerator seqGen) {
052    LOG.log(Level.FINE, "ProxyEventHandler myId: {0} remoteId: {1} remoteSink: {2} handler: {3}",
053        new Object[]{myId, remoteId, remoteSinkName, handler});
054    if (!(myId instanceof SocketRemoteIdentifier && remoteId instanceof SocketRemoteIdentifier)) {
055      throw new RemoteRuntimeException("Unsupported remote identifier type");
056    }
057
058    this.myId = (SocketRemoteIdentifier) myId;
059    this.remoteId = (SocketRemoteIdentifier) remoteId;
060    this.remoteSinkName = remoteSinkName;
061    this.handler = handler;
062    this.seqGen = seqGen;
063  }
064
065  /**
066   * Sends the event to the event handler running remotely.
067   *
068   * @param event the event
069   */
070  @Override
071  public void onNext(final T event) {
072    if (LOG.isLoggable(Level.FINE)) {
073      LOG.log(Level.FINE, "remoteid: {0}\n{1}", new Object[]{remoteId.getSocketAddress(), event.toString()});
074    }
075    handler.onNext(new RemoteEvent<T>(myId.getSocketAddress(), remoteId.getSocketAddress(),
076        seqGen.getNextSeq(remoteId.getSocketAddress()), event));
077  }
078
079  /**
080   * Returns a string representation of the object.
081   *
082   * @return a string representation of the object
083   */
084  public String toString() {
085    final StringBuilder builder = new StringBuilder();
086    builder.append(this.getClass().getName());
087    builder.append(" remote_id=");
088    builder.append(remoteId.toString());
089    return builder.toString();
090  }
091}