001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.network.group.impl.utils;
020
021import java.util.concurrent.atomic.AtomicInteger;
022import java.util.logging.Logger;
023
024public class CountingSemaphore {
025
026  private static final Logger LOG = Logger.getLogger(CountingSemaphore.class.getName());
027
028  private final AtomicInteger counter;
029
030  private final String name;
031
032  private final Object lock;
033
034  private final int initCount;
035
036  public CountingSemaphore(final int initCount, final String name, final Object lock) {
037    super();
038    this.initCount = initCount;
039    this.name = name;
040    this.lock = lock;
041    this.counter = new AtomicInteger(initCount);
042    LOG.finest("Counter initialized to " + initCount);
043  }
044
045  public int getInitialCount() {
046    return initCount;
047  }
048
049  public int increment() {
050    synchronized (lock) {
051      final int retVal = counter.incrementAndGet();
052      LOG.finest(name + "Incremented counter to " + retVal);
053      logStatus();
054      return retVal;
055    }
056  }
057
058  private void logStatus() {
059    final int yetToRun = counter.get();
060    final int curRunning = initCount - yetToRun;
061    LOG.fine(name + curRunning + " workers are running & " + yetToRun + " workers are yet to run");
062  }
063
064  public int decrement() {
065    synchronized (lock) {
066      final int retVal = counter.decrementAndGet();
067      LOG.finest(name + "Decremented counter to " + retVal);
068      if (retVal < 0) {
069        LOG.warning("Counter negative. More workers exist than you expected");
070      }
071      if (retVal <= 0) {
072        LOG.finest(name + "All workers are done with their task. Notifying waiting threads");
073        lock.notifyAll();
074      } else {
075        LOG.finest(name + "Some workers are not done yet");
076      }
077      logStatus();
078      return retVal;
079    }
080  }
081
082  public int get() {
083    synchronized (lock) {
084      return counter.get();
085    }
086  }
087
088  public void await() {
089    synchronized (lock) {
090      LOG.finest(name + "Waiting for workers to be done");
091      while (counter.get() > 0) {
092        try {
093          lock.wait();
094          LOG.finest(name + "Notified with counter=" + counter.get());
095        } catch (final InterruptedException e) {
096          throw new RuntimeException("InterruptedException while waiting for counting semaphore counter", e);
097        }
098      }
099      LOG.finest(name + "Returning from wait");
100    }
101  }
102
103}