// blob: 9a161054b4976af0c947cde66468f84506b9de24 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.runtime.yarn.driver.restart;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.util.CloseableIterable;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
/**
 * Evaluator log reader/writer for a DFS that performs regular appends.
 * The underlying file system is expected to have dfs.support.append set to true.
 *
 * <p>{@link #writeToEvaluatorLog(String)} and {@link #close()} are synchronized on this
 * instance; {@link #readFromEvaluatorLog()} is not.
 */
@Private
public final class DFSEvaluatorLogAppendReaderWriter implements DFSEvaluatorLogReaderWriter {

  private final FileSystem fileSystem;
  private final Path changelogPath;
  private final DFSLineReader reader;

  /** Set once {@link #close()} has successfully closed the file system. */
  private boolean fsClosed = false;

  /**
   * @param fileSystem the file system holding the evaluator log.
   * @param changelogPath the location of the evaluator log on that file system.
   */
  DFSEvaluatorLogAppendReaderWriter(final FileSystem fileSystem, final Path changelogPath) {
    this.changelogPath = changelogPath;
    this.fileSystem = fileSystem;
    this.reader = new DFSLineReader(fileSystem);
  }

  /**
   * Appends a formatted entry (addition or removal of an Evaluator ID) to the DFS evaluator log.
   * The log file is created on the first write and appended to on every later one.
   * @param formattedEntry The formatted entry (entry with evaluator ID and addition/removal information).
   * @throws IOException
   */
  @Override
  public synchronized void writeToEvaluatorLog(final String formattedEntry) throws IOException {
    // Closing the writer at the end of the try block flushes the entry and releases the stream.
    try (BufferedWriter logWriter = openLogWriter()) {
      logWriter.write(formattedEntry);
    }
  }

  /**
   * Reads the evaluator log through the line reader.
   * @return the lines of the evaluator log, as produced by the line reader.
   * @throws IOException
   */
  @Override
  public CloseableIterable<String> readFromEvaluatorLog() throws IOException {
    return this.reader.readLinesFromFile(this.changelogPath);
  }

  /**
   * Closes the FileSystem. Calls made after a successful close do nothing.
   * @throws Exception
   */
  @Override
  public synchronized void close() throws Exception {
    if (this.fileSystem == null || this.fsClosed) {
      return;
    }

    this.fileSystem.close();
    this.fsClosed = true;
  }

  /**
   * Opens a UTF-8 writer on the evaluator log: in append mode when the log file
   * already exists, otherwise on a newly created file.
   */
  private BufferedWriter openLogWriter() throws IOException {
    if (this.fileSystem.exists(this.changelogPath)) {
      return new BufferedWriter(new OutputStreamWriter(
          this.fileSystem.append(this.changelogPath), StandardCharsets.UTF_8));
    }

    return new BufferedWriter(new OutputStreamWriter(
        this.fileSystem.create(this.changelogPath), StandardCharsets.UTF_8));
  }
}