blob: 6958b7930bded22ec2b04bbb4ea6913df28152c4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.table.log;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
/**
* File Format for Hoodie Log Files. The File Format consists of blocks each separated with a MAGIC sync marker. A Block
* can either be a Data block, Command block or Delete Block. Data Block - Contains log records serialized as Avro
* Binary Format Command Block - Specific commands like ROLLBACK_PREVIOUS-BLOCK - Tombstone for the previously written
* block Delete Block - List of keys to delete - tombstone for keys
*/
public interface HoodieLogFormat {
/**
* Magic 6 bytes we put at the start of every block in the log file.
*/
byte[] MAGIC = new byte[] {'#', 'H', 'U', 'D', 'I', '#'};
/**
* The current version of the log format. Anytime the log format changes this version needs to be bumped and
* corresponding changes need to be made to {@link HoodieLogFormatVersion}
*/
int CURRENT_VERSION = 1;
String UNKNOWN_WRITE_TOKEN = "1-0-1";
String DEFAULT_WRITE_TOKEN = "0-0-0";
/**
* Writer interface to allow appending block to this file format.
*/
interface Writer extends Closeable {
/**
* @return the path to the current {@link HoodieLogFile} being written to.
*/
HoodieLogFile getLogFile();
/**
* Append Block to a log file.
* @return {@link AppendResult} containing result of the append.
*/
AppendResult appendBlock(HoodieLogBlock block) throws IOException, InterruptedException;
/**
* Appends the list of blocks to a logfile.
* @return {@link AppendResult} containing result of the append.
*/
AppendResult appendBlocks(List<HoodieLogBlock> blocks) throws IOException, InterruptedException;
long getCurrentSize() throws IOException;
}
/**
* Reader interface which is an Iterator of HoodieLogBlock.
*/
interface Reader extends Closeable, Iterator<HoodieLogBlock> {
/**
* @return the path to this {@link HoodieLogFormat}
*/
HoodieLogFile getLogFile();
/**
* Read log file in reverse order and check if prev block is present.
*
* @return
*/
boolean hasPrev();
/**
* Read log file in reverse order and return prev block if present.
*
* @return
* @throws IOException
*/
HoodieLogBlock prev() throws IOException;
}
/**
* Builder class to construct the default log format writer.
*/
class WriterBuilder {
private static final Logger LOG = LogManager.getLogger(WriterBuilder.class);
// Default max log file size 512 MB
public static final long DEFAULT_SIZE_THRESHOLD = 512 * 1024 * 1024L;
// Buffer size
private Integer bufferSize;
// Replication for the log file
private Short replication;
// FileSystem
private FileSystem fs;
// Size threshold for the log file. Useful when used with a rolling log appender
private Long sizeThreshold;
// Log File extension. Could be .avro.delta or .avro.commits etc
private String fileExtension;
// File Id
private String logFileId;
// File Commit Time stamp
private String instantTime;
// version number for this log file. If not specified, then the current version will be
// computed by inspecting the file system
private Integer logVersion;
// file len of this log file
private Long fileLen = 0L;
// Location of the directory containing the log
private Path parentPath;
// Log File Write Token
private String logWriteToken;
// optional file suffix
private String suffix;
// Rollover Log file write token
private String rolloverLogWriteToken;
public WriterBuilder withBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}
public WriterBuilder withReplication(short replication) {
this.replication = replication;
return this;
}
public WriterBuilder withRolloverLogWriteToken(String rolloverLogWriteToken) {
this.rolloverLogWriteToken = rolloverLogWriteToken;
return this;
}
public WriterBuilder withLogWriteToken(String logWriteToken) {
this.logWriteToken = logWriteToken;
return this;
}
public WriterBuilder withSuffix(String suffix) {
this.suffix = suffix;
return this;
}
public WriterBuilder withFs(FileSystem fs) {
this.fs = fs;
return this;
}
public WriterBuilder withSizeThreshold(long sizeThreshold) {
this.sizeThreshold = sizeThreshold;
return this;
}
public WriterBuilder withFileExtension(String logFileExtension) {
this.fileExtension = logFileExtension;
return this;
}
public WriterBuilder withFileId(String fileId) {
this.logFileId = fileId;
return this;
}
public WriterBuilder overBaseCommit(String baseCommit) {
this.instantTime = baseCommit;
return this;
}
public WriterBuilder withLogVersion(int version) {
this.logVersion = version;
return this;
}
public WriterBuilder withFileSize(long fileLen) {
this.fileLen = fileLen;
return this;
}
public WriterBuilder onParentPath(Path parentPath) {
this.parentPath = parentPath;
return this;
}
public Writer build() throws IOException {
LOG.info("Building HoodieLogFormat Writer");
if (fs == null) {
throw new IllegalArgumentException("fs is not specified");
}
if (logFileId == null) {
throw new IllegalArgumentException("FileID is not specified");
}
if (instantTime == null) {
throw new IllegalArgumentException("BaseCommitTime is not specified");
}
if (fileExtension == null) {
throw new IllegalArgumentException("File extension is not specified");
}
if (parentPath == null) {
throw new IllegalArgumentException("Log file parent location is not specified");
}
if (rolloverLogWriteToken == null) {
rolloverLogWriteToken = UNKNOWN_WRITE_TOKEN;
}
if (logVersion == null) {
LOG.info("Computing the next log version for " + logFileId + " in " + parentPath);
Option<Pair<Integer, String>> versionAndWriteToken =
FSUtils.getLatestLogVersion(fs, parentPath, logFileId, fileExtension, instantTime);
if (versionAndWriteToken.isPresent()) {
logVersion = versionAndWriteToken.get().getKey();
logWriteToken = versionAndWriteToken.get().getValue();
} else {
logVersion = HoodieLogFile.LOGFILE_BASE_VERSION;
// this is the case where there is no existing log-file.
// Use rollover write token as write token to create new log file with tokens
logWriteToken = rolloverLogWriteToken;
}
LOG.info("Computed the next log version for " + logFileId + " in " + parentPath + " as " + logVersion
+ " with write-token " + logWriteToken);
}
if (logWriteToken == null) {
// This is the case where we have existing log-file with old format. rollover to avoid any conflicts
logVersion += 1;
fileLen = 0L;
logWriteToken = rolloverLogWriteToken;
}
if (suffix != null) {
// A little hacky to simplify the file name concatenation:
// patch the write token with an optional suffix
// instead of adding a new extension
logWriteToken = logWriteToken + suffix;
rolloverLogWriteToken = rolloverLogWriteToken + suffix;
}
Path logPath = new Path(parentPath,
FSUtils.makeLogFileName(logFileId, fileExtension, instantTime, logVersion, logWriteToken));
LOG.info("HoodieLogFile on path " + logPath);
HoodieLogFile logFile = new HoodieLogFile(logPath, fileLen);
if (bufferSize == null) {
bufferSize = FSUtils.getDefaultBufferSize(fs);
}
if (replication == null) {
replication = FSUtils.getDefaultReplication(fs, parentPath);
}
if (sizeThreshold == null) {
sizeThreshold = DEFAULT_SIZE_THRESHOLD;
}
return new HoodieLogFormatWriter(fs, logFile, bufferSize, replication, sizeThreshold, rolloverLogWriteToken);
}
}
static WriterBuilder newWriterBuilder() {
return new WriterBuilder();
}
static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema)
throws IOException {
return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false);
}
static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema,
boolean readBlockLazily, boolean reverseReader) throws IOException {
return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, readBlockLazily,
reverseReader);
}
/**
* A set of feature flags associated with a log format. Versions are changed when the log format changes. TODO(na) -
* Implement policies around major/minor versions
*/
abstract class LogFormatVersion {
private final int version;
LogFormatVersion(int version) {
this.version = version;
}
public int getVersion() {
return version;
}
public abstract boolean hasMagicHeader();
public abstract boolean hasContent();
public abstract boolean hasContentLength();
public abstract boolean hasOrdinal();
public abstract boolean hasHeader();
public abstract boolean hasFooter();
public abstract boolean hasLogBlockLength();
}
}