001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.data.loading.api; 020 021import org.apache.reef.annotations.audience.DriverSide; 022import org.apache.reef.driver.context.ContextConfiguration; 023import org.apache.reef.driver.evaluator.AllocatedEvaluator; 024import org.apache.reef.driver.evaluator.EvaluatorRequest; 025import org.apache.reef.driver.evaluator.EvaluatorRequestor; 026import org.apache.reef.driver.evaluator.FailedEvaluator; 027import org.apache.reef.io.data.loading.impl.EvaluatorRequestSerializer; 028import org.apache.reef.io.network.util.Pair; 029import org.apache.reef.tang.Configuration; 030import org.apache.reef.tang.annotations.Parameter; 031import org.apache.reef.tang.annotations.Unit; 032import org.apache.reef.tang.exceptions.BindException; 033import org.apache.reef.wake.EventHandler; 034import org.apache.reef.wake.impl.SingleThreadStage; 035import org.apache.reef.wake.time.Clock; 036import org.apache.reef.wake.time.event.Alarm; 037import org.apache.reef.wake.time.event.StartTime; 038 039import javax.inject.Inject; 040import java.util.concurrent.BlockingQueue; 041import java.util.concurrent.ConcurrentHashMap; 042import java.util.concurrent.ConcurrentMap; 043import java.util.concurrent.LinkedBlockingQueue; 044import java.util.concurrent.atomic.AtomicInteger; 045import java.util.logging.Level; 046import java.util.logging.Logger; 047 048/** 049 * The driver component for the DataLoadingService 050 * Also acts as the central point for resource requests 051 * All the allocated evaluators pass through this and 052 * the ones that need data loading have a context stacked 053 * that enables a task to get access to Data via the 054 * {@link DataSet}. 055 * <p/> 056 * TODO: Add timeouts 057 */ 058@DriverSide 059@Unit 060public class DataLoader { 061 062 private static final Logger LOG = Logger.getLogger(DataLoader.class.getName()); 063 064 private final ConcurrentMap<String, Pair<Configuration, Configuration>> submittedDataEvalConfigs = new ConcurrentHashMap<>(); 065 private final ConcurrentMap<String, Configuration> submittedComputeEvalConfigs = new ConcurrentHashMap<>(); 066 private final BlockingQueue<Configuration> failedComputeEvalConfigs = new LinkedBlockingQueue<>(); 067 private final BlockingQueue<Pair<Configuration, Configuration>> failedDataEvalConfigs = new LinkedBlockingQueue<>(); 068 069 private final AtomicInteger numComputeRequestsToSubmit = new AtomicInteger(0); 070 071 private final DataLoadingService dataLoadingService; 072 private final int dataEvalMemoryMB; 073 private final int dataEvalCore; 074 private final EvaluatorRequest computeRequest; 075 private final SingleThreadStage<EvaluatorRequest> resourceRequestStage; 076 private final ResourceRequestHandler resourceRequestHandler; 077 private final int computeEvalMemoryMB; 078 private final int computeEvalCore; 079 private final EvaluatorRequestor requestor; 080 081 @Inject 082 public DataLoader( 083 final Clock clock, 084 final EvaluatorRequestor requestor, 085 final DataLoadingService dataLoadingService, 086 final @Parameter(DataLoadingRequestBuilder.DataLoadingEvaluatorMemoryMB.class) int dataEvalMemoryMB, 087 final @Parameter(DataLoadingRequestBuilder.DataLoadingEvaluatorNumberOfCores.class) int dataEvalCore, 088 final @Parameter(DataLoadingRequestBuilder.DataLoadingComputeRequest.class) String serializedComputeRequest) { 089 090 // FIXME: Issue #855: We need this alarm to look busy for REEF. 091 clock.scheduleAlarm(30000, new EventHandler<Alarm>() { 092 @Override 093 public void onNext(final Alarm time) { 094 LOG.log(Level.FINE, "Received Alarm: {0}", time); 095 } 096 }); 097 098 this.requestor = requestor; 099 this.dataLoadingService = dataLoadingService; 100 this.dataEvalMemoryMB = dataEvalMemoryMB; 101 this.dataEvalCore = dataEvalCore; 102 this.resourceRequestHandler = new ResourceRequestHandler(requestor); 103 this.resourceRequestStage = new SingleThreadStage<>(this.resourceRequestHandler, 2); 104 105 if (serializedComputeRequest.equals("NULL")) { 106 this.computeRequest = null; 107 this.computeEvalMemoryMB = -1; 108 computeEvalCore = 1; 109 } else { 110 this.computeRequest = EvaluatorRequestSerializer.deserialize(serializedComputeRequest); 111 this.computeEvalMemoryMB = this.computeRequest.getMegaBytes(); 112 this.computeEvalCore = this.computeRequest.getNumberOfCores(); 113 this.numComputeRequestsToSubmit.set(this.computeRequest.getNumber()); 114 115 this.resourceRequestStage.onNext(this.computeRequest); 116 } 117 118 this.resourceRequestStage.onNext(getDataLoadingRequest()); 119 } 120 121 private EvaluatorRequest getDataLoadingRequest() { 122 return EvaluatorRequest.newBuilder() 123 .setNumber(this.dataLoadingService.getNumberOfPartitions()) 124 .setMemory(this.dataEvalMemoryMB) 125 .setNumberOfCores(this.dataEvalCore) 126 .build(); 127 } 128 129 public class StartHandler implements EventHandler<StartTime> { 130 @Override 131 public void onNext(final StartTime startTime) { 132 LOG.log(Level.INFO, "StartTime: {0}", startTime); 133 resourceRequestHandler.releaseResourceRequestGate(); 134 } 135 } 136 137 public class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> { 138 139 @Override 140 public void onNext(final AllocatedEvaluator allocatedEvaluator) { 141 142 final String evalId = allocatedEvaluator.getId(); 143 LOG.log(Level.FINEST, "Allocated evaluator: {0}", evalId); 144 145 if (!failedComputeEvalConfigs.isEmpty()) { 146 LOG.log(Level.FINE, "Failed Compute requests need to be satisfied for {0}", evalId); 147 final Configuration conf = failedComputeEvalConfigs.poll(); 148 if (conf != null) { 149 LOG.log(Level.FINE, "Satisfying failed configuration for {0}", evalId); 150 allocatedEvaluator.submitContext(conf); 151 submittedComputeEvalConfigs.put(evalId, conf); 152 return; 153 } 154 } 155 156 if (!failedDataEvalConfigs.isEmpty()) { 157 LOG.log(Level.FINE, "Failed Data requests need to be satisfied for {0}", evalId); 158 final Pair<Configuration, Configuration> confPair = failedDataEvalConfigs.poll(); 159 if (confPair != null) { 160 LOG.log(Level.FINE, "Satisfying failed configuration for {0}", evalId); 161 allocatedEvaluator.submitContextAndService(confPair.first, confPair.second); 162 submittedDataEvalConfigs.put(evalId, confPair); 163 return; 164 } 165 } 166 167 final int evaluatorsForComputeRequest = numComputeRequestsToSubmit.decrementAndGet(); 168 LOG.log(Level.FINE, "Evaluators for compute request: {0}", evaluatorsForComputeRequest); 169 170 if (evaluatorsForComputeRequest >= 0) { 171 try { 172 final Configuration idConfiguration = ContextConfiguration.CONF 173 .set(ContextConfiguration.IDENTIFIER, 174 dataLoadingService.getComputeContextIdPrefix() + evaluatorsForComputeRequest) 175 .build(); 176 LOG.log(Level.FINE, "Submitting Compute Context to {0}", evalId); 177 allocatedEvaluator.submitContext(idConfiguration); 178 submittedComputeEvalConfigs.put(allocatedEvaluator.getId(), idConfiguration); 179 if (evaluatorsForComputeRequest == 0) { 180 LOG.log(Level.FINE, "All Compute requests satisfied. Releasing gate"); 181 resourceRequestHandler.releaseResourceRequestGate(); 182 } 183 } catch (final BindException e) { 184 throw new RuntimeException("Unable to bind context id for Compute request", e); 185 } 186 187 } else { 188 189 final Pair<Configuration, Configuration> confPair = new Pair<>( 190 dataLoadingService.getContextConfiguration(allocatedEvaluator), 191 dataLoadingService.getServiceConfiguration(allocatedEvaluator)); 192 193 LOG.log(Level.FINE, "Submitting data loading context to {0}", evalId); 194 allocatedEvaluator.submitContextAndService(confPair.first, confPair.second); 195 submittedDataEvalConfigs.put(allocatedEvaluator.getId(), confPair); 196 } 197 } 198 } 199 200 public class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> { 201 @Override 202 public void onNext(final FailedEvaluator failedEvaluator) { 203 204 final String evalId = failedEvaluator.getId(); 205 206 final Configuration computeConfig = submittedComputeEvalConfigs.remove(evalId); 207 if (computeConfig != null) { 208 209 LOG.log(Level.INFO, "Received failed compute evaluator: {0}", evalId); 210 failedComputeEvalConfigs.add(computeConfig); 211 212 requestor.submit(EvaluatorRequest.newBuilder() 213 .setMemory(computeEvalMemoryMB).setNumber(1).setNumberOfCores(computeEvalCore).build()); 214 215 } else { 216 217 final Pair<Configuration, Configuration> confPair = submittedDataEvalConfigs.remove(evalId); 218 if (confPair != null) { 219 220 LOG.log(Level.INFO, "Received failed data evaluator: {0}", evalId); 221 failedDataEvalConfigs.add(confPair); 222 223 requestor.submit(EvaluatorRequest.newBuilder() 224 .setMemory(dataEvalMemoryMB).setNumber(1).setNumberOfCores(dataEvalCore).build()); 225 226 } else { 227 228 LOG.log(Level.SEVERE, "Received unknown failed evaluator " + evalId, 229 failedEvaluator.getEvaluatorException()); 230 231 throw new RuntimeException("Received failed evaluator that I did not submit: " + evalId); 232 } 233 } 234 } 235 } 236}