/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.examples.data.loading;

import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.task.CompletedTask;
import org.apache.reef.driver.task.TaskConfiguration;
import org.apache.reef.io.data.loading.api.DataLoadingService;
import org.apache.reef.poison.PoisonedConfiguration;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.wake.EventHandler;

import javax.inject.Inject;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Driver side for the line counting demo that uses the data loading service.
 *
 * <p>The driver reacts to two kinds of events:
 * <ul>
 *   <li>{@link ContextActiveHandler} stacks a "LineCountCtxt-N" context on top of every
 *       data-loaded context, then submits a {@code LineCountingTask} to each such context;</li>
 *   <li>{@link TaskCompletedHandler} adds each task's reported line count to a running total
 *       and closes the context the task ran in.</li>
 * </ul>
 */
@DriverSide
@Unit
public class LineCounter {

  private static final Logger LOG = Logger.getLogger(LineCounter.class.getName());

  /** Source of numeric suffixes; shared by the context ids and the task ids generated below. */
  private final AtomicInteger ctrlCtxIds = new AtomicInteger();

  /** Running sum of the line counts reported by completed tasks. */
  private final AtomicInteger lineCnt = new AtomicInteger();

  /**
   * Count-down of tasks still expected to complete. Starts at the number of data partitions
   * and is decremented once per completed task; the total is logged when it reaches zero.
   */
  private final AtomicInteger completedDataTasks = new AtomicInteger();

  private final DataLoadingService dataLoadingService;

  /**
   * Creates the driver and initializes the outstanding-task counter from the
   * number of partitions reported by the data loading service.
   *
   * @param dataLoadingService service used to recognize data-loaded contexts and to
   *                           obtain the number of partitions.
   */
  @Inject
  public LineCounter(final DataLoadingService dataLoadingService) {
    this.dataLoadingService = dataLoadingService;
    this.completedDataTasks.set(dataLoadingService.getNumberOfPartitions());
  }

  /**
   * Handles newly active contexts: decorates data-loaded contexts with a line-count
   * context, submits the counting task to line-count contexts, and closes anything else.
   */
  public class ContextActiveHandler implements EventHandler<ActiveContext> {

    /**
     * Dispatches on the kind of context that became active.
     *
     * @param activeContext the context that has just become active.
     * @throws RuntimeException if the task configuration cannot be built.
     */
    @Override
    public void onNext(final ActiveContext activeContext) {

      final String contextId = activeContext.getId();
      LOG.log(Level.FINER, "Context active: {0}", contextId);

      if (dataLoadingService.isDataLoadedContext(activeContext)) {

        final String lcContextId = "LineCountCtxt-" + ctrlCtxIds.getAndIncrement();
        LOG.log(Level.FINEST, "Submit LineCount context {0} to: {1}",
            new Object[]{lcContextId, contextId});

        // NOTE(review): presumably this makes the child context fail at random so the demo
        // exercises failure handling -- confirm the exact meaning and units of
        // CRASH_PROBABILITY / CRASH_TIMEOUT against the PoisonedConfiguration documentation.
        final Configuration poisonedConfiguration = PoisonedConfiguration.CONTEXT_CONF
            .set(PoisonedConfiguration.CRASH_PROBABILITY, "0.4")
            .set(PoisonedConfiguration.CRASH_TIMEOUT, "1")
            .build();

        // Merge the poison settings with the identifier of the new child context.
        activeContext.submitContext(Tang.Factory.getTang()
            .newConfigurationBuilder(poisonedConfiguration,
                ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER, lcContextId).build())
            .build());

      } else if (contextId.startsWith("LineCountCtxt")) {

        final String taskId = "LineCountTask-" + ctrlCtxIds.getAndIncrement();
        LOG.log(Level.FINEST, "Submit LineCount task {0} to: {1}", new Object[]{taskId, contextId});

        try {
          activeContext.submitTask(TaskConfiguration.CONF
              .set(TaskConfiguration.IDENTIFIER, taskId)
              .set(TaskConfiguration.TASK, LineCountingTask.class)
              .build());
        } catch (final BindException ex) {
          LOG.log(Level.SEVERE, "Configuration error in " + contextId, ex);
          throw new RuntimeException("Configuration error in " + contextId, ex);
        }
      } else {
        // Neither a data-loaded context nor one of ours: nothing to run here.
        LOG.log(Level.FINEST, "Line count Compute Task {0} -- Closing", contextId);
        activeContext.close();
      }
    }
  }

  /**
   * Handles completed tasks: accumulates the reported line count, logs the total once
   * every expected task has completed, and closes the task's context.
   */
  public class TaskCompletedHandler implements EventHandler<CompletedTask> {

    /**
     * Processes the result of one completed task.
     *
     * <p>A missing or non-numeric result is logged as a warning and contributes nothing to
     * the total; the task still counts as completed and its context is still closed.
     *
     * @param completedTask the task that has finished.
     */
    @Override
    public void onNext(final CompletedTask completedTask) {

      final String taskId = completedTask.getId();
      LOG.log(Level.FINEST, "Completed Task: {0}", taskId);

      final byte[] retBytes = completedTask.get();
      // NOTE(review): decodes with the platform default charset; the encoding used by the
      // task is not visible from this file -- confirm both sides agree.
      final String retStr = retBytes == null ? "No RetVal" : new String(retBytes);
      LOG.log(Level.FINE, "Line count from {0} : {1}", new Object[]{taskId, retStr});

      if (retBytes == null) {
        // Previously "No RetVal" was handed to Integer.parseInt, which threw and skipped
        // both the bookkeeping and the context release below.
        LOG.log(Level.WARNING, "Task {0} returned no line count; ignoring it", taskId);
      } else {
        try {
          lineCnt.addAndGet(Integer.parseInt(retStr.trim()));
        } catch (final NumberFormatException ex) {
          LOG.log(Level.WARNING,
              "Task " + taskId + " returned a non-numeric line count: " + retStr, ex);
        }
      }

      if (completedDataTasks.decrementAndGet() <= 0) {
        LOG.log(Level.INFO, "Total line count: {0}", lineCnt.get());
      }

      final ActiveContext context = completedTask.getActiveContext();
      LOG.log(Level.FINEST, "Releasing Context: {0}", context.getId());
      context.close();
    }
  }
}