001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.yarn.driver.restart; 020 021import org.apache.commons.lang.StringUtils; 022import org.apache.hadoop.fs.FileSystem; 023import org.apache.hadoop.fs.Path; 024import org.apache.hadoop.yarn.api.records.ApplicationId; 025import org.apache.reef.annotations.Unstable; 026import org.apache.reef.annotations.audience.DriverSide; 027import org.apache.reef.annotations.audience.RuntimeAuthor; 028import org.apache.reef.driver.parameters.DriverJobSubmissionDirectory; 029import org.apache.reef.driver.parameters.FailDriverOnEvaluatorLogErrors; 030import org.apache.reef.exception.DriverFatalRuntimeException; 031import org.apache.reef.runtime.common.driver.EvaluatorPreserver; 032import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager; 033import org.apache.reef.runtime.yarn.util.YarnUtilities; 034import org.apache.reef.tang.annotations.Parameter; 035 036import javax.inject.Inject; 037import java.io.*; 038import java.nio.charset.StandardCharsets; 039import java.util.Arrays; 040import java.util.HashSet; 041import java.util.Set; 042import java.util.logging.Level; 043import java.util.logging.Logger; 044 045/** 046 * An Evaluator Preserver that uses the DFS on YARN. 047 */ 048@DriverSide 049@RuntimeAuthor 050@Unstable 051public final class DFSEvaluatorPreserver implements EvaluatorPreserver, AutoCloseable { 052 private static final Logger LOG = Logger.getLogger(DFSEvaluatorPreserver.class.getName()); 053 054 private static final String ADD_FLAG = "+"; 055 056 private static final String REMOVE_FLAG = "-"; 057 058 private final boolean failDriverOnEvaluatorLogErrors; 059 060 private DFSEvaluatorLogWriter writer; 061 062 private Path changeLogLocation; 063 064 private FileSystem fileSystem; 065 066 private boolean writerClosed = false; 067 068 @Inject 069 DFSEvaluatorPreserver(@Parameter(FailDriverOnEvaluatorLogErrors.class) 070 final boolean failDriverOnEvaluatorLogErrors) { 071 this(failDriverOnEvaluatorLogErrors, "/ReefApplications/" + getEvaluatorChangeLogFolderLocation()); 072 } 073 074 @Inject 075 private DFSEvaluatorPreserver(@Parameter(FailDriverOnEvaluatorLogErrors.class) 076 final boolean failDriverOnEvaluatorLogErrors, 077 @Parameter(DriverJobSubmissionDirectory.class) 078 final String jobSubmissionDirectory) { 079 080 this.failDriverOnEvaluatorLogErrors = failDriverOnEvaluatorLogErrors; 081 082 try { 083 final org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration(); 084 this.fileSystem = FileSystem.get(config); 085 this.changeLogLocation = 086 new Path(StringUtils.stripEnd(jobSubmissionDirectory, "/") + "/evaluatorsChangesLog"); 087 088 boolean appendSupported = config.getBoolean("dfs.support.append", false); 089 090 if (appendSupported) { 091 this.writer = new DFSEvaluatorLogAppendWriter(this.fileSystem, this.changeLogLocation); 092 } else { 093 this.writer = new DFSEvaluatorLogOverwriteWriter(this.fileSystem, this.changeLogLocation); 094 } 095 } catch (final IOException e) { 096 final String errMsg = "Cannot read from log file with Exception " + e + 097 ", evaluators will not be recovered."; 098 final String fatalMsg = "Driver was not able to instantiate FileSystem."; 099 100 this.handleException(e, errMsg, fatalMsg); 101 this.fileSystem = null; 102 this.changeLogLocation = null; 103 this.writer = null; 104 } 105 } 106 107 /** 108 * @return the folder for Evaluator changelog. 109 */ 110 private static String getEvaluatorChangeLogFolderLocation() { 111 final ApplicationId appId = YarnUtilities.getApplicationId(); 112 if (appId != null) { 113 return appId.toString(); 114 } 115 116 final String jobIdentifier = EvaluatorManager.getJobIdentifier(); 117 if (jobIdentifier != null) { 118 return jobIdentifier; 119 } 120 121 throw new RuntimeException("Could not retrieve a suitable DFS folder for preserving Evaluator changelog."); 122 } 123 124 /** 125 * Recovers the set of evaluators that are alive. 126 * @return 127 */ 128 @Override 129 public synchronized Set<String> recoverEvaluators() { 130 final Set<String> expectedContainers = new HashSet<>(); 131 try { 132 if (this.fileSystem == null || this.changeLogLocation == null) { 133 LOG.log(Level.WARNING, "Unable to recover evaluators due to failure to instantiate FileSystem. Returning an" + 134 " empty set."); 135 return expectedContainers; 136 } 137 138 if (!this.fileSystem.exists(this.changeLogLocation)) { 139 // empty set 140 return expectedContainers; 141 } else { 142 final BufferedReader br = new BufferedReader( 143 new InputStreamReader(this.fileSystem.open(this.changeLogLocation), StandardCharsets.UTF_8)); 144 String line = br.readLine(); 145 while (line != null) { 146 if (line.startsWith(ADD_FLAG)) { 147 final String containerId = line.substring(ADD_FLAG.length()); 148 if (expectedContainers.contains(containerId)) { 149 LOG.log(Level.WARNING, "Duplicated add container record found in the change log for container " + 150 containerId); 151 } else { 152 expectedContainers.add(containerId); 153 } 154 } else if (line.startsWith(REMOVE_FLAG)) { 155 final String containerId = line.substring(REMOVE_FLAG.length()); 156 if (!expectedContainers.contains(containerId)) { 157 LOG.log(Level.WARNING, "Change log includes record that try to remove non-exist or duplicate " + 158 "remove record for container + " + containerId); 159 } 160 expectedContainers.remove(containerId); 161 } 162 line = br.readLine(); 163 } 164 br.close(); 165 } 166 } catch (final IOException e) { 167 final String errMsg = "Cannot read from log file with Exception " + e + 168 ", evaluators will not be recovered."; 169 170 final String fatalMsg = "Cannot read from evaluator log."; 171 172 this.handleException(e, errMsg, fatalMsg); 173 } 174 return expectedContainers; 175 } 176 177 /** 178 * Adds the allocated evaluator entry to the evaluator log. 179 * @param id 180 */ 181 @Override 182 public synchronized void recordAllocatedEvaluator(final String id) { 183 if (this.fileSystem != null && this.changeLogLocation != null) { 184 final String entry = ADD_FLAG + id + System.lineSeparator(); 185 this.logContainerChange(entry); 186 } 187 } 188 189 /** 190 * Adds the removed evaluator entry to the evaluator log. 191 * @param id 192 */ 193 @Override 194 public synchronized void recordRemovedEvaluator(final String id) { 195 if (this.fileSystem != null && this.changeLogLocation != null) { 196 final String entry = REMOVE_FLAG + id + System.lineSeparator(); 197 this.logContainerChange(entry); 198 } 199 } 200 201 private void logContainerChange(final String entry) { 202 try { 203 this.writer.writeToEvaluatorLog(entry); 204 } catch (final IOException e) { 205 final String errorMsg = "Unable to log the change of container [" + entry + 206 "] to the container log. Driver restart won't work properly."; 207 208 final String fatalMsg = "Unable to log container change."; 209 210 this.handleException(e, errorMsg, fatalMsg); 211 } 212 } 213 214 private void handleException(final Exception e, final String errorMsg, final String fatalMsg){ 215 if (this.failDriverOnEvaluatorLogErrors) { 216 LOG.log(Level.SEVERE, errorMsg, e); 217 218 try { 219 this.close(); 220 } catch (Exception e1) { 221 LOG.log(Level.SEVERE, "Failed on closing resource with " + Arrays.toString(e1.getStackTrace())); 222 } 223 224 if (fatalMsg != null) { 225 throw new DriverFatalRuntimeException(fatalMsg, e); 226 } else { 227 throw new DriverFatalRuntimeException("Driver failed on Evaluator log error.", e); 228 } 229 } 230 } 231 232 /** 233 * Closes the writer, which in turn closes the FileSystem. 234 * @throws Exception 235 */ 236 @Override 237 public synchronized void close() throws Exception { 238 if (this.writer != null && !this.writerClosed) { 239 this.writer.close(); 240 this.writerClosed = true; 241 } 242 } 243}