This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.task;
020
021import org.apache.reef.io.network.group.api.task.NodeStruct;
022import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
023import org.apache.reef.io.network.group.impl.utils.Utils;
024
025import java.util.concurrent.BlockingQueue;
026import java.util.concurrent.LinkedBlockingQueue;
027import java.util.logging.Logger;
028
029public abstract class NodeStructImpl implements NodeStruct {
030
031  private static final Logger LOG = Logger.getLogger(NodeStructImpl.class.getName());
032
033  private final String id;
034  private final BlockingQueue<GroupCommunicationMessage> dataQue = new LinkedBlockingQueue<>();
035
036  private int version;
037
038  public NodeStructImpl(final String id, final int version) {
039    super();
040    this.id = id;
041    this.version = version;
042  }
043
044  @Override
045  public int getVersion() {
046    return version;
047  }
048
049  @Override
050  public void setVersion(final int version) {
051    this.version = version;
052  }
053
054  @Override
055  public String getId() {
056    return id;
057  }
058
059  @Override
060  public void addData(final GroupCommunicationMessage msg) {
061    LOG.entering("NodeStructImpl", "addData", msg);
062    dataQue.add(msg);
063    LOG.exiting("NodeStructImpl", "addData", msg);
064  }
065
066  @Override
067  public byte[] getData() {
068    LOG.entering("NodeStructImpl", "getData");
069    final GroupCommunicationMessage gcm;
070    try {
071      gcm = dataQue.take();
072    } catch (final InterruptedException e) {
073      throw new RuntimeException("InterruptedException while waiting for data from " + id, e);
074    }
075
076    final byte[] retVal = checkDead(gcm) ? null : Utils.getData(gcm);
077    LOG.exiting("NodeStructImpl", "getData", retVal);
078    return retVal;
079  }
080
081  @Override
082  public String toString() {
083    return "(" + id + "," + version + ")";
084  }
085
086  @Override
087  public boolean equals(final Object obj) {
088    if (obj instanceof NodeStructImpl) {
089      final NodeStructImpl that = (NodeStructImpl) obj;
090      return this.id.equals(that.id) && this.version == that.version;
091    } else {
092      return false;
093    }
094  }
095
096  @Override
097  public int hashCode() {
098    return 31 * id.hashCode() + version;
099  }
100
101  public abstract boolean checkDead(final GroupCommunicationMessage gcm);
102}