001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.examples.data.loading; 020 021import org.apache.reef.annotations.audience.DriverSide; 022import org.apache.reef.driver.context.ActiveContext; 023import org.apache.reef.driver.context.ContextConfiguration; 024import org.apache.reef.driver.task.CompletedTask; 025import org.apache.reef.driver.task.TaskConfiguration; 026import org.apache.reef.io.data.loading.api.DataLoadingService; 027import org.apache.reef.poison.PoisonedConfiguration; 028import org.apache.reef.tang.Configuration; 029import org.apache.reef.tang.Tang; 030import org.apache.reef.tang.annotations.Unit; 031import org.apache.reef.tang.exceptions.BindException; 032import org.apache.reef.wake.EventHandler; 033 034import javax.inject.Inject; 035import java.nio.charset.StandardCharsets; 036import java.util.concurrent.atomic.AtomicInteger; 037import java.util.logging.Level; 038import java.util.logging.Logger; 039 040/** 041 * Driver side for the line counting demo that uses the data loading service. 042 */ 043@DriverSide 044@Unit 045public class LineCounter { 046 047 private static final Logger LOG = Logger.getLogger(LineCounter.class.getName()); 048 049 private final AtomicInteger ctrlCtxIds = new AtomicInteger(); 050 private final AtomicInteger lineCnt = new AtomicInteger(); 051 private final AtomicInteger completedDataTasks = new AtomicInteger(); 052 053 private final DataLoadingService dataLoadingService; 054 055 @Inject 056 public LineCounter(final DataLoadingService dataLoadingService) { 057 this.dataLoadingService = dataLoadingService; 058 this.completedDataTasks.set(dataLoadingService.getNumberOfPartitions()); 059 } 060 061 /** 062 * Handler for ActiveContext. 063 */ 064 public class ContextActiveHandler implements EventHandler<ActiveContext> { 065 066 @Override 067 public void onNext(final ActiveContext activeContext) { 068 069 final String contextId = activeContext.getId(); 070 LOG.log(Level.FINER, "Context active: {0}", contextId); 071 072 if (dataLoadingService.isDataLoadedContext(activeContext)) { 073 074 final String lcContextId = "LineCountCtxt-" + ctrlCtxIds.getAndIncrement(); 075 LOG.log(Level.FINEST, "Submit LineCount context {0} to: {1}", 076 new Object[]{lcContextId, contextId}); 077 078 final Configuration poisonedConfiguration = PoisonedConfiguration.CONTEXT_CONF 079 .set(PoisonedConfiguration.CRASH_PROBABILITY, "0.4") 080 .set(PoisonedConfiguration.CRASH_TIMEOUT, "1") 081 .build(); 082 083 activeContext.submitContext(Tang.Factory.getTang() 084 .newConfigurationBuilder(poisonedConfiguration, 085 ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER, lcContextId).build()) 086 .build()); 087 088 } else if (activeContext.getId().startsWith("LineCountCtxt")) { 089 090 final String taskId = "LineCountTask-" + ctrlCtxIds.getAndIncrement(); 091 LOG.log(Level.FINEST, "Submit LineCount task {0} to: {1}", new Object[]{taskId, contextId}); 092 093 try { 094 activeContext.submitTask(TaskConfiguration.CONF 095 .set(TaskConfiguration.IDENTIFIER, taskId) 096 .set(TaskConfiguration.TASK, LineCountingTask.class) 097 .build()); 098 } catch (final BindException ex) { 099 LOG.log(Level.SEVERE, "Configuration error in " + contextId, ex); 100 throw new RuntimeException("Configuration error in " + contextId, ex); 101 } 102 } else { 103 LOG.log(Level.FINEST, "Line count Compute Task {0} -- Closing", contextId); 104 activeContext.close(); 105 } 106 } 107 } 108 109 /** 110 * Completed task handler. 111 */ 112 public class TaskCompletedHandler implements EventHandler<CompletedTask> { 113 @Override 114 public void onNext(final CompletedTask completedTask) { 115 116 final String taskId = completedTask.getId(); 117 LOG.log(Level.FINEST, "Completed Task: {0}", taskId); 118 119 final byte[] retBytes = completedTask.get(); 120 final String retStr = retBytes == null ? "No RetVal" : new String(retBytes, StandardCharsets.UTF_8); 121 LOG.log(Level.FINE, "Line count from {0} : {1}", new String[]{taskId, retStr}); 122 123 lineCnt.addAndGet(Integer.parseInt(retStr)); 124 125 if (completedDataTasks.decrementAndGet() <= 0) { 126 LOG.log(Level.INFO, "Total line count: {0}", lineCnt.get()); 127 } 128 129 LOG.log(Level.FINEST, "Releasing Context: {0}", taskId); 130 completedTask.getActiveContext().close(); 131 } 132 } 133}