/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.metrics.SegmentRowCountDistribution;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Handles load and drop requests for segments: downloads segment files into the local cache, announces loaded
 * segments via {@link DataSegmentAnnouncer}, and schedules delayed deletion of dropped segments. Request
 * statuses are tracked so that repeated requests are idempotent.
 */
@ManageLifecycle
public class SegmentLoadDropHandler implements DataSegmentChangeHandler
{
private static final EmittingLogger log = new EmittingLogger(SegmentLoadDropHandler.class);
// Synchronizes removals from segmentsToDelete
private final Object segmentDeleteLock = new Object();
// Synchronizes start/stop of this object.
private final Object startStopLock = new Object();
private final ObjectMapper jsonMapper;
private final SegmentLoaderConfig config;
private final DataSegmentAnnouncer announcer;
private final DataSegmentServerAnnouncer serverAnnouncer;
private final SegmentManager segmentManager;
private final ScheduledExecutorService exec;
private final ServerTypeConfig serverTypeConfig;
private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;
private final SegmentCacheManager segmentCacheManager;
private volatile boolean started = false;
// Keep a history of load/drop request statuses in an LRU cache, to maintain idempotency if the same request
// shows up again and to return the status of a completed request. The maximum size of this cache must be
// significantly greater than the number of pending load/drop requests, so that history is not lost too quickly.
private final Cache<DataSegmentChangeRequest, AtomicReference<SegmentChangeStatus>> requestStatuses;
private final Object requestStatusesLock = new Object();
// This is the list of unresolved futures returned to callers of processBatch(List<DataSegmentChangeRequest>).
// Threads loading/dropping segments resolve these futures as segment loads/drops finish.
private final LinkedHashSet<CustomSettableFuture> waitingFutures = new LinkedHashSet<>();
@Inject
public SegmentLoadDropHandler(
ObjectMapper jsonMapper,
SegmentLoaderConfig config,
DataSegmentAnnouncer announcer,
DataSegmentServerAnnouncer serverAnnouncer,
SegmentManager segmentManager,
SegmentCacheManager segmentCacheManager,
ServerTypeConfig serverTypeConfig
)
{
this(
jsonMapper,
config,
announcer,
serverAnnouncer,
segmentManager,
segmentCacheManager,
Executors.newScheduledThreadPool(
config.getNumLoadingThreads(),
Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
),
serverTypeConfig
);
}
@VisibleForTesting
SegmentLoadDropHandler(
ObjectMapper jsonMapper,
SegmentLoaderConfig config,
DataSegmentAnnouncer announcer,
DataSegmentServerAnnouncer serverAnnouncer,
SegmentManager segmentManager,
SegmentCacheManager segmentCacheManager,
ScheduledExecutorService exec,
ServerTypeConfig serverTypeConfig
)
{
this.jsonMapper = jsonMapper;
this.config = config;
this.announcer = announcer;
this.serverAnnouncer = serverAnnouncer;
this.segmentManager = segmentManager;
this.segmentCacheManager = segmentCacheManager;
this.exec = exec;
this.serverTypeConfig = serverTypeConfig;
this.segmentsToDelete = new ConcurrentSkipListSet<>();
requestStatuses = CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build();
}
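/**
 * Starts this handler: loads segments already present in the local cache (if any storage locations are
 * configured) and, if {@link #shouldAnnounce()} is true, announces this server via the
 * {@link DataSegmentServerAnnouncer}.
 */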
@LifecycleStart
public void start() throws IOException
{
synchronized (startStopLock) {
if (started) {
return;
}
log.info("Starting...");
try {
if (!config.getLocations().isEmpty()) {
loadLocalCache();
}
if (shouldAnnounce()) {
serverAnnouncer.announce();
}
}
catch (Exception e) {
Throwables.propagateIfPossible(e, IOException.class);
throw new RuntimeException(e);
}
started = true;
log.info("Started.");
}
}
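/**
 * Stops this handler: unannounces this server via the {@link DataSegmentServerAnnouncer} if it was announced
 * on start.
 */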
@LifecycleStop
public void stop()
{
synchronized (startStopLock) {
if (!started) {
return;
}
log.info("Stopping...");
try {
if (shouldAnnounce()) {
serverAnnouncer.unannounce();
}
}
catch (Exception e) {
throw new RuntimeException(e);
}
finally {
started = false;
}
log.info("Stopped.");
}
}
public boolean isStarted()
{
return started;
}
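/**
 * Loads the segment info files found in the info dir: files whose name does not match the segment id are
 * ignored, entries whose segment files are no longer cached are deleted, and the remaining segments are
 * bulk-loaded via {@link #addSegments}.
 */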
private void loadLocalCache() throws IOException
{
final long start = System.currentTimeMillis();
File baseDir = config.getInfoDir();
FileUtils.mkdirp(baseDir);
List<DataSegment> cachedSegments = new ArrayList<>();
File[] segmentsToLoad = baseDir.listFiles();
if (segmentsToLoad == null) {
// listFiles() returns null if baseDir is not a directory or an I/O error occurs
throw new IOException("Unable to list files in info dir: " + baseDir);
}
int ignored = 0;
for (int i = 0; i < segmentsToLoad.length; i++) {
File file = segmentsToLoad[i];
log.info("Loading segment cache file [%d/%d][%s].", i + 1, segmentsToLoad.length, file);
try {
final DataSegment segment = jsonMapper.readValue(file, DataSegment.class);
if (!segment.getId().toString().equals(file.getName())) {
log.warn("Ignoring cache file[%s] for segment[%s].", file.getPath(), segment.getId());
ignored++;
} else if (segmentCacheManager.isSegmentCached(segment)) {
cachedSegments.add(segment);
} else {
log.warn("Unable to find cache file for %s. Deleting lookup entry", segment.getId());
File segmentInfoCacheFile = new File(baseDir, segment.getId().toString());
if (!segmentInfoCacheFile.delete()) {
log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
}
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to load segment from segmentInfo file")
.addData("file", file)
.emit();
}
}
if (ignored > 0) {
log.makeAlert("Ignored misnamed segment cache files on startup.")
.addData("numIgnored", ignored)
.emit();
}
addSegments(
cachedSegments,
() -> log.info("Cache load took %,d ms", System.currentTimeMillis() - start)
);
}
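/**
 * Convenience overload that loads a segment without an executor for loading segments into the page cache.
 */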
private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback, boolean lazy)
throws SegmentLoadingException
{
loadSegment(segment, callback, lazy, null);
}
/**
 * Load a single segment. If the segment is loaded successfully, this function simply returns. Otherwise, it
 * throws a SegmentLoadingException.
 *
 * @throws SegmentLoadingException if it fails to load the given segment
 */
private void loadSegment(
DataSegment segment,
DataSegmentChangeCallback callback,
boolean lazy,
@Nullable ExecutorService loadSegmentIntoPageCacheExec
)
throws SegmentLoadingException
{
final boolean loaded;
try {
loaded = segmentManager.loadSegment(segment,
lazy,
() -> this.removeSegment(segment, DataSegmentChangeCallback.NOOP, false),
loadSegmentIntoPageCacheExec
);
}
catch (Exception e) {
removeSegment(segment, callback, false);
throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getId());
}
if (loaded) {
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getId().toString());
if (!segmentInfoCacheFile.exists()) {
try {
jsonMapper.writeValue(segmentInfoCacheFile, segment);
}
catch (IOException e) {
removeSegment(segment, callback, false);
throw new SegmentLoadingException(
e,
"Failed to write to disk segment info cache file[%s]",
segmentInfoCacheFile
);
}
}
}
}
public Map<String, Long> getAverageNumOfRowsPerSegmentForDatasource()
{
return segmentManager.getAverageRowCountForDatasource();
}
public Map<String, SegmentRowCountDistribution> getRowCountDistributionPerDatasource()
{
return segmentManager.getRowCountDistribution();
}
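/**
 * Loads the given segment and announces it. If the segment was scheduled for deletion, it is rescued from
 * {@link #segmentsToDelete} first. The outcome is recorded in {@link #requestStatuses} and the callback, if
 * any, is executed regardless of success or failure.
 */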
@Override
public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback)
{
SegmentChangeStatus result = null;
try {
log.info("Loading segment %s", segment.getId());
/*
The lock below prevents a race with the runnable scheduled in removeSegment(): once that runnable's
"if (segmentsToDelete.remove(segment))" check returns true, the historical starts deleting segment files.
Without the lock, addSegment() could be called right after that check and load the segment, so dropping the
segment and downloading it would happen at the same time.
*/
if (segmentsToDelete.contains(segment)) {
/*
Both contains(segment) and remove(segment) could be moved inside the synchronized block. However, every call
to addSegment() would then have to wait for the lock in order to make progress, which would slow things down.
Since segmentsToDelete.contains(segment) returns false in most cases, doing the "contains" check outside the
synchronized block avoids most of the cost of acquiring the lock.
*/
synchronized (segmentDeleteLock) {
segmentsToDelete.remove(segment);
}
}
loadSegment(segment, DataSegmentChangeCallback.NOOP, false);
// Announce the segment even if the segment file already exists.
try {
announcer.announceSegment(segment);
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getId());
}
result = SegmentChangeStatus.SUCCESS;
}
catch (Throwable e) {
log.makeAlert(e, "Failed to load segment for dataSource")
.addData("segment", segment)
.emit();
result = SegmentChangeStatus.failed(e.toString());
}
finally {
updateRequestStatus(new SegmentChangeRequestLoad(segment), result);
if (null != callback) {
callback.execute();
}
}
}
/**
 * Bulk-adds segments during bootstrap.
 *
 * @param segments a collection of segments to add
 * @param callback segment loading callback
 */
private void addSegments(Collection<DataSegment> segments, final DataSegmentChangeCallback callback)
{
// Start a temporary thread pool to load segments into page cache during bootstrap
ExecutorService loadingExecutor = null;
ExecutorService loadSegmentsIntoPageCacheOnBootstrapExec =
config.getNumThreadsToLoadSegmentsIntoPageCacheOnBootstrap() != 0 ?
Execs.multiThreaded(config.getNumThreadsToLoadSegmentsIntoPageCacheOnBootstrap(),
"Load-Segments-Into-Page-Cache-On-Bootstrap-%s") : null;
try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {
backgroundSegmentAnnouncer.startAnnouncing();
loadingExecutor = Execs.multiThreaded(config.getNumBootstrapThreads(), "Segment-Load-Startup-%s");
final int numSegments = segments.size();
final CountDownLatch latch = new CountDownLatch(numSegments);
final AtomicInteger counter = new AtomicInteger(0);
final CopyOnWriteArrayList<DataSegment> failedSegments = new CopyOnWriteArrayList<>();
for (final DataSegment segment : segments) {
loadingExecutor.submit(
() -> {
try {
log.info(
"Loading segment[%d/%d][%s]",
counter.incrementAndGet(),
numSegments,
segment.getId()
);
loadSegment(segment, callback, config.isLazyLoadOnStart(), loadSegmentsIntoPageCacheOnBootstrapExec);
try {
backgroundSegmentAnnouncer.announceSegment(segment);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SegmentLoadingException(e, "Loading Interrupted");
}
}
catch (SegmentLoadingException e) {
log.error(e, "[%s] failed to load", segment.getId());
failedSegments.add(segment);
}
finally {
latch.countDown();
}
}
);
}
try {
latch.await();
if (failedSegments.size() > 0) {
log.makeAlert("%,d errors seen while loading segments", failedSegments.size())
.addData("failedSegments", failedSegments)
.emit();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.makeAlert(e, "LoadingInterrupted").emit();
}
backgroundSegmentAnnouncer.finishAnnouncing();
}
catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segments -- likely problem with announcing.")
.addData("numSegments", segments.size())
.emit();
}
finally {
callback.execute();
if (loadingExecutor != null) {
loadingExecutor.shutdownNow();
}
if (loadSegmentsIntoPageCacheOnBootstrapExec != null) {
// All tasks have been submitted at this point; send a shutdown command to the bootstrap
// thread pool so its threads exit after finishing the tasks.
loadSegmentsIntoPageCacheOnBootstrapExec.shutdown();
}
}
}
@Override
public void removeSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback)
{
removeSegment(segment, callback, true);
}
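/**
 * Unannounces the given segment and drops it. When scheduleDrop is true, the actual deletion of segment files
 * is delayed by {@link SegmentLoaderConfig#getDropSegmentDelayMillis()}; until the delayed runnable fires, a
 * concurrent {@link #addSegment} call can rescue the segment from {@link #segmentsToDelete}.
 */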
@VisibleForTesting
void removeSegment(
final DataSegment segment,
@Nullable final DataSegmentChangeCallback callback,
final boolean scheduleDrop
)
{
SegmentChangeStatus result = null;
try {
announcer.unannounceSegment(segment);
segmentsToDelete.add(segment);
Runnable runnable = () -> {
try {
synchronized (segmentDeleteLock) {
if (segmentsToDelete.remove(segment)) {
segmentManager.dropSegment(segment);
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getId().toString());
if (!segmentInfoCacheFile.delete()) {
log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
}
}
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to remove segment! Possible resource leak!")
.addData("segment", segment)
.emit();
}
};
if (scheduleDrop) {
log.info(
"Completely removing [%s] in [%,d] millis",
segment.getId(),
config.getDropSegmentDelayMillis()
);
exec.schedule(
runnable,
config.getDropSegmentDelayMillis(),
TimeUnit.MILLISECONDS
);
} else {
runnable.run();
}
result = SegmentChangeStatus.SUCCESS;
}
catch (Exception e) {
log.makeAlert(e, "Failed to remove segment")
.addData("segment", segment)
.emit();
result = SegmentChangeStatus.failed(e.getMessage());
}
finally {
updateRequestStatus(new SegmentChangeRequestDrop(segment), result);
if (null != callback) {
callback.execute();
}
}
}
public Collection<DataSegment> getSegmentsToDelete()
{
return ImmutableList.copyOf(segmentsToDelete);
}
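/**
 * Processes a batch of load/drop requests. The returned future resolves immediately if any request in the
 * batch already has a terminal (non-pending) status; otherwise it resolves when some segment load/drop
 * completes.
 *
 * A hypothetical caller might look like the following sketch ({@code handler} and {@code segment} are
 * illustrative names, not part of this class):
 *
 * <pre>{@code
 * List<DataSegmentChangeRequest> requests = ImmutableList.of(new SegmentChangeRequestLoad(segment));
 * List<DataSegmentChangeResponse> responses = handler.processBatch(requests).get();
 * }</pre>
 */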
public ListenableFuture<List<DataSegmentChangeResponse>> processBatch(List<DataSegmentChangeRequest> changeRequests)
{
boolean isAnyRequestDone = false;
Map<DataSegmentChangeRequest, AtomicReference<SegmentChangeStatus>> statuses = Maps.newHashMapWithExpectedSize(changeRequests.size());
for (DataSegmentChangeRequest cr : changeRequests) {
AtomicReference<SegmentChangeStatus> status = processRequest(cr);
if (status.get().getState() != SegmentChangeStatus.State.PENDING) {
isAnyRequestDone = true;
}
statuses.put(cr, status);
}
CustomSettableFuture future = new CustomSettableFuture(waitingFutures, statuses);
if (isAnyRequestDone) {
future.resolve();
} else {
synchronized (waitingFutures) {
waitingFutures.add(future);
}
}
return future;
}
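/**
 * Looks up or initiates the status of a single change request. A new request (or a previously failed one) is
 * kicked off asynchronously and recorded as PENDING; a cached SUCCESS is returned once and then invalidated.
 */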
private AtomicReference<SegmentChangeStatus> processRequest(DataSegmentChangeRequest changeRequest)
{
synchronized (requestStatusesLock) {
AtomicReference<SegmentChangeStatus> status = requestStatuses.getIfPresent(changeRequest);
// If the last load/drop request for this segment failed, it can be retried here.
if (status == null || status.get().getState() == SegmentChangeStatus.State.FAILED) {
changeRequest.go(
new DataSegmentChangeHandler()
{
@Override
public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
{
requestStatuses.put(changeRequest, new AtomicReference<>(SegmentChangeStatus.PENDING));
exec.submit(
() -> SegmentLoadDropHandler.this.addSegment(
((SegmentChangeRequestLoad) changeRequest).getSegment(),
() -> resolveWaitingFutures()
)
);
}
@Override
public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback)
{
requestStatuses.put(changeRequest, new AtomicReference<>(SegmentChangeStatus.PENDING));
SegmentLoadDropHandler.this.removeSegment(
((SegmentChangeRequestDrop) changeRequest).getSegment(),
() -> resolveWaitingFutures(),
true
);
}
},
this::resolveWaitingFutures
);
} else if (status.get().getState() == SegmentChangeStatus.State.SUCCESS) {
// SUCCESS case: clear the cached success while serving it to this client.
// Otherwise, subsequent clients issuing a reload could receive an incorrect response.
requestStatuses.invalidate(changeRequest);
return status;
}
return requestStatuses.getIfPresent(changeRequest);
}
}
private void updateRequestStatus(DataSegmentChangeRequest changeRequest, SegmentChangeStatus result)
{
if (result == null) {
result = SegmentChangeStatus.failed("Unknown reason. Check server logs.");
}
synchronized (requestStatusesLock) {
AtomicReference<SegmentChangeStatus> statusRef = requestStatuses.getIfPresent(changeRequest);
if (statusRef != null) {
statusRef.set(result);
}
}
}
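/**
 * Resolves all futures currently waiting on processBatch() results with the latest statuses of their requests.
 */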
private void resolveWaitingFutures()
{
LinkedHashSet<CustomSettableFuture> waitingFuturesCopy;
synchronized (waitingFutures) {
waitingFuturesCopy = new LinkedHashSet<>(waitingFutures);
waitingFutures.clear();
}
for (CustomSettableFuture future : waitingFuturesCopy) {
future.resolve();
}
}
/**
* Returns whether or not we should announce ourselves as a data server using {@link DataSegmentServerAnnouncer}.
*
* Returns true if _either_:
*
* (1) Our {@link #serverTypeConfig} indicates we are a segment server. This is necessary for Brokers to be able
* to detect that we exist.
* (2) We have non-empty storage locations in {@link #config}. This is necessary for Coordinators to be able to
* assign segments to us.
*/
private boolean shouldAnnounce()
{
return serverTypeConfig.getServerType().isSegmentServer() || !config.getLocations().isEmpty();
}
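/**
 * Announces segments in batches during bootstrap: segments are queued via {@link #announceSegment}, and a
 * background task announces everything queued so far every intervalMillis until {@link #finishAnnouncing}
 * is called.
 */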
private static class BackgroundSegmentAnnouncer implements AutoCloseable
{
private static final EmittingLogger log = new EmittingLogger(BackgroundSegmentAnnouncer.class);
private final int intervalMillis;
private final DataSegmentAnnouncer announcer;
private final ScheduledExecutorService exec;
private final LinkedBlockingQueue<DataSegment> queue;
private final SettableFuture<Boolean> doneAnnouncing;
private final Object lock = new Object();
private volatile boolean finished = false;
@Nullable
private volatile ScheduledFuture<?> startedAnnouncing = null;
@Nullable
private volatile ScheduledFuture<?> nextAnnouncement = null;
public BackgroundSegmentAnnouncer(
DataSegmentAnnouncer announcer,
ScheduledExecutorService exec,
int intervalMillis
)
{
this.announcer = announcer;
this.exec = exec;
this.intervalMillis = intervalMillis;
this.queue = new LinkedBlockingQueue<>();
this.doneAnnouncing = SettableFuture.create();
}
public void announceSegment(final DataSegment segment) throws InterruptedException
{
if (finished) {
throw new ISE("Announce segment called after finishAnnouncing");
}
queue.put(segment);
}
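/**
 * Schedules the background announcing task. Does nothing if intervalMillis is not positive; in that case,
 * segments are announced only when {@link #finishAnnouncing} runs.
 */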
public void startAnnouncing()
{
if (intervalMillis <= 0) {
return;
}
log.info("Starting background segment announcing task");
// schedule background announcing task
nextAnnouncement = startedAnnouncing = exec.schedule(
new Runnable()
{
@Override
public void run()
{
synchronized (lock) {
try {
if (!(finished && queue.isEmpty())) {
final List<DataSegment> segments = new ArrayList<>();
queue.drainTo(segments);
try {
announcer.announceSegments(segments);
nextAnnouncement = exec.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
}
catch (IOException e) {
doneAnnouncing.setException(
new SegmentLoadingException(e, "Failed to announce segments[%s]", segments)
);
}
} else {
doneAnnouncing.set(true);
}
}
catch (Exception e) {
doneAnnouncing.setException(e);
}
}
}
},
intervalMillis,
TimeUnit.MILLISECONDS
);
}
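/**
 * Announces any segments still in the queue, cancels the background task, and propagates (as a
 * SegmentLoadingException) any failure the background task recorded in doneAnnouncing.
 */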
public void finishAnnouncing() throws SegmentLoadingException
{
synchronized (lock) {
finished = true;
// announce any remaining segments
try {
final List<DataSegment> segments = new ArrayList<>();
queue.drainTo(segments);
announcer.announceSegments(segments);
}
catch (Exception e) {
throw new SegmentLoadingException(e, "Failed to announce segments[%s]", queue);
}
// get any exception that may have been thrown in background announcing
try {
// check in case intervalMillis is <= 0
if (startedAnnouncing != null) {
startedAnnouncing.cancel(false);
}
// - if the task is waiting on the lock, then the queue will be empty by the time it runs
// - if the task just released it, then the lock ensures any exception is set in doneAnnouncing
if (doneAnnouncing.isDone()) {
doneAnnouncing.get();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SegmentLoadingException(e, "Loading Interrupted");
}
catch (ExecutionException e) {
throw new SegmentLoadingException(e.getCause(), "Background Announcing Task Failed");
}
}
log.info("Completed background segment announcing");
}
@Override
public void close()
{
// stop background scheduling
synchronized (lock) {
finished = true;
if (nextAnnouncement != null) {
nextAnnouncement.cancel(false);
}
}
}
}
// Future with a cancel() implementation that removes it from the "waitingFutures" list.
private class CustomSettableFuture extends AbstractFuture<List<DataSegmentChangeResponse>>
{
private final LinkedHashSet<CustomSettableFuture> waitingFutures;
private final Map<DataSegmentChangeRequest, AtomicReference<SegmentChangeStatus>> statusRefs;
private CustomSettableFuture(
LinkedHashSet<CustomSettableFuture> waitingFutures,
Map<DataSegmentChangeRequest, AtomicReference<SegmentChangeStatus>> statusRefs
)
{
this.waitingFutures = waitingFutures;
this.statusRefs = statusRefs;
}
public void resolve()
{
synchronized (requestStatusesLock) {
if (isDone()) {
return;
}
final List<DataSegmentChangeResponse> result = new ArrayList<>(statusRefs.size());
statusRefs.forEach((request, statusRef) -> {
// Remove complete statuses from the cache
final SegmentChangeStatus status = statusRef.get();
if (status != null && status.getState() != SegmentChangeStatus.State.PENDING) {
requestStatuses.invalidate(request);
}
result.add(new DataSegmentChangeResponse(request, status));
});
set(result);
}
}
@Override
public boolean cancel(boolean interruptIfRunning)
{
synchronized (waitingFutures) {
waitingFutures.remove(this);
}
return true;
}
}
}