This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.data.loading;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.driver.context.ActiveContext;
023import org.apache.reef.driver.context.ContextConfiguration;
024import org.apache.reef.driver.task.CompletedTask;
025import org.apache.reef.driver.task.TaskConfiguration;
026import org.apache.reef.io.data.loading.api.DataLoadingService;
027import org.apache.reef.poison.PoisonedConfiguration;
028import org.apache.reef.tang.Configuration;
029import org.apache.reef.tang.Tang;
030import org.apache.reef.tang.annotations.Unit;
031import org.apache.reef.tang.exceptions.BindException;
032import org.apache.reef.wake.EventHandler;
033
034import javax.inject.Inject;
035import java.nio.charset.StandardCharsets;
036import java.util.concurrent.atomic.AtomicInteger;
037import java.util.logging.Level;
038import java.util.logging.Logger;
039
040/**
041 * Driver side for the line counting demo that uses the data loading service.
042 */
043@DriverSide
044@Unit
045public class LineCounter {
046
047  private static final Logger LOG = Logger.getLogger(LineCounter.class.getName());
048
049  private final AtomicInteger ctrlCtxIds = new AtomicInteger();
050  private final AtomicInteger lineCnt = new AtomicInteger();
051  private final AtomicInteger completedDataTasks = new AtomicInteger();
052
053  private final DataLoadingService dataLoadingService;
054
055  @Inject
056  public LineCounter(final DataLoadingService dataLoadingService) {
057    this.dataLoadingService = dataLoadingService;
058    this.completedDataTasks.set(dataLoadingService.getNumberOfPartitions());
059  }
060
061  /**
062   * Handler for ActiveContext.
063   */
064  public class ContextActiveHandler implements EventHandler<ActiveContext> {
065
066    @Override
067    public void onNext(final ActiveContext activeContext) {
068
069      final String contextId = activeContext.getId();
070      LOG.log(Level.FINER, "Context active: {0}", contextId);
071
072      if (dataLoadingService.isDataLoadedContext(activeContext)) {
073
074        final String lcContextId = "LineCountCtxt-" + ctrlCtxIds.getAndIncrement();
075        LOG.log(Level.FINEST, "Submit LineCount context {0} to: {1}",
076            new Object[]{lcContextId, contextId});
077
078        final Configuration poisonedConfiguration = PoisonedConfiguration.CONTEXT_CONF
079            .set(PoisonedConfiguration.CRASH_PROBABILITY, "0.4")
080            .set(PoisonedConfiguration.CRASH_TIMEOUT, "1")
081            .build();
082
083        activeContext.submitContext(Tang.Factory.getTang()
084            .newConfigurationBuilder(poisonedConfiguration,
085                ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER, lcContextId).build())
086            .build());
087
088      } else if (activeContext.getId().startsWith("LineCountCtxt")) {
089
090        final String taskId = "LineCountTask-" + ctrlCtxIds.getAndIncrement();
091        LOG.log(Level.FINEST, "Submit LineCount task {0} to: {1}", new Object[]{taskId, contextId});
092
093        try {
094          activeContext.submitTask(TaskConfiguration.CONF
095              .set(TaskConfiguration.IDENTIFIER, taskId)
096              .set(TaskConfiguration.TASK, LineCountingTask.class)
097              .build());
098        } catch (final BindException ex) {
099          LOG.log(Level.SEVERE, "Configuration error in " + contextId, ex);
100          throw new RuntimeException("Configuration error in " + contextId, ex);
101        }
102      } else {
103        LOG.log(Level.FINEST, "Line count Compute Task {0} -- Closing", contextId);
104        activeContext.close();
105      }
106    }
107  }
108
109  /**
110   * Completed task handler.
111   */
112  public class TaskCompletedHandler implements EventHandler<CompletedTask> {
113    @Override
114    public void onNext(final CompletedTask completedTask) {
115
116      final String taskId = completedTask.getId();
117      LOG.log(Level.FINEST, "Completed Task: {0}", taskId);
118
119      final byte[] retBytes = completedTask.get();
120      final String retStr = retBytes == null ? "No RetVal" : new String(retBytes, StandardCharsets.UTF_8);
121      LOG.log(Level.FINE, "Line count from {0} : {1}", new String[]{taskId, retStr});
122
123      lineCnt.addAndGet(Integer.parseInt(retStr));
124
125      if (completedDataTasks.decrementAndGet() <= 0) {
126        LOG.log(Level.INFO, "Total line count: {0}", lineCnt.get());
127      }
128
129      LOG.log(Level.FINEST, "Releasing Context: {0}", taskId);
130      completedTask.getActiveContext().close();
131    }
132  }
133}