001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.data.loading.api; 020 021import org.apache.reef.annotations.Unstable; 022import org.apache.reef.io.data.loading.impl.DistributedDataSetPartition; 023 024import java.util.Collection; 025import java.util.HashSet; 026import java.util.Iterator; 027import java.util.LinkedList; 028import java.util.List; 029import java.util.Set; 030 031/** 032 * Represents a distributed data set that is split across data centers. 033 * It contains a set of distributed data set partitions {@link DistributedDataSetPartition} 034 * this data to be loaded into. 035 * 036 */ 037@Unstable 038public final class DistributedDataSet implements Iterable<DistributedDataSetPartition> { 039 040 041 /** 042 * The set of distributed data set partitions. 043 */ 044 private final Set<DistributedDataSetPartition> partitions = new HashSet<>(); 045 046 /** 047 * Adds the given partition to the set. 048 * 049 * @param partition 050 * the partition to add 051 */ 052 public void addPartition(final DistributedDataSetPartition partition) { 053 this.partitions.add(partition); 054 } 055 056 /** 057 * Adds the given partitions to the set. 058 * 059 * @param partitions 060 * the partitions to add 061 */ 062 @SuppressWarnings("checkstyle:hiddenfield") 063 public void addPartitions(final Collection<DistributedDataSetPartition> partitions) { 064 this.partitions.addAll(partitions); 065 } 066 067 /** 068 * Returns true if it does not contain any partition. 069 * 070 * @return a boolean indicating whether it contains partitions or not 071 */ 072 public boolean isEmpty() { 073 return this.partitions.isEmpty(); 074 } 075 076 @Override 077 public Iterator<DistributedDataSetPartition> iterator() { 078 return new DistributedDataSetIterator(partitions); 079 } 080 081 static final class DistributedDataSetIterator implements Iterator<DistributedDataSetPartition> { 082 083 private final List<DistributedDataSetPartition> partitions; 084 private int position; 085 086 DistributedDataSetIterator( 087 final Collection<DistributedDataSetPartition> partitions) { 088 this.partitions = new LinkedList<>(partitions); 089 position = 0; 090 } 091 092 @Override 093 public boolean hasNext() { 094 return position < partitions.size(); 095 } 096 097 @Override 098 public DistributedDataSetPartition next() { 099 final DistributedDataSetPartition partition = partitions 100 .get(position); 101 position++; 102 return partition; 103 } 104 105 @Override 106 public void remove() { 107 throw new UnsupportedOperationException( 108 "Remove method has not been implemented in this iterator"); 109 } 110 } 111}