This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.data.loading;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.driver.context.ActiveContext;
023import org.apache.reef.driver.context.ContextConfiguration;
024import org.apache.reef.driver.task.CompletedTask;
025import org.apache.reef.driver.task.TaskConfiguration;
026import org.apache.reef.io.data.loading.api.DataLoadingService;
027import org.apache.reef.poison.PoisonedConfiguration;
028import org.apache.reef.tang.Configuration;
029import org.apache.reef.tang.Tang;
030import org.apache.reef.tang.annotations.Unit;
031import org.apache.reef.tang.exceptions.BindException;
032import org.apache.reef.wake.EventHandler;
033
034import javax.inject.Inject;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.logging.Level;
037import java.util.logging.Logger;
038
039/**
040 * Driver side for the line counting demo that uses the data loading service.
041 */
042@DriverSide
043@Unit
044public class LineCounter {
045
046  private static final Logger LOG = Logger.getLogger(LineCounter.class.getName());
047
048  private final AtomicInteger ctrlCtxIds = new AtomicInteger();
049  private final AtomicInteger lineCnt = new AtomicInteger();
050  private final AtomicInteger completedDataTasks = new AtomicInteger();
051
052  private final DataLoadingService dataLoadingService;
053
054  @Inject
055  public LineCounter(final DataLoadingService dataLoadingService) {
056    this.dataLoadingService = dataLoadingService;
057    this.completedDataTasks.set(dataLoadingService.getNumberOfPartitions());
058  }
059
060  public class ContextActiveHandler implements EventHandler<ActiveContext> {
061
062    @Override
063    public void onNext(final ActiveContext activeContext) {
064
065      final String contextId = activeContext.getId();
066      LOG.log(Level.FINER, "Context active: {0}", contextId);
067
068      if (dataLoadingService.isDataLoadedContext(activeContext)) {
069
070        final String lcContextId = "LineCountCtxt-" + ctrlCtxIds.getAndIncrement();
071        LOG.log(Level.FINEST, "Submit LineCount context {0} to: {1}",
072            new Object[]{lcContextId, contextId});
073
074        final Configuration poisonedConfiguration = PoisonedConfiguration.CONTEXT_CONF
075            .set(PoisonedConfiguration.CRASH_PROBABILITY, "0.4")
076            .set(PoisonedConfiguration.CRASH_TIMEOUT, "1")
077            .build();
078
079        activeContext.submitContext(Tang.Factory.getTang()
080            .newConfigurationBuilder(poisonedConfiguration,
081                ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER, lcContextId).build())
082            .build());
083
084      } else if (activeContext.getId().startsWith("LineCountCtxt")) {
085
086        final String taskId = "LineCountTask-" + ctrlCtxIds.getAndIncrement();
087        LOG.log(Level.FINEST, "Submit LineCount task {0} to: {1}", new Object[]{taskId, contextId});
088
089        try {
090          activeContext.submitTask(TaskConfiguration.CONF
091              .set(TaskConfiguration.IDENTIFIER, taskId)
092              .set(TaskConfiguration.TASK, LineCountingTask.class)
093              .build());
094        } catch (final BindException ex) {
095          LOG.log(Level.SEVERE, "Configuration error in " + contextId, ex);
096          throw new RuntimeException("Configuration error in " + contextId, ex);
097        }
098      } else {
099        LOG.log(Level.FINEST, "Line count Compute Task {0} -- Closing", contextId);
100        activeContext.close();
101      }
102    }
103  }
104
105  public class TaskCompletedHandler implements EventHandler<CompletedTask> {
106    @Override
107    public void onNext(final CompletedTask completedTask) {
108
109      final String taskId = completedTask.getId();
110      LOG.log(Level.FINEST, "Completed Task: {0}", taskId);
111
112      final byte[] retBytes = completedTask.get();
113      final String retStr = retBytes == null ? "No RetVal" : new String(retBytes);
114      LOG.log(Level.FINE, "Line count from {0} : {1}", new String[]{taskId, retStr});
115
116      lineCnt.addAndGet(Integer.parseInt(retStr));
117
118      if (completedDataTasks.decrementAndGet() <= 0) {
119        LOG.log(Level.INFO, "Total line count: {0}", lineCnt.get());
120      }
121
122      LOG.log(Level.FINEST, "Releasing Context: {0}", taskId);
123      completedTask.getActiveContext().close();
124    }
125  }
126}