001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.yarn.driver.restart;
020
021import org.apache.commons.lang.StringUtils;
022import org.apache.hadoop.fs.FileSystem;
023import org.apache.hadoop.fs.Path;
024import org.apache.hadoop.yarn.api.records.ApplicationId;
025import org.apache.reef.annotations.Unstable;
026import org.apache.reef.annotations.audience.DriverSide;
027import org.apache.reef.annotations.audience.RuntimeAuthor;
028import org.apache.reef.driver.parameters.FailDriverOnEvaluatorLogErrors;
029import org.apache.reef.exception.DriverFatalRuntimeException;
030import org.apache.reef.runtime.common.driver.EvaluatorPreserver;
031import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
032import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
033import org.apache.reef.runtime.yarn.util.YarnUtilities;
034import org.apache.reef.tang.annotations.Parameter;
035import org.apache.reef.util.CloseableIterable;
036
037import javax.inject.Inject;
038import java.io.*;
039import java.util.Arrays;
040import java.util.HashSet;
041import java.util.Set;
042import java.util.logging.Level;
043import java.util.logging.Logger;
044
045/**
046 * An Evaluator Preserver that uses the DFS on YARN.
047 */
048@DriverSide
049@RuntimeAuthor
050@Unstable
051public final class DFSEvaluatorPreserver implements EvaluatorPreserver, AutoCloseable {
052  private static final Logger LOG = Logger.getLogger(DFSEvaluatorPreserver.class.getName());
053
054  private static final String ADD_FLAG = "+";
055
056  private static final String REMOVE_FLAG = "-";
057
058  private final boolean failDriverOnEvaluatorLogErrors;
059
060  private DFSEvaluatorLogReaderWriter readerWriter;
061
062  private Path changeLogLocation;
063
064  private FileSystem fileSystem;
065
066  private boolean writerClosed = false;
067
068  @Inject
069  DFSEvaluatorPreserver(@Parameter(FailDriverOnEvaluatorLogErrors.class)
070                        final boolean failDriverOnEvaluatorLogErrors) {
071    this(failDriverOnEvaluatorLogErrors, "/ReefApplications/" + getEvaluatorChangeLogFolderLocation());
072  }
073
074  @Inject
075  private DFSEvaluatorPreserver(@Parameter(FailDriverOnEvaluatorLogErrors.class)
076                                final boolean failDriverOnEvaluatorLogErrors,
077                                @Parameter(JobSubmissionDirectory.class)
078                                final String jobSubmissionDirectory) {
079
080    this.failDriverOnEvaluatorLogErrors = failDriverOnEvaluatorLogErrors;
081
082    try {
083      final org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration();
084      this.fileSystem = FileSystem.get(config);
085      this.changeLogLocation =
086          new Path("/" + StringUtils.strip(jobSubmissionDirectory, "/") + "/evaluatorsChangesLog");
087
088      boolean appendSupported = config.getBoolean("dfs.support.append", false);
089
090      if (appendSupported) {
091        this.readerWriter = new DFSEvaluatorLogAppendReaderWriter(this.fileSystem, this.changeLogLocation);
092      } else {
093        this.readerWriter = new DFSEvaluatorLogOverwriteReaderWriter(this.fileSystem, this.changeLogLocation);
094      }
095    } catch (final IOException e) {
096      final String errMsg = "Cannot read from log file with Exception " + e +
097          ", evaluators will not be recovered.";
098      final String fatalMsg = "Driver was not able to instantiate FileSystem.";
099
100      this.handleException(e, errMsg, fatalMsg);
101      this.fileSystem = null;
102      this.changeLogLocation = null;
103      this.readerWriter = null;
104    }
105  }
106
107  /**
108   * @return the folder for Evaluator changelog.
109   */
110  private static String getEvaluatorChangeLogFolderLocation() {
111    final ApplicationId appId = YarnUtilities.getApplicationId();
112    if (appId != null) {
113      return appId.toString();
114    }
115
116    final String jobIdentifier = EvaluatorManager.getJobIdentifier();
117    if (jobIdentifier != null) {
118      return jobIdentifier;
119    }
120
121    throw new RuntimeException("Could not retrieve a suitable DFS folder for preserving Evaluator changelog.");
122  }
123
124  /**
125   * Recovers the set of evaluators that are alive.
126   */
127  @Override
128  public synchronized Set<String> recoverEvaluators() {
129    final Set<String> expectedContainers = new HashSet<>();
130    try {
131      if (this.fileSystem == null || this.changeLogLocation == null) {
132        LOG.log(Level.WARNING, "Unable to recover evaluators due to failure to instantiate FileSystem. Returning an" +
133            " empty set.");
134        return expectedContainers;
135      }
136
137      try (final CloseableIterable<String> evaluatorLogIterable = readerWriter.readFromEvaluatorLog()) {
138        for (final String line : evaluatorLogIterable) {
139          if (line.startsWith(ADD_FLAG)) {
140            final String containerId = line.substring(ADD_FLAG.length());
141            if (expectedContainers.contains(containerId)) {
142              LOG.log(Level.WARNING, "Duplicated add container record found in the change log for container " +
143                  containerId);
144            } else {
145              expectedContainers.add(containerId);
146            }
147          } else if (line.startsWith(REMOVE_FLAG)) {
148            final String containerId = line.substring(REMOVE_FLAG.length());
149            if (!expectedContainers.contains(containerId)) {
150              LOG.log(Level.WARNING, "Change log includes record that try to remove non-exist or duplicate " +
151                  "remove record for container + " + containerId);
152            }
153            expectedContainers.remove(containerId);
154          }
155        }
156      }
157    } catch (final Exception e) {
158      final String errMsg = "Cannot read from log file with Exception " + e +
159          ", evaluators will not be recovered.";
160
161      final String fatalMsg = "Cannot read from evaluator log.";
162      this.handleException(e, errMsg, fatalMsg);
163    }
164
165    return expectedContainers;
166  }
167
168  /**
169   * Adds the allocated evaluator entry to the evaluator log.
170   * @param id
171   */
172  @Override
173  public synchronized void recordAllocatedEvaluator(final String id) {
174    if (this.fileSystem != null && this.changeLogLocation != null) {
175      final String entry = ADD_FLAG + id + System.lineSeparator();
176      this.logContainerChange(entry);
177    }
178  }
179
180  /**
181   * Adds the removed evaluator entry to the evaluator log.
182   * @param id
183   */
184  @Override
185  public synchronized void recordRemovedEvaluator(final String id) {
186    if (this.fileSystem != null && this.changeLogLocation != null) {
187      final String entry = REMOVE_FLAG + id + System.lineSeparator();
188      this.logContainerChange(entry);
189    }
190  }
191
192  private void logContainerChange(final String entry) {
193    try {
194      this.readerWriter.writeToEvaluatorLog(entry);
195    } catch (final IOException e) {
196      final String errorMsg = "Unable to log the change of container [" + entry +
197          "] to the container log. Driver restart won't work properly.";
198
199      final String fatalMsg = "Unable to log container change.";
200
201      this.handleException(e, errorMsg, fatalMsg);
202    }
203  }
204
205  private void handleException(final Exception e, final String errorMsg, final String fatalMsg){
206    if (this.failDriverOnEvaluatorLogErrors) {
207      LOG.log(Level.SEVERE, errorMsg, e);
208
209      try {
210        this.close();
211      } catch (Exception e1) {
212        LOG.log(Level.SEVERE, "Failed on closing resource with " + Arrays.toString(e1.getStackTrace()));
213      }
214
215      if (fatalMsg != null) {
216        throw new DriverFatalRuntimeException(fatalMsg, e);
217      } else {
218        throw new DriverFatalRuntimeException("Driver failed on Evaluator log error.", e);
219      }
220    } else {
221      LOG.log(Level.WARNING, errorMsg, e);
222    }
223  }
224
225  /**
226   * Closes the readerWriter, which in turn closes the FileSystem.
227   * @throws Exception
228   */
229  @Override
230  public synchronized void close() throws Exception {
231    if (this.readerWriter != null && !this.writerClosed) {
232      this.readerWriter.close();
233      this.writerClosed = true;
234    }
235  }
236}