blob: 697083c30c92a25f3a396ac2b8b5f784c8371630 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.history.logging.proto;
import java.io.Closeable;
import java.io.IOException;
import java.security.PrivilegedAction;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.ManifestEntryProto;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Helper class to scan all the dag manifest files to get manifest entries. This class is
 * not thread safe.
 *
 * <p>The scanner walks the directories handed out by the {@link DatePartitionedLogger},
 * starting at the directory selected through one of the {@code setOffset} methods. The
 * current position (directory, per-file offsets and per-file error counts) can be exported
 * as a JSON string with {@link #getOffset()} and restored with {@link #setOffset(String)}.
 */
public class DagManifesFileScanner implements Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(DagManifesFileScanner.class);
  /** Version written into serialized offsets; newer versions are rejected on restore. */
  private static final int SCANNER_OFFSET_VERSION = 2;
  /** Error count above which a file outside today's directory is dropped from the scan. */
  private static final int MAX_RETRY = 3;
  /** Shared JSON mapper; it is never reconfigured, so a single instance is sufficient. */
  private static final ObjectMapper MAPPER = new ObjectMapper();

  private final DatePartitionedLogger<ManifestEntryProto> manifestLogger;
  /** Sync window in seconds, read from TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS. */
  private final long syncTime;
  /** When true, files are opened through a proxy user created for the file owner. */
  private final boolean withDoas;

  /** Directory currently being scanned. */
  private String scanDir;
  /** Last offset read per file, keyed by file name (not the full path). */
  private Map<String, Long> offsets;
  /** Number of open/read failures per file, keyed by the full path string. */
  private Map<String, Integer> retryCount;
  /** Files of {@link #scanDir} that still have to be read. */
  private List<FileStatus> newFiles;
  /** Reader of the file currently being consumed, or null when no file is open. */
  private ProtoMessageReader<ManifestEntryProto> reader;
  /** Full path string of the file behind {@link #reader}; only meaningful while it is open. */
  private String currentFilePath;

  /**
   * Creates a scanner positioned at the directory for epoch day 0.
   *
   * @param manifestLogger logger used to list directories and open manifest files; its
   *     configuration supplies the sync window and the doAs flag
   */
  public DagManifesFileScanner(DatePartitionedLogger<ManifestEntryProto> manifestLogger) {
    this.manifestLogger = manifestLogger;
    this.syncTime = manifestLogger.getConfig().getLong(
        TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS,
        TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS_DEFAULT);
    this.withDoas = manifestLogger.getConfig().getBoolean(
        TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_DOAS,
        TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_DOAS_DEFAULT);
    this.setOffset(LocalDate.ofEpochDay(0));
  }

  /**
   * JSON representation of the scanner position.
   *
   * <p>When changing this class, update {@code SCANNER_OFFSET_VERSION} and the version check
   * in {@link DagManifesFileScanner#setOffset(String)} so that only supported versions are
   * accepted. All fields are public to simplify json conversion.
   */
  public static class DagManifestOffset {
    public int version;
    public String scanDir;
    public Map<String, Long> offsets;
    public Map<String, Integer> retryCount;
  }

  /**
   * Restores the scanner position from a string produced by {@link #getOffset()}.
   *
   * <p>The scanner state is only modified once the whole offset has been validated.
   *
   * @param offset JSON serialized {@link DagManifestOffset}
   * @throws IllegalArgumentException if the offset is null, is not valid JSON, has no scan
   *     directory, or was written with a newer version than this scanner supports
   */
  public void setOffset(String offset) {
    if (offset == null) {
      throw new IllegalArgumentException("Invalid offset: null");
    }
    DagManifestOffset dagOffset;
    try {
      dagOffset = MAPPER.readValue(offset, DagManifestOffset.class);
    } catch (IOException e) {
      throw new IllegalArgumentException("Invalid offset", e);
    }
    // The JSON literal "null" is parsed into a null object.
    if (dagOffset == null) {
      throw new IllegalArgumentException("Invalid offset: " + offset);
    }
    if (dagOffset.version > SCANNER_OFFSET_VERSION) {
      throw new IllegalArgumentException("Version mismatch: " + dagOffset.version);
    }
    // Without a directory the next scan would fail on scanDir.equals(...), so fail fast here.
    if (dagOffset.scanDir == null) {
      throw new IllegalArgumentException("Invalid offset, scanDir is missing: " + offset);
    }
    this.scanDir = dagOffset.scanDir;
    this.offsets = dagOffset.offsets == null ? new HashMap<>() : dagOffset.offsets;
    this.retryCount = dagOffset.retryCount == null ? new HashMap<>() : dagOffset.retryCount;
    this.newFiles = new ArrayList<>();
  }

  /**
   * Positions the scanner at the directory for the given date, discarding all per-file
   * offsets and error counts.
   *
   * @param date date whose directory should be scanned first
   */
  public void setOffset(LocalDate date) {
    this.scanDir = manifestLogger.getDirForDate(date);
    this.offsets = new HashMap<>();
    this.retryCount = new HashMap<>();
    this.newFiles = new ArrayList<>();
  }

  /**
   * Serializes the current scanner position.
   *
   * @return JSON string that can be passed to {@link #setOffset(String)}
   * @throws RuntimeException if the position cannot be converted to JSON
   */
  public String getOffset() {
    try {
      DagManifestOffset offset = new DagManifestOffset();
      offset.version = SCANNER_OFFSET_VERSION;
      offset.scanDir = scanDir;
      offset.offsets = offsets;
      offset.retryCount = retryCount;
      return MAPPER.writeValueAsString(offset);
    } catch (IOException e) {
      throw new RuntimeException("Unexpected exception while converting to json.", e);
    }
  }

  /**
   * Returns the next manifest entry, moving on to further files and directories as needed.
   *
   * <p>A read failure is logged and counted against the file, and the scanner moves on to the
   * next file instead of propagating the error.
   *
   * @return the next entry, or null when there is currently nothing more to read
   * @throws IOException if listing directories or setting up a reader fails
   */
  public ManifestEntryProto getNext() throws IOException {
    while (true) {
      if (reader != null) {
        ManifestEntryProto evt = null;
        try {
          evt = reader.readEvent();
          // A successful read clears any earlier failures recorded for this file.
          retryCount.remove(currentFilePath);
        } catch (IOException e) {
          LOG.error("Error trying to read event from file: {}", currentFilePath, e);
          incrementError(currentFilePath);
        }
        if (evt != null) {
          offsets.put(reader.getFilePath().getName(), reader.getOffset());
          return evt;
        }
        // Either the reader had no more events or the read failed: drop this reader.
        IOUtils.closeQuietly(reader);
        reader = null;
        currentFilePath = null;
      }
      if (!newFiles.isEmpty()) {
        // getNextReader() returns null when the file could not be opened; the loop then
        // simply tries the next pending file.
        this.reader = getNextReader();
        this.currentFilePath = reader != null ? reader.getFilePath().toString() : null;
      } else if (!loadMore()) {
        return null;
      }
    }
  }

  /** Records one more failure for the given file path. */
  private void incrementError(String path) {
    retryCount.merge(path, 1, Integer::sum);
  }

  /**
   * Removes the first pending file and opens a reader for it, as a proxy user for the file
   * owner when doAs is enabled.
   *
   * @return the reader, or null if opening the file failed (the failure is logged and counted)
   * @throws IOException if the current user cannot be determined for the proxy user
   */
  private ProtoMessageReader<ManifestEntryProto> getNextReader() throws IOException {
    FileStatus status = newFiles.remove(0);
    PrivilegedAction<ProtoMessageReader<ManifestEntryProto>> action = () -> {
      try {
        return manifestLogger.getReader(status.getPath());
      } catch (IOException e) {
        String path = status.getPath().toString();
        LOG.error("Error trying to open file: {}", path, e);
        incrementError(path);
        return null;
      }
    };
    if (withDoas) {
      UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
          status.getOwner(), UserGroupInformation.getCurrentUser());
      return proxyUser.doAs(action);
    } else {
      return action.run();
    }
  }

  /**
   * Closes the reader of the file currently being consumed, if any.
   *
   * <p>The reader reference is dropped even when closing fails, so a failed close is not
   * retried on a later call.
   *
   * @throws IOException if closing the underlying reader fails
   */
  @Override
  public void close() throws IOException {
    if (reader == null) {
      return;
    }
    try {
      reader.close();
    } finally {
      reader = null;
      currentFilePath = null;
    }
  }

  /** Removes every file whose recorded error count exceeds {@code MAX_RETRY}. */
  private void filterErrors(List<FileStatus> files) {
    Iterator<FileStatus> iter = files.iterator();
    while (iter.hasNext()) {
      FileStatus status = iter.next();
      String path = status.getPath().toString();
      if (retryCount.getOrDefault(path, 0) > MAX_RETRY) {
        LOG.warn("Removing file {}, too many errors", path);
        iter.remove();
      }
    }
  }

  /**
   * Reloads the pending files of the current scan directory.
   *
   * <p>Files with too many errors are only dropped for directories other than today's one.
   * NOTE(review): a file in today's directory that keeps failing is therefore never filtered
   * here; confirm that this is the intended behaviour.
   *
   * @param todayDir directory name for the current date
   */
  private void loadNewFiles(String todayDir) throws IOException {
    newFiles = manifestLogger.scanForChangedFiles(scanDir, offsets);
    if (!scanDir.equals(todayDir)) {
      filterErrors(newFiles);
    }
  }

  /**
   * Looks for files to read, advancing to following directories while the current one has
   * nothing pending.
   *
   * @return true if {@link #newFiles} now contains at least one file, false if there is
   *     nothing more to read at the moment
   */
  private boolean loadMore() throws IOException {
    LocalDateTime now = manifestLogger.getNow();
    LocalDate today = now.toLocalDate();
    String todayDir = manifestLogger.getDirForDate(today);
    // "now" is fixed for this call, so the window check does not change between iterations.
    boolean inSyncWindow = now.toLocalTime().toSecondOfDay() < syncTime;
    loadNewFiles(todayDir);
    while (newFiles.isEmpty()) {
      // We are in the delay window for today, do not advance date if we are moving from
      // yesterday.
      if (inSyncWindow && scanDir.equals(manifestLogger.getDirForDate(today.minusDays(1)))) {
        return false;
      }
      String nextDir = manifestLogger.getNextDirectory(scanDir);
      if (nextDir == null) {
        return false;
      }
      // Offsets and error counts are tracked per directory; start fresh in the next one.
      scanDir = nextDir;
      offsets = new HashMap<>();
      retryCount = new HashMap<>();
      loadNewFiles(todayDir);
    }
    return true;
  }
}