This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.wake.examples.join;
020
021import org.apache.reef.wake.Stage;
022import org.apache.reef.wake.rx.Observer;
023import org.apache.reef.wake.rx.StaticObservable;
024
025/**
026 * A source class generating TupleEvent.
027 */
028public class TupleSource implements StaticObservable, Stage {
029  private final Thread[] threads;
030  private final Observer<TupleEvent> out;
031
032  public TupleSource(final Observer<TupleEvent> out, final int max, final int numThreads, final boolean evenOnly) {
033    this.out = out;
034    threads = new Thread[numThreads];
035    for (int i = 0; i < numThreads; i++) {
036      final int threadid = i;
037      threads[i] = new Thread(new Runnable() {
038        @Override
039        public void run() {
040          for (int i = 0; i < max / ((evenOnly ? 2 : 1) * numThreads); i++) {
041            int j = i * numThreads + threadid;
042            if (evenOnly) {
043              j *= 2;
044            }
045            out.onNext(new TupleEvent(j, j + ""));
046          }
047        }
048
049      });
050      threads[i].start();
051    }
052  }
053
054  @Override
055  public void close() throws Exception {
056    for (final Thread t : threads) {
057      t.join();
058    }
059    out.onCompleted();
060  }
061}