This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.yarn.driver.restart;
020
021import org.apache.commons.lang.StringUtils;
022import org.apache.hadoop.fs.FileSystem;
023import org.apache.hadoop.fs.Path;
024import org.apache.hadoop.yarn.api.records.ApplicationId;
025import org.apache.reef.annotations.Unstable;
026import org.apache.reef.annotations.audience.DriverSide;
027import org.apache.reef.annotations.audience.RuntimeAuthor;
028import org.apache.reef.driver.parameters.DriverJobSubmissionDirectory;
029import org.apache.reef.driver.parameters.FailDriverOnEvaluatorLogErrors;
030import org.apache.reef.exception.DriverFatalRuntimeException;
031import org.apache.reef.runtime.common.driver.EvaluatorPreserver;
032import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
033import org.apache.reef.runtime.yarn.util.YarnUtilities;
034import org.apache.reef.tang.annotations.Parameter;
035
036import javax.inject.Inject;
037import java.io.*;
038import java.nio.charset.StandardCharsets;
039import java.util.Arrays;
040import java.util.HashSet;
041import java.util.Set;
042import java.util.logging.Level;
043import java.util.logging.Logger;
044
045/**
046 * An Evaluator Preserver that uses the DFS on YARN.
047 */
048@DriverSide
049@RuntimeAuthor
050@Unstable
051public final class DFSEvaluatorPreserver implements EvaluatorPreserver, AutoCloseable {
052  private static final Logger LOG = Logger.getLogger(DFSEvaluatorPreserver.class.getName());
053
054  private static final String ADD_FLAG = "+";
055
056  private static final String REMOVE_FLAG = "-";
057
058  private final boolean failDriverOnEvaluatorLogErrors;
059
060  private DFSEvaluatorLogWriter writer;
061
062  private Path changeLogLocation;
063
064  private FileSystem fileSystem;
065
066  private boolean writerClosed = false;
067
068  @Inject
069  DFSEvaluatorPreserver(@Parameter(FailDriverOnEvaluatorLogErrors.class)
070                        final boolean failDriverOnEvaluatorLogErrors) {
071    this(failDriverOnEvaluatorLogErrors, "/ReefApplications/" + getEvaluatorChangeLogFolderLocation());
072  }
073
074  @Inject
075  private DFSEvaluatorPreserver(@Parameter(FailDriverOnEvaluatorLogErrors.class)
076                                final boolean failDriverOnEvaluatorLogErrors,
077                                @Parameter(DriverJobSubmissionDirectory.class)
078                                final String jobSubmissionDirectory) {
079
080    this.failDriverOnEvaluatorLogErrors = failDriverOnEvaluatorLogErrors;
081
082    try {
083      final org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration();
084      this.fileSystem = FileSystem.get(config);
085      this.changeLogLocation =
086          new Path(StringUtils.stripEnd(jobSubmissionDirectory, "/") + "/evaluatorsChangesLog");
087
088      boolean appendSupported = config.getBoolean("dfs.support.append", false);
089
090      if (appendSupported) {
091        this.writer = new DFSEvaluatorLogAppendWriter(this.fileSystem, this.changeLogLocation);
092      } else {
093        this.writer = new DFSEvaluatorLogOverwriteWriter(this.fileSystem, this.changeLogLocation);
094      }
095    } catch (final IOException e) {
096      final String errMsg = "Cannot read from log file with Exception " + e +
097          ", evaluators will not be recovered.";
098      final String fatalMsg = "Driver was not able to instantiate FileSystem.";
099
100      this.handleException(e, errMsg, fatalMsg);
101      this.fileSystem = null;
102      this.changeLogLocation = null;
103      this.writer = null;
104    }
105  }
106
107  /**
108   * @return the folder for Evaluator changelog.
109   */
110  private static String getEvaluatorChangeLogFolderLocation() {
111    final ApplicationId appId = YarnUtilities.getApplicationId();
112    if (appId != null) {
113      return appId.toString();
114    }
115
116    final String jobIdentifier = EvaluatorManager.getJobIdentifier();
117    if (jobIdentifier != null) {
118      return jobIdentifier;
119    }
120
121    throw new RuntimeException("Could not retrieve a suitable DFS folder for preserving Evaluator changelog.");
122  }
123
124  /**
125   * Recovers the set of evaluators that are alive.
126   * @return
127   */
128  @Override
129  public synchronized Set<String> recoverEvaluators() {
130    final Set<String> expectedContainers = new HashSet<>();
131    try {
132      if (this.fileSystem == null || this.changeLogLocation == null) {
133        LOG.log(Level.WARNING, "Unable to recover evaluators due to failure to instantiate FileSystem. Returning an" +
134            " empty set.");
135        return expectedContainers;
136      }
137
138      if (!this.fileSystem.exists(this.changeLogLocation)) {
139        // empty set
140        return expectedContainers;
141      } else {
142        final BufferedReader br = new BufferedReader(
143            new InputStreamReader(this.fileSystem.open(this.changeLogLocation), StandardCharsets.UTF_8));
144        String line = br.readLine();
145        while (line != null) {
146          if (line.startsWith(ADD_FLAG)) {
147            final String containerId = line.substring(ADD_FLAG.length());
148            if (expectedContainers.contains(containerId)) {
149              LOG.log(Level.WARNING, "Duplicated add container record found in the change log for container " +
150                  containerId);
151            } else {
152              expectedContainers.add(containerId);
153            }
154          } else if (line.startsWith(REMOVE_FLAG)) {
155            final String containerId = line.substring(REMOVE_FLAG.length());
156            if (!expectedContainers.contains(containerId)) {
157              LOG.log(Level.WARNING, "Change log includes record that try to remove non-exist or duplicate " +
158                  "remove record for container + " + containerId);
159            }
160            expectedContainers.remove(containerId);
161          }
162          line = br.readLine();
163        }
164        br.close();
165      }
166    } catch (final IOException e) {
167      final String errMsg = "Cannot read from log file with Exception " + e +
168          ", evaluators will not be recovered.";
169
170      final String fatalMsg = "Cannot read from evaluator log.";
171
172      this.handleException(e, errMsg, fatalMsg);
173    }
174    return expectedContainers;
175  }
176
177  /**
178   * Adds the allocated evaluator entry to the evaluator log.
179   * @param id
180   */
181  @Override
182  public synchronized void recordAllocatedEvaluator(final String id) {
183    if (this.fileSystem != null && this.changeLogLocation != null) {
184      final String entry = ADD_FLAG + id + System.lineSeparator();
185      this.logContainerChange(entry);
186    }
187  }
188
189  /**
190   * Adds the removed evaluator entry to the evaluator log.
191   * @param id
192   */
193  @Override
194  public synchronized void recordRemovedEvaluator(final String id) {
195    if (this.fileSystem != null && this.changeLogLocation != null) {
196      final String entry = REMOVE_FLAG + id + System.lineSeparator();
197      this.logContainerChange(entry);
198    }
199  }
200
201  private void logContainerChange(final String entry) {
202    try {
203      this.writer.writeToEvaluatorLog(entry);
204    } catch (final IOException e) {
205      final String errorMsg = "Unable to log the change of container [" + entry +
206          "] to the container log. Driver restart won't work properly.";
207
208      final String fatalMsg = "Unable to log container change.";
209
210      this.handleException(e, errorMsg, fatalMsg);
211    }
212  }
213
214  private void handleException(final Exception e, final String errorMsg, final String fatalMsg){
215    if (this.failDriverOnEvaluatorLogErrors) {
216      LOG.log(Level.SEVERE, errorMsg, e);
217
218      try {
219        this.close();
220      } catch (Exception e1) {
221        LOG.log(Level.SEVERE, "Failed on closing resource with " + Arrays.toString(e1.getStackTrace()));
222      }
223
224      if (fatalMsg != null) {
225        throw new DriverFatalRuntimeException(fatalMsg, e);
226      } else {
227        throw new DriverFatalRuntimeException("Driver failed on Evaluator log error.", e);
228      }
229    }
230  }
231
232  /**
233   * Closes the writer, which in turn closes the FileSystem.
234   * @throws Exception
235   */
236  @Override
237  public synchronized void close() throws Exception {
238    if (this.writer != null && !this.writerClosed) {
239      this.writer.close();
240      this.writerClosed = true;
241    }
242  }
243}