This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.examples.join;
020
021import org.apache.reef.wake.Stage;
022import org.apache.reef.wake.rx.Observer;
023import org.apache.reef.wake.rx.StaticObservable;
024
025public class TupleSource implements StaticObservable, Stage {
026  final Thread[] threads;
027  final Observer<TupleEvent> out;
028
029  public TupleSource(final Observer<TupleEvent> out, final int max, final int numThreads, final boolean evenOnly) {
030    this.out = out;
031    threads = new Thread[numThreads];
032    for (int i = 0; i < numThreads; i++) {
033      final int threadid = i;
034      threads[i] = new Thread(new Runnable() {
035        @Override
036        public void run() {
037          for (int i = 0; i < max / ((evenOnly ? 2 : 1) * numThreads); i++) {
038            int j = i * numThreads + threadid;
039            if (evenOnly) {
040              j *= 2;
041            }
042            out.onNext(new TupleEvent(j, j + ""));
043          }
044        }
045
046      });
047      threads[i].start();
048    }
049  }
050
051  @Override
052  public void close() throws Exception {
053    for (Thread t : threads) {
054      t.join();
055    }
056    out.onCompleted();
057  }
058}