blob: a569567d1efa6a6f532ad57a446ae4ed93a403dd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.history.logging.proto;
import java.io.IOException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.util.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
/**
 * Class to create proto reader and writer for a date partitioned directory structure.
 *
 * <p>Files written through {@link #getWriter(String)} are placed under
 * {@code <basePath>/date=<ISO local date>/<fileName>}, where the date is derived from the
 * supplied clock interpreted in UTC.
 *
 * @param <T> The proto message type.
 */
public class DatePartitionedLogger<T extends MessageLite> {
  private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedLogger.class);
  // Everyone has permission to write, but with sticky set so that delete is restricted.
  // This is required, since the path is same for all users and everyone writes into it.
  private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777);
  // Since the directories have broad permissions restrict the file read access.
  private static final FsPermission FILE_UMASK = FsPermission.createImmutable((short)0066);
  // Prefix of every date partition directory name; the remainder is an ISO local date.
  private static final String DATE_DIR_PREFIX = "date=";

  private final Parser<T> parser;
  private final Path basePath;
  private final Configuration conf;
  private final Clock clock;

  /**
   * Creates a logger rooted at the given base directory.
   *
   * <p>The configuration is copied, so the umask applied here does not leak into the caller's
   * configuration. The base directory is created on a best-effort basis and then resolved to
   * its fully qualified form.
   *
   * @param parser The parser used by the readers and writers created by this instance.
   * @param baseDir The root directory under which date partitions are created.
   * @param conf The configuration to copy; used to look up the file system.
   * @param clock The clock used to determine the current date.
   * @throws IOException If the file system cannot be obtained or the base path cannot be
   *     resolved.
   */
  public DatePartitionedLogger(Parser<T> parser, Path baseDir, Configuration conf, Clock clock)
      throws IOException {
    this.conf = new Configuration(conf);
    this.clock = clock;
    this.parser = parser;
    createDirIfNotExists(baseDir);
    // Use the private copy consistently with the rest of the class.
    this.basePath = baseDir.getFileSystem(this.conf).resolvePath(baseDir);
    // Applied after the base directory is handled; it affects files created with this.conf.
    FsPermission.setUMask(this.conf, FILE_UMASK);
  }

  /**
   * Creates the directory with {@link #DIR_PERMISSION} if it does not exist yet.
   *
   * <p>This is best effort: an {@link IOException} raised while checking, creating or setting
   * the permission is logged and swallowed.
   */
  private void createDirIfNotExists(Path path) throws IOException {
    FileSystem fileSystem = path.getFileSystem(conf);
    try {
      if (!fileSystem.exists(path)) {
        fileSystem.mkdirs(path);
        fileSystem.setPermission(path, DIR_PERMISSION);
      }
    } catch (IOException e) {
      // Ignore this exception, if there is a problem it'll fail when trying to read or write.
      LOG.warn("Error while trying to set permission: ", e);
    }
  }

  /**
   * Creates a writer for the given fileName, with date as today.
   *
   * @param fileName The name of the file inside today's date directory.
   * @return A writer for the file in today's (UTC) date directory.
   * @throws IOException If the file system lookup or the writer creation fails.
   */
  public ProtoMessageWriter<T> getWriter(String fileName) throws IOException {
    Path filePath = getPathForDate(getNow().toLocalDate(), fileName);
    return new ProtoMessageWriter<>(conf, filePath, parser);
  }

  /**
   * Creates a reader for the given filePath, no validation is done.
   *
   * @param filePath The path of the file to read.
   * @return A reader for the given path.
   * @throws IOException If the reader cannot be created.
   */
  public ProtoMessageReader<T> getReader(Path filePath) throws IOException {
    return new ProtoMessageReader<>(conf, filePath, parser);
  }

  /**
   * Create a path for the given date and fileName. This can be used to create a reader.
   *
   * <p>Note that the date directory is created (best effort) if it does not exist.
   *
   * @param date The date of the partition.
   * @param fileName The file name inside the partition directory.
   * @return The path {@code <basePath>/date=<date>/<fileName>}.
   * @throws IOException If the file system for the path cannot be obtained.
   */
  public Path getPathForDate(LocalDate date, String fileName) throws IOException {
    Path path = new Path(basePath, getDirForDate(date));
    createDirIfNotExists(path);
    return new Path(path, fileName);
  }

  /**
   * Builds the path of a file inside an arbitrary sub-directory of the base path. Nothing is
   * created or validated.
   *
   * @param dirName The sub-directory name, relative to the base path.
   * @param fileName The file name inside that sub-directory.
   * @return The path {@code <basePath>/<dirName>/<fileName>}.
   */
  public Path getPathForSubdir(String dirName, String fileName) {
    return new Path(new Path(basePath, dirName), fileName);
  }

  /**
   * Extract the date from the directory name, this should be a directory created by this class.
   *
   * @param dirName A directory name of the form {@code date=<ISO local date>}.
   * @return The date encoded in the directory name.
   * @throws IllegalArgumentException If the name does not start with {@code date=}.
   * @throws java.time.format.DateTimeParseException If the text after the prefix is not a
   *     valid ISO local date.
   */
  public LocalDate getDateFromDir(String dirName) {
    if (!dirName.startsWith(DATE_DIR_PREFIX)) {
      throw new IllegalArgumentException("Invalid directory: "+ dirName);
    }
    return LocalDate.parse(dirName.substring(DATE_DIR_PREFIX.length()),
        DateTimeFormatter.ISO_LOCAL_DATE);
  }

  /**
   * Returns the directory name for a given date.
   *
   * @param date The date of the partition.
   * @return The name {@code date=<ISO local date>}.
   */
  public String getDirForDate(LocalDate date) {
    return DATE_DIR_PREFIX + DateTimeFormatter.ISO_LOCAL_DATE.format(date);
  }

  /**
   * Find next available directory, after the given directory.
   *
   * @param currentDir The current date directory name, must be of the form
   *     {@code date=<ISO local date>}.
   * @return The name of the earliest date directory after {@code currentDir}, or {@code null}
   *     if there is none.
   * @throws IOException If the base path cannot be listed.
   * @throws IllegalArgumentException If {@code currentDir} is not a date directory name.
   */
  public String getNextDirectory(String currentDir) throws IOException {
    // Fast check, if the next day directory exists return it.
    String nextDate = getDirForDate(getDateFromDir(currentDir).plusDays(1));
    FileSystem fileSystem = basePath.getFileSystem(conf);
    if (fileSystem.exists(new Path(basePath, nextDate))) {
      return nextDate;
    }
    // Have to scan the directory to find min date greater than currentDir.
    String dirName = null;
    RemoteIterator<FileStatus> iter = fileSystem.listStatusIterator(basePath);
    while (iter.hasNext()) {
      FileStatus status = iter.next();
      String name = status.getPath().getName();
      // The base path can hold entries other than date partitions (see getPathForSubdir);
      // those must never be reported as the next date directory.
      if (!status.isDirectory() || !name.startsWith(DATE_DIR_PREFIX)) {
        continue;
      }
      // String comparison is good enough, since its of form date=yyyy-MM-dd
      if (name.compareTo(currentDir) > 0 && (dirName == null || name.compareTo(dirName) < 0)) {
        dirName = name;
      }
    }
    return dirName;
  }

  /**
   * Returns new or changed files in the given directory. The offsets are used to find
   * changed files.
   *
   * @param subDir The sub-directory of the base path to scan.
   * @param currentOffsets Map from file name to the offset already consumed for that file.
   * @return The entries which have no recorded offset or whose length exceeds the recorded
   *     offset; empty if the directory does not exist.
   * @throws IOException If the directory cannot be listed.
   */
  public List<FileStatus> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets)
      throws IOException {
    Path dirPath = new Path(basePath, subDir);
    FileSystem fileSystem = basePath.getFileSystem(conf);
    List<FileStatus> newFiles = new ArrayList<>();
    if (!fileSystem.exists(dirPath)) {
      return newFiles;
    }
    RemoteIterator<FileStatus> iter = fileSystem.listStatusIterator(dirPath);
    while (iter.hasNext()) {
      FileStatus status = iter.next();
      String fileName = status.getPath().getName();
      Long offset = currentOffsets.get(fileName);
      // If the offset was never added or offset < fileSize.
      if (offset == null || offset < status.getLen()) {
        newFiles.add(status);
      }
    }
    return newFiles;
  }

  /**
   * Returns the current time, using the underlying clock in UTC time.
   *
   * @return The clock's time truncated to whole seconds, as a UTC local date-time.
   */
  public LocalDateTime getNow() {
    // Use UTC date to ensure reader date is same on all timezones.
    return LocalDateTime.ofEpochSecond(clock.getTime() / 1000, 0, ZoneOffset.UTC);
  }

  /**
   * Returns the configuration used by this logger. This is the internal copy (with the file
   * umask applied), not the instance passed to the constructor.
   */
  public Configuration getConfig() {
    return conf;
  }
}