001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.yarn.driver.restart; 020 021import org.apache.commons.lang.StringUtils; 022import org.apache.hadoop.fs.FileSystem; 023import org.apache.hadoop.fs.Path; 024import org.apache.hadoop.yarn.api.records.ApplicationId; 025import org.apache.reef.annotations.Unstable; 026import org.apache.reef.annotations.audience.DriverSide; 027import org.apache.reef.annotations.audience.RuntimeAuthor; 028import org.apache.reef.driver.parameters.FailDriverOnEvaluatorLogErrors; 029import org.apache.reef.exception.DriverFatalRuntimeException; 030import org.apache.reef.runtime.common.driver.EvaluatorPreserver; 031import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager; 032import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory; 033import org.apache.reef.runtime.yarn.util.YarnUtilities; 034import org.apache.reef.tang.annotations.Parameter; 035import org.apache.reef.util.CloseableIterable; 036 037import javax.inject.Inject; 038import java.io.*; 039import java.util.Arrays; 040import java.util.HashSet; 041import java.util.Set; 042import java.util.logging.Level; 043import java.util.logging.Logger; 044 045/** 046 * An Evaluator Preserver that uses the DFS on YARN. 047 */ 048@DriverSide 049@RuntimeAuthor 050@Unstable 051public final class DFSEvaluatorPreserver implements EvaluatorPreserver, AutoCloseable { 052 private static final Logger LOG = Logger.getLogger(DFSEvaluatorPreserver.class.getName()); 053 054 private static final String ADD_FLAG = "+"; 055 056 private static final String REMOVE_FLAG = "-"; 057 058 private final boolean failDriverOnEvaluatorLogErrors; 059 060 private DFSEvaluatorLogReaderWriter readerWriter; 061 062 private Path changeLogLocation; 063 064 private FileSystem fileSystem; 065 066 private boolean writerClosed = false; 067 068 @Inject 069 DFSEvaluatorPreserver(@Parameter(FailDriverOnEvaluatorLogErrors.class) 070 final boolean failDriverOnEvaluatorLogErrors) { 071 this(failDriverOnEvaluatorLogErrors, "/ReefApplications/" + getEvaluatorChangeLogFolderLocation()); 072 } 073 074 @Inject 075 private DFSEvaluatorPreserver(@Parameter(FailDriverOnEvaluatorLogErrors.class) 076 final boolean failDriverOnEvaluatorLogErrors, 077 @Parameter(JobSubmissionDirectory.class) 078 final String jobSubmissionDirectory) { 079 080 this.failDriverOnEvaluatorLogErrors = failDriverOnEvaluatorLogErrors; 081 082 try { 083 final org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration(); 084 this.fileSystem = FileSystem.get(config); 085 this.changeLogLocation = 086 new Path("/" + StringUtils.strip(jobSubmissionDirectory, "/") + "/evaluatorsChangesLog"); 087 088 boolean appendSupported = config.getBoolean("dfs.support.append", false); 089 090 if (appendSupported) { 091 this.readerWriter = new DFSEvaluatorLogAppendReaderWriter(this.fileSystem, this.changeLogLocation); 092 } else { 093 this.readerWriter = new DFSEvaluatorLogOverwriteReaderWriter(this.fileSystem, this.changeLogLocation); 094 } 095 } catch (final IOException e) { 096 final String errMsg = "Cannot read from log file with Exception " + e + 097 ", evaluators will not be recovered."; 098 final String fatalMsg = "Driver was not able to instantiate FileSystem."; 099 100 this.handleException(e, errMsg, fatalMsg); 101 this.fileSystem = null; 102 this.changeLogLocation = null; 103 this.readerWriter = null; 104 } 105 } 106 107 /** 108 * @return the folder for Evaluator changelog. 109 */ 110 private static String getEvaluatorChangeLogFolderLocation() { 111 final ApplicationId appId = YarnUtilities.getApplicationId(); 112 if (appId != null) { 113 return appId.toString(); 114 } 115 116 final String jobIdentifier = EvaluatorManager.getJobIdentifier(); 117 if (jobIdentifier != null) { 118 return jobIdentifier; 119 } 120 121 throw new RuntimeException("Could not retrieve a suitable DFS folder for preserving Evaluator changelog."); 122 } 123 124 /** 125 * Recovers the set of evaluators that are alive. 126 */ 127 @Override 128 public synchronized Set<String> recoverEvaluators() { 129 final Set<String> expectedContainers = new HashSet<>(); 130 try { 131 if (this.fileSystem == null || this.changeLogLocation == null) { 132 LOG.log(Level.WARNING, "Unable to recover evaluators due to failure to instantiate FileSystem. Returning an" + 133 " empty set."); 134 return expectedContainers; 135 } 136 137 try (final CloseableIterable<String> evaluatorLogIterable = readerWriter.readFromEvaluatorLog()) { 138 for (final String line : evaluatorLogIterable) { 139 if (line.startsWith(ADD_FLAG)) { 140 final String containerId = line.substring(ADD_FLAG.length()); 141 if (expectedContainers.contains(containerId)) { 142 LOG.log(Level.WARNING, "Duplicated add container record found in the change log for container " + 143 containerId); 144 } else { 145 expectedContainers.add(containerId); 146 } 147 } else if (line.startsWith(REMOVE_FLAG)) { 148 final String containerId = line.substring(REMOVE_FLAG.length()); 149 if (!expectedContainers.contains(containerId)) { 150 LOG.log(Level.WARNING, "Change log includes record that try to remove non-exist or duplicate " + 151 "remove record for container + " + containerId); 152 } 153 expectedContainers.remove(containerId); 154 } 155 } 156 } 157 } catch (final Exception e) { 158 final String errMsg = "Cannot read from log file with Exception " + e + 159 ", evaluators will not be recovered."; 160 161 final String fatalMsg = "Cannot read from evaluator log."; 162 this.handleException(e, errMsg, fatalMsg); 163 } 164 165 return expectedContainers; 166 } 167 168 /** 169 * Adds the allocated evaluator entry to the evaluator log. 170 * @param id 171 */ 172 @Override 173 public synchronized void recordAllocatedEvaluator(final String id) { 174 if (this.fileSystem != null && this.changeLogLocation != null) { 175 final String entry = ADD_FLAG + id + System.lineSeparator(); 176 this.logContainerChange(entry); 177 } 178 } 179 180 /** 181 * Adds the removed evaluator entry to the evaluator log. 182 * @param id 183 */ 184 @Override 185 public synchronized void recordRemovedEvaluator(final String id) { 186 if (this.fileSystem != null && this.changeLogLocation != null) { 187 final String entry = REMOVE_FLAG + id + System.lineSeparator(); 188 this.logContainerChange(entry); 189 } 190 } 191 192 private void logContainerChange(final String entry) { 193 try { 194 this.readerWriter.writeToEvaluatorLog(entry); 195 } catch (final IOException e) { 196 final String errorMsg = "Unable to log the change of container [" + entry + 197 "] to the container log. Driver restart won't work properly."; 198 199 final String fatalMsg = "Unable to log container change."; 200 201 this.handleException(e, errorMsg, fatalMsg); 202 } 203 } 204 205 private void handleException(final Exception e, final String errorMsg, final String fatalMsg){ 206 if (this.failDriverOnEvaluatorLogErrors) { 207 LOG.log(Level.SEVERE, errorMsg, e); 208 209 try { 210 this.close(); 211 } catch (Exception e1) { 212 LOG.log(Level.SEVERE, "Failed on closing resource with " + Arrays.toString(e1.getStackTrace())); 213 } 214 215 if (fatalMsg != null) { 216 throw new DriverFatalRuntimeException(fatalMsg, e); 217 } else { 218 throw new DriverFatalRuntimeException("Driver failed on Evaluator log error.", e); 219 } 220 } else { 221 LOG.log(Level.WARNING, errorMsg, e); 222 } 223 } 224 225 /** 226 * Closes the readerWriter, which in turn closes the FileSystem. 227 * @throws Exception 228 */ 229 @Override 230 public synchronized void close() throws Exception { 231 if (this.readerWriter != null && !this.writerClosed) { 232 this.readerWriter.close(); 233 this.writerClosed = true; 234 } 235 } 236}