blob: 171357f53412a8048a520e171883304dfb05750a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.timeline.service;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import io.javalin.Javalin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
/**
* A standalone timeline service exposing File-System View interfaces to clients.
*/
public class TimelineService {
private static final Logger LOG = LoggerFactory.getLogger(TimelineService.class);
private static final int START_SERVICE_MAX_RETRIES = 16;
private static final int DEFAULT_NUM_THREADS = 250;
private int serverPort;
private Config timelineServerConf;
private Configuration conf;
private transient HoodieEngineContext context;
private transient FileSystem fs;
private transient Javalin app = null;
private transient FileSystemViewManager fsViewsManager;
private transient RequestHandler requestHandler;
public int getServerPort() {
return serverPort;
}
public TimelineService(HoodieEngineContext context, Configuration hadoopConf, Config timelineServerConf,
FileSystem fileSystem, FileSystemViewManager globalFileSystemViewManager) throws IOException {
this.conf = FSUtils.prepareHadoopConf(hadoopConf);
this.timelineServerConf = timelineServerConf;
this.serverPort = timelineServerConf.serverPort;
this.context = context;
this.fs = fileSystem;
this.fsViewsManager = globalFileSystemViewManager;
}
/**
* Config for {@code TimelineService} class.
*/
public static class Config implements Serializable {
@Parameter(names = {"--server-port", "-p"}, description = " Server Port")
public Integer serverPort = 26754;
@Parameter(names = {"--view-storage", "-st"}, description = "View Storage Type. Default - SPILLABLE_DISK")
public FileSystemViewStorageType viewStorageType = FileSystemViewStorageType.SPILLABLE_DISK;
@Parameter(names = {"--max-view-mem-per-table", "-mv"},
description = "Maximum view memory per table in MB to be used for storing file-groups."
+ " Overflow file-groups will be spilled to disk. Used for SPILLABLE_DISK storage type")
public Integer maxViewMemPerTableInMB = 2048;
@Parameter(names = {"--mem-overhead-fraction-pending-compaction", "-cf"},
description = "Memory Fraction of --max-view-mem-per-table to be allocated for managing pending compaction"
+ " storage. Overflow entries will be spilled to disk. Used for SPILLABLE_DISK storage type")
public Double memFractionForCompactionPerTable = 0.001;
@Parameter(names = {"--base-store-path", "-sp"},
description = "Directory where spilled view entries will be stored. Used for SPILLABLE_DISK storage type")
public String baseStorePathForFileGroups = FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue();
@Parameter(names = {"--rocksdb-path", "-rp"}, description = "Root directory for RocksDB")
public String rocksDBPath = FileSystemViewStorageConfig.ROCKSDB_BASE_PATH.defaultValue();
@Parameter(names = {"--threads", "-t"}, description = "Number of threads to use for serving requests. The default number is 250")
public int numThreads = DEFAULT_NUM_THREADS;
@Parameter(names = {"--async"}, description = "Use asynchronous request processing")
public boolean async = false;
@Parameter(names = {"--compress"}, description = "Compress output using gzip")
public boolean compress = true;
@Parameter(names = {"--enable-marker-requests", "-em"}, description = "Enable handling of marker-related requests")
public boolean enableMarkerRequests = false;
@Parameter(names = {"--marker-batch-threads", "-mbt"}, description = "Number of threads to use for batch processing marker creation requests")
public int markerBatchNumThreads = 20;
@Parameter(names = {"--marker-batch-interval-ms", "-mbi"}, description = "The interval in milliseconds between two batch processing of marker creation requests")
public long markerBatchIntervalMs = 50;
@Parameter(names = {"--marker-parallelism", "-mdp"}, description = "Parallelism to use for reading and deleting marker files")
public int markerParallelism = 100;
@Parameter(names = {"--early-conflict-detection-strategy"}, description =
"The class name of the early conflict detection strategy to use. "
+ "This should be subclass of "
+ "`org.apache.hudi.common.conflict.detection.EarlyConflictDetectionStrategy`")
public String earlyConflictDetectionStrategy = "org.apache.hudi.timeline.service.handlers.marker.AsyncTimelineServerBasedDetectionStrategy";
@Parameter(names = {"--early-conflict-detection-check-commit-conflict"}, description =
"Whether to enable commit conflict checking or not during early "
+ "conflict detection.")
public Boolean checkCommitConflict = false;
@Parameter(names = {"--early-conflict-detection-enable"}, description =
"Whether to enable early conflict detection based on markers. "
+ "It eagerly detects writing conflict before create markers and fails fast if a "
+ "conflict is detected, to release cluster compute resources as soon as possible.")
public Boolean earlyConflictDetectionEnable = false;
@Parameter(names = {"--async-conflict-detector-initial-delay-ms"}, description =
"Used for timeline-server-based markers with "
+ "`AsyncTimelineServerBasedDetectionStrategy`. "
+ "The time in milliseconds to delay the first execution of async marker-based conflict detection.")
public Long asyncConflictDetectorInitialDelayMs = 0L;
@Parameter(names = {"--async-conflict-detector-period-ms"}, description =
"Used for timeline-server-based markers with "
+ "`AsyncTimelineServerBasedDetectionStrategy`. "
+ "The period in milliseconds between successive executions of async marker-based conflict detection.")
public Long asyncConflictDetectorPeriodMs = 30000L;
@Parameter(names = {"--early-conflict-detection-max-heartbeat-interval-ms"}, description =
"Used for timeline-server-based markers with "
+ "`AsyncTimelineServerBasedDetectionStrategy`. "
+ "Instants whose heartbeat is greater than the current value will not be used in early conflict detection.")
public Long maxAllowableHeartbeatIntervalInMs = 120000L;
@Parameter(names = {"--help", "-h"})
public Boolean help = false;
public static Builder builder() {
return new Builder();
}
/**
* Builder of Config class.
*/
public static class Builder {
private Integer serverPort = 26754;
private FileSystemViewStorageType viewStorageType = FileSystemViewStorageType.SPILLABLE_DISK;
private Integer maxViewMemPerTableInMB = 2048;
private Double memFractionForCompactionPerTable = 0.001;
private String baseStorePathForFileGroups = FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue();
private String rocksDBPath = FileSystemViewStorageConfig.ROCKSDB_BASE_PATH.defaultValue();
private int numThreads = DEFAULT_NUM_THREADS;
private boolean async = false;
private boolean compress = true;
private boolean enableMarkerRequests = false;
private int markerBatchNumThreads = 20;
private long markerBatchIntervalMs = 50L;
private int markerParallelism = 100;
private String earlyConflictDetectionStrategy = "org.apache.hudi.timeline.service.handlers.marker.AsyncTimelineServerBasedDetectionStrategy";
private Boolean checkCommitConflict = false;
private Boolean earlyConflictDetectionEnable = false;
private Long asyncConflictDetectorInitialDelayMs = 0L;
private Long asyncConflictDetectorPeriodMs = 30000L;
private Long maxAllowableHeartbeatIntervalInMs = 120000L;
public Builder() {
}
public Builder serverPort(int serverPort) {
this.serverPort = serverPort;
return this;
}
public Builder viewStorageType(FileSystemViewStorageType viewStorageType) {
this.viewStorageType = viewStorageType;
return this;
}
public Builder maxViewMemPerTableInMB(int maxViewMemPerTableInMB) {
this.maxViewMemPerTableInMB = maxViewMemPerTableInMB;
return this;
}
public Builder memFractionForCompactionPerTable(double memFractionForCompactionPerTable) {
this.memFractionForCompactionPerTable = memFractionForCompactionPerTable;
return this;
}
public Builder baseStorePathForFileGroups(String baseStorePathForFileGroups) {
this.baseStorePathForFileGroups = baseStorePathForFileGroups;
return this;
}
public Builder rocksDBPath(String rocksDBPath) {
this.rocksDBPath = rocksDBPath;
return this;
}
public Builder numThreads(int numThreads) {
this.numThreads = numThreads;
return this;
}
public Builder async(boolean async) {
this.async = async;
return this;
}
public Builder compress(boolean compress) {
this.compress = compress;
return this;
}
public Builder enableMarkerRequests(boolean enableMarkerRequests) {
this.enableMarkerRequests = enableMarkerRequests;
return this;
}
public Builder markerBatchNumThreads(int markerBatchNumThreads) {
this.markerBatchNumThreads = markerBatchNumThreads;
return this;
}
public Builder markerBatchIntervalMs(long markerBatchIntervalMs) {
this.markerBatchIntervalMs = markerBatchIntervalMs;
return this;
}
public Builder markerParallelism(int markerParallelism) {
this.markerParallelism = markerParallelism;
return this;
}
public Builder earlyConflictDetectionStrategy(String earlyConflictDetectionStrategy) {
this.earlyConflictDetectionStrategy = earlyConflictDetectionStrategy;
return this;
}
public Builder earlyConflictDetectionCheckCommitConflict(Boolean checkCommitConflict) {
this.checkCommitConflict = checkCommitConflict;
return this;
}
public Builder earlyConflictDetectionEnable(Boolean earlyConflictDetectionEnable) {
this.earlyConflictDetectionEnable = earlyConflictDetectionEnable;
return this;
}
public Builder asyncConflictDetectorInitialDelayMs(Long asyncConflictDetectorInitialDelayMs) {
this.asyncConflictDetectorInitialDelayMs = asyncConflictDetectorInitialDelayMs;
return this;
}
public Builder asyncConflictDetectorPeriodMs(Long asyncConflictDetectorPeriodMs) {
this.asyncConflictDetectorPeriodMs = asyncConflictDetectorPeriodMs;
return this;
}
public Builder earlyConflictDetectionMaxAllowableHeartbeatIntervalInMs(Long maxAllowableHeartbeatIntervalInMs) {
this.maxAllowableHeartbeatIntervalInMs = maxAllowableHeartbeatIntervalInMs;
return this;
}
public Config build() {
Config config = new Config();
config.serverPort = this.serverPort;
config.viewStorageType = this.viewStorageType;
config.maxViewMemPerTableInMB = this.maxViewMemPerTableInMB;
config.memFractionForCompactionPerTable = this.memFractionForCompactionPerTable;
config.baseStorePathForFileGroups = this.baseStorePathForFileGroups;
config.rocksDBPath = this.rocksDBPath;
config.numThreads = this.numThreads;
config.async = this.async;
config.compress = this.compress;
config.enableMarkerRequests = this.enableMarkerRequests;
config.markerBatchNumThreads = this.markerBatchNumThreads;
config.markerBatchIntervalMs = this.markerBatchIntervalMs;
config.markerParallelism = this.markerParallelism;
config.earlyConflictDetectionStrategy = this.earlyConflictDetectionStrategy;
config.checkCommitConflict = this.checkCommitConflict;
config.earlyConflictDetectionEnable = this.earlyConflictDetectionEnable;
config.asyncConflictDetectorInitialDelayMs = this.asyncConflictDetectorInitialDelayMs;
config.asyncConflictDetectorPeriodMs = this.asyncConflictDetectorPeriodMs;
config.maxAllowableHeartbeatIntervalInMs = this.maxAllowableHeartbeatIntervalInMs;
return config;
}
}
}
private int startServiceOnPort(int port) throws IOException {
if (!(port == 0 || (1024 <= port && port < 65536))) {
throw new IllegalArgumentException(String.format("startPort should be between 1024 and 65535 (inclusive), "
+ "or 0 for a random free port. but now is %s.", port));
}
for (int attempt = 0; attempt < START_SERVICE_MAX_RETRIES; attempt++) {
// Returns port to try when trying to bind a service. Handles wrapping and skipping privileged ports.
int tryPort = port == 0 ? port : (port + attempt - 1024) % (65536 - 1024) + 1024;
try {
app.start(tryPort);
return app.port();
} catch (Exception e) {
if (e.getMessage() != null && e.getMessage().contains("Failed to bind to")) {
if (tryPort == 0) {
LOG.warn("Timeline server could not bind on a random free port.");
} else {
LOG.warn(String.format("Timeline server could not bind on port %d. "
+ "Attempting port %d + 1.",tryPort, tryPort));
}
} else {
LOG.warn(String.format("Timeline server start failed on port %d. Attempting port %d + 1.",tryPort, tryPort), e);
}
}
}
throw new IOException(String.format("Timeline server start failed on port %d, after retry %d times", port, START_SERVICE_MAX_RETRIES));
}
public int startService() throws IOException {
int maxThreads = timelineServerConf.numThreads > 0 ? timelineServerConf.numThreads : DEFAULT_NUM_THREADS;
QueuedThreadPool pool = new QueuedThreadPool(maxThreads, 8, 60_000);
pool.setDaemon(true);
final Server server = new Server(pool);
ScheduledExecutorScheduler scheduler = new ScheduledExecutorScheduler("TimelineService-JettyScheduler", true, 8);
server.addBean(scheduler);
app = Javalin.create(c -> {
if (!timelineServerConf.compress) {
c.compressionStrategy(io.javalin.core.compression.CompressionStrategy.NONE);
}
c.server(() -> server);
});
requestHandler = new RequestHandler(
app, conf, timelineServerConf, context, fs, fsViewsManager);
app.get("/", ctx -> ctx.result("Hello Hudi"));
requestHandler.register();
int realServerPort = startServiceOnPort(serverPort);
LOG.info("Starting Timeline server on port :" + realServerPort);
this.serverPort = realServerPort;
return realServerPort;
}
public void run() throws IOException {
startService();
}
public static FileSystemViewManager buildFileSystemViewManager(Config config, SerializableConfiguration conf) {
HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(conf.get());
// Just use defaults for now
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build();
HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().build();
switch (config.viewStorageType) {
case MEMORY:
FileSystemViewStorageConfig.Builder inMemConfBuilder = FileSystemViewStorageConfig.newBuilder();
inMemConfBuilder.withStorageType(FileSystemViewStorageType.MEMORY);
return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, inMemConfBuilder.build(), commonConfig);
case SPILLABLE_DISK: {
FileSystemViewStorageConfig.Builder spillableConfBuilder = FileSystemViewStorageConfig.newBuilder();
spillableConfBuilder.withStorageType(FileSystemViewStorageType.SPILLABLE_DISK)
.withBaseStoreDir(config.baseStorePathForFileGroups)
.withMaxMemoryForView(config.maxViewMemPerTableInMB * 1024 * 1024L)
.withMemFractionForPendingCompaction(config.memFractionForCompactionPerTable);
return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, spillableConfBuilder.build(), commonConfig);
}
case EMBEDDED_KV_STORE: {
FileSystemViewStorageConfig.Builder rocksDBConfBuilder = FileSystemViewStorageConfig.newBuilder();
rocksDBConfBuilder.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE)
.withRocksDBPath(config.rocksDBPath);
return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, rocksDBConfBuilder.build(), commonConfig);
}
default:
throw new IllegalArgumentException("Invalid view manager storage type :" + config.viewStorageType);
}
}
public void close() {
LOG.info("Closing Timeline Service");
if (requestHandler != null) {
this.requestHandler.stop();
}
if (this.app != null) {
this.app.stop();
this.app = null;
}
this.fsViewsManager.close();
LOG.info("Closed Timeline Service");
}
public Configuration getConf() {
return conf;
}
public FileSystem getFs() {
return fs;
}
public static void main(String[] args) throws Exception {
final Config cfg = new Config();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help) {
cmd.usage();
System.exit(1);
}
Configuration conf = FSUtils.prepareHadoopConf(new Configuration());
FileSystemViewManager viewManager = buildFileSystemViewManager(cfg, new SerializableConfiguration(conf));
TimelineService service = new TimelineService(
new HoodieLocalEngineContext(FSUtils.prepareHadoopConf(new Configuration())),
new Configuration(), cfg, FileSystem.get(new Configuration()), viewManager);
service.run();
}
}