blob: 32dd0427255f0413b1ab8e632887ea0084f71789 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.async;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
/**
 * Base class for running clean/delta-sync/compaction in a separate thread and controlling its life-cycle.
 *
 * <p>Subclasses provide the actual work through {@link #startService()}. This class keeps the returned
 * future and executor, starts a monitor thread that waits on the future, and exposes shutdown controls.
 */
public abstract class HoodieAsyncService implements Serializable {

  private static final Logger LOG = LogManager.getLogger(HoodieAsyncService.class);

  // Flag to track if the service is started.
  private boolean started;
  // Flag indicating shutdown is externally requested. Volatile because it is written by the monitor thread
  // (via shutdown(false)) as well as by external callers, and is exposed through isShutdownRequested().
  private volatile boolean shutdownRequested;
  // Flag indicating the service is shutdown
  private volatile boolean shutdown;
  // Executor Service for running delta-sync/compaction
  private transient ExecutorService executor;
  // Future tracking delta-sync/compaction
  private transient CompletableFuture future;
  // Whether the monitor thread is created as a daemon thread
  private final boolean runInDaemonMode;

  /**
   * Creates a service whose monitor thread is a non-daemon thread.
   */
  protected HoodieAsyncService() {
    this(false);
  }

  /**
   * Creates a service.
   *
   * @param runInDaemonMode whether the monitor thread should be marked as a daemon thread
   */
  protected HoodieAsyncService(boolean runInDaemonMode) {
    shutdownRequested = false;
    this.runInDaemonMode = runInDaemonMode;
  }

  /**
   * @return {@code true} once {@link #shutdown(boolean)} has been invoked
   */
  protected boolean isShutdownRequested() {
    return shutdownRequested;
  }

  /**
   * @return {@code true} once the monitor thread has finished waiting on the service future
   */
  protected boolean isShutdown() {
    return shutdown;
  }

  /**
   * Wait till the service shutdown. If the service shutdown with exception, it will be thrown.
   * Returns immediately if there is no future to wait on (the service was never started, or this
   * instance was deserialized and its transient state is gone).
   *
   * @throws ExecutionException if the service future completed exceptionally
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  public void waitForShutdown() throws ExecutionException, InterruptedException {
    CompletableFuture serviceFuture = future;
    if (serviceFuture == null) {
      LOG.warn("Service was not started; nothing to wait for");
      return;
    }
    try {
      serviceFuture.get();
    } catch (ExecutionException ex) {
      LOG.error("Service shutdown with error", ex);
      throw ex;
    }
  }

  /**
   * Request shutdown either forcefully or gracefully. Graceful shutdown allows the service to finish up the current
   * round of work and shutdown. For graceful shutdown, it waits till the service is shutdown.
   * A non-forced call is a no-op when shutdown was already requested.
   *
   * @param force Forcefully shutdown
   */
  public void shutdown(boolean force) {
    if (shutdownRequested && !force) {
      return;
    }
    shutdownRequested = true;
    if (executor == null) {
      return;
    }
    if (force) {
      executor.shutdownNow();
      return;
    }
    executor.shutdown();
    try {
      // Wait for some max time after requesting shutdown
      if (!executor.awaitTermination(24, TimeUnit.HOURS)) {
        LOG.warn("Timed out waiting for service executor to terminate");
      }
    } catch (InterruptedException ie) {
      LOG.error("Interrupted while waiting for shutdown", ie);
      // Restore the interrupt status so callers further up the stack can observe it.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Start the service. Runs the service in a different thread and returns. Also starts a monitor thread to
   * run-callbacks in case of shutdown.
   *
   * @param onShutdownCallback invoked by the monitor thread once the service future completes; receives
   *                           {@code true} if the monitor saw a failure or was interrupted, {@code false}
   *                           otherwise. May be {@code null}. Its return value is ignored.
   */
  public void start(Function<Boolean, Boolean> onShutdownCallback) {
    Pair<CompletableFuture, ExecutorService> res = startService();
    future = res.getKey();
    executor = res.getValue();
    started = true;
    monitorThreads(onShutdownCallback);
  }

  /**
   * Service implementation.
   *
   * @return pair of the future tracking the service's work and the executor running it
   */
  protected abstract Pair<CompletableFuture, ExecutorService> startService();

  /**
   * A monitor thread is started which would trigger a callback if the service is shutdown.
   *
   * @param onShutdownCallback callback applied with the error flag after the service future completes;
   *                           may be {@code null}
   */
  private void monitorThreads(Function<Boolean, Boolean> onShutdownCallback) {
    LOG.info("Submitting monitor thread !!");
    ExecutorService monitorExecutor = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, "Monitor Thread");
      t.setDaemon(isRunInDaemonMode());
      return t;
    });
    try {
      monitorExecutor.submit(() -> {
        boolean error = false;
        boolean interrupted = false;
        try {
          LOG.info("Monitoring thread(s) !!");
          future.get();
        } catch (ExecutionException ex) {
          LOG.error("Monitor noticed one or more threads failed. Requesting graceful shutdown of other threads", ex);
          error = true;
        } catch (InterruptedException ie) {
          LOG.error("Got interrupted Monitoring threads", ie);
          error = true;
          interrupted = true;
        } finally {
          // Mark as shutdown
          shutdown = true;
          try {
            if (null != onShutdownCallback) {
              onShutdownCallback.apply(error);
            }
          } catch (RuntimeException e) {
            // A failing callback must not prevent the executor shutdown below, nor vanish silently.
            LOG.error("Shutdown callback failed", e);
          } finally {
            shutdown(false);
            if (interrupted) {
              // Restored last so the graceful wait inside shutdown(false) is not cut short.
              Thread.currentThread().interrupt();
            }
          }
        }
      });
    } finally {
      // The monitoring task is the only task ever submitted. An orderly shutdown still runs it, and lets
      // the worker thread exit afterwards instead of idling forever (which, for a non-daemon thread,
      // would keep the JVM alive).
      monitorExecutor.shutdown();
    }
  }

  /**
   * @return whether the monitor thread is created as a daemon thread
   */
  public boolean isRunInDaemonMode() {
    return runInDaemonMode;
  }
}