blob: 390a4e2184f949d9b26c89b9ce3e6c50494cb29e [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hudi.timeline.service.handlers;
import org.apache.hudi.common.conflict.detection.TimelineServerBasedDetectionStrategy;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieEarlyConflictDetectionException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.timeline.service.TimelineService;
import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationDispatchingRunnable;
import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationFuture;
import org.apache.hudi.timeline.service.handlers.marker.MarkerDirState;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.javalin.http.Context;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
* REST Handler servicing marker requests.
* The marker creation requests are handled asynchronous, while other types of requests
* are handled synchronous.
* Marker creation requests are batch processed periodically by a thread. Each batch
* processing thread adds new markers to a marker file. Given that marker file operation
* can take time, multiple concurrent threads can run at the same, while they operate
* on different marker files storing mutually exclusive marker entries. At any given
* time, a marker file is touched by at most one thread to guarantee consistency.
* Below is an example of running batch processing threads.
* |-----| batch interval
* Worker Thread 1 |-------------------------->| writing to MARKERS0
* Worker Thread 2 |-------------------------->| writing to MARKERS1
* Worker Thread 3 |-------------------------->| writing to MARKERS2
public class MarkerHandler extends Handler {
private static final Logger LOG = LoggerFactory.getLogger(MarkerHandler.class);
private final Registry metricsRegistry;
// a scheduled executor service to schedule dispatching of marker creation requests
private final ScheduledExecutorService dispatchingExecutorService;
// an executor service to schedule the worker threads of batch processing marker creation requests
private final ExecutorService batchingExecutorService;
// Parallelism for reading and deleting marker files
private final int parallelism;
// Marker directory states, {markerDirPath -> MarkerDirState instance}
// Use ConcurrentHashMap to ensure thread safety in dispatchingExecutorService
private final Map<String, MarkerDirState> markerDirStateMap = new ConcurrentHashMap<>();
// A thread to dispatch marker creation requests to batch processing threads
private final MarkerCreationDispatchingRunnable markerCreationDispatchingRunnable;
private final Object firstCreationRequestSeenLock = new Object();
private final Object earlyConflictDetectionLock = new Object();
private transient HoodieEngineContext hoodieEngineContext;
private ScheduledFuture<?> dispatchingThreadFuture;
private boolean firstCreationRequestSeen;
private String currentMarkerDir = null;
private TimelineServerBasedDetectionStrategy earlyConflictDetectionStrategy;
public MarkerHandler(Configuration conf, TimelineService.Config timelineServiceConfig,
HoodieEngineContext hoodieEngineContext, FileSystem fileSystem,
FileSystemViewManager viewManager, Registry metricsRegistry) throws IOException {
super(conf, timelineServiceConfig, fileSystem, viewManager);
LOG.debug("MarkerHandler FileSystem: " + this.fileSystem.getScheme());
LOG.debug("MarkerHandler batching params: batchNumThreads=" + timelineServiceConfig.markerBatchNumThreads
+ " batchIntervalMs=" + timelineServiceConfig.markerBatchIntervalMs + "ms");
this.hoodieEngineContext = hoodieEngineContext;
this.metricsRegistry = metricsRegistry;
this.parallelism = timelineServiceConfig.markerParallelism;
this.dispatchingExecutorService = Executors.newSingleThreadScheduledExecutor();
this.batchingExecutorService = Executors.newFixedThreadPool(timelineServiceConfig.markerBatchNumThreads);
this.markerCreationDispatchingRunnable =
new MarkerCreationDispatchingRunnable(markerDirStateMap, batchingExecutorService);
this.firstCreationRequestSeen = false;
* Stops the dispatching of marker creation requests.
public void stop() {
if (dispatchingThreadFuture != null) {
* @param markerDir marker directory path
* @return all marker paths in the marker directory
public Set<String> getAllMarkers(String markerDir) {
MarkerDirState markerDirState = getMarkerDirState(markerDir);
return markerDirState.getAllMarkers();
* @param markerDir marker directory path.
* @return Pending markers from the requests to process.
public Set<String> getPendingMarkersToProcess(String markerDir) {
if (markerDirStateMap.containsKey(markerDir)) {
MarkerDirState markerDirState = getMarkerDirState(markerDir);
return markerDirState.getPendingMarkerCreationRequests(false).stream()
return Collections.emptySet();
* @param markerDir marker directory path
* @return all marker paths of write IO type "CREATE" and "MERGE"
public Set<String> getCreateAndMergeMarkers(String markerDir) {
return getAllMarkers(markerDir).stream()
.filter(markerName -> !markerName.endsWith(
* @param markerDir marker directory path
* @return {@code true} if the marker directory exists; {@code false} otherwise.
public boolean doesMarkerDirExist(String markerDir) {
MarkerDirState markerDirState = getMarkerDirState(markerDir);
return markerDirState.exists();
* Generates a future for an async marker creation request
* The future is added to the marker creation future list and waits for the next batch processing
* of marker creation requests.
* @param context Javalin app context
* @param markerDir marker directory path
* @param markerName marker name
* @return the {@code CompletableFuture} instance for the request
public CompletableFuture<String> createMarker(Context context, String markerDir, String markerName, String basePath) {
// Step1 do early conflict detection if enable
if (timelineServiceConfig.earlyConflictDetectionEnable) {
try {
synchronized (earlyConflictDetectionLock) {
if (earlyConflictDetectionStrategy == null) {
String strategyClassName = timelineServiceConfig.earlyConflictDetectionStrategy;
if (!ReflectionUtils.isSubClass(strategyClassName, TimelineServerBasedDetectionStrategy.class)) {
LOG.warn("Cannot use " + strategyClassName + " for timeline-server-based markers.");
strategyClassName = "org.apache.hudi.timeline.service.handlers.marker.AsyncTimelineServerBasedDetectionStrategy";
LOG.warn("Falling back to " + strategyClassName);
earlyConflictDetectionStrategy =
(TimelineServerBasedDetectionStrategy) ReflectionUtils.loadClass(
strategyClassName, basePath, markerDir, markerName, timelineServiceConfig.checkCommitConflict);
// markerDir => $base_path/.hoodie/.temp/$instant_time
// If markerDir is changed like move to the next instant action, we need to fresh this earlyConflictDetectionStrategy.
// For specific instant related create marker action, we only call this check/fresh once
// instead of starting the conflict detector for every request
if (!markerDir.equalsIgnoreCase(currentMarkerDir)) {
this.currentMarkerDir = markerDir;
Set<String> actions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION);
Set<HoodieInstant> completedCommits = new HashSet<>(
.filter(instant -> actions.contains(instant.getAction()))
markerDir, basePath, timelineServiceConfig.maxAllowableHeartbeatIntervalInMs,
fileSystem, this, completedCommits);
} catch (HoodieEarlyConflictDetectionException he) {
LOG.warn("Detected the write conflict due to a concurrent writer, "
+ "failing the marker creation as the early conflict detection is enabled", he);
return finishCreateMarkerFuture(context, markerDir, markerName);
} catch (Exception e) {
LOG.warn("Failed to execute early conflict detection." + e.getMessage());
// When early conflict detection fails to execute, we still allow the marker creation
// to continue
return addMarkerCreationRequestForAsyncProcessing(context, markerDir, markerName);
// Step 2 create marker
return addMarkerCreationRequestForAsyncProcessing(context, markerDir, markerName);
private MarkerCreationFuture addMarkerCreationRequestForAsyncProcessing(
Context context, String markerDir, String markerName) {"Request: create marker: " + markerName);
MarkerCreationFuture future = new MarkerCreationFuture(context, markerDir, markerName);
// Add the future to the list
MarkerDirState markerDirState = getMarkerDirState(markerDir);
if (!firstCreationRequestSeen) {
synchronized (firstCreationRequestSeenLock) {
if (!firstCreationRequestSeen) {
dispatchingThreadFuture = dispatchingExecutorService.scheduleAtFixedRate(markerCreationDispatchingRunnable,
timelineServiceConfig.markerBatchIntervalMs, timelineServiceConfig.markerBatchIntervalMs,
firstCreationRequestSeen = true;
return future;
private CompletableFuture<String> finishCreateMarkerFuture(Context context, String markerDir, String markerName) {
MarkerCreationFuture future = new MarkerCreationFuture(context, markerDir, markerName);
try {
future.getContext(), future.isSuccessful(), metricsRegistry, new ObjectMapper(), LOG));
} catch (JsonProcessingException e) {
throw new HoodieException("Failed to JSON encode the value", e);
return future;
* Deletes markers in the directory.
* @param markerDir marker directory path
* @return {@code true} if successful; {@code false} otherwise.
public Boolean deleteMarkers(String markerDir) {
boolean result = getMarkerDirState(markerDir).deleteAllMarkers();
return result;
private MarkerDirState getMarkerDirState(String markerDir) {
MarkerDirState markerDirState = markerDirStateMap.get(markerDir);
if (markerDirState == null) {
synchronized (markerDirStateMap) {
if (markerDirStateMap.get(markerDir) == null) {
Option<TimelineServerBasedDetectionStrategy> strategy =
&& earlyConflictDetectionStrategy != null
? Option.of(earlyConflictDetectionStrategy) : Option.empty();
markerDirState = new MarkerDirState(
markerDir, timelineServiceConfig.markerBatchNumThreads,
strategy, fileSystem, metricsRegistry, hoodieEngineContext, parallelism);
markerDirStateMap.put(markerDir, markerDirState);
} else {
markerDirState = markerDirStateMap.get(markerDir);
return markerDirState;