/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.worker.shuffle;

import com.google.common.collect.Iterators;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;
import com.google.inject.Inject;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.TaskStatus;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.StorageLocation;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import org.apache.druid.utils.CompressionUtils;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* In native parallel indexing, this class stores segment files produced by phase 1 tasks in the local storage of
* middleManagers (or indexers), and phase 2 tasks read those files over HTTP.
*
* The directory where segment files are placed is structured as
* {@link StorageLocation#path}/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/bucketIdOfSegment.
*
* This class also has a self-cleanup mechanism for stale segment files. It periodically checks the last access time
* per supervisorTask and removes all of its segment files if the supervisorTask is not running anymore.
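*
* For example, a partition file written by a subtask with hypothetical IDs could be stored at
* {@code /path/to/shuffle-data/index_parallel_xyz/2020-01-01T00:00:00.000Z/2020-01-02T00:00:00.000Z/0/subtask_0},
* where 0 is the bucket ID and the file is named after the subtask that wrote it (see {@link #addSegment}).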
*/
@ManageLifecycle
public class LocalIntermediaryDataManager implements IntermediaryDataManager
{
private static final Logger LOG = new Logger(LocalIntermediaryDataManager.class);
private final long intermediaryPartitionDiscoveryPeriodSec;
private final long intermediaryPartitionCleanupPeriodSec;
private final Period intermediaryPartitionTimeout;
private final TaskConfig taskConfig;
private final List<StorageLocation> shuffleDataLocations;
private final IndexingServiceClient indexingServiceClient;
// supervisorTaskId -> the expiry time at which the supervisorTask status should be checked
// This time is initialized when a new supervisorTask is found and updated whenever a partition is accessed for
// the supervisor.
private final ConcurrentHashMap<String, DateTime> supervisorTaskCheckTimes = new ConcurrentHashMap<>();
// supervisorTaskId -> cyclic iterator of storage locations
private final Map<String, Iterator<StorageLocation>> locationIterators = new HashMap<>();
// The overlord is supposed to send a cleanup request as soon as the supervisorTask is finished in parallel indexing,
// but the middleManager or indexer could miss the request. This executor automatically cleans up unused intermediary
// partitions in that case.
// This can be null until LocalIntermediaryDataManager is started.
@MonotonicNonNull
private ScheduledExecutorService supervisorTaskChecker;

@Inject
public LocalIntermediaryDataManager(
WorkerConfig workerConfig,
TaskConfig taskConfig,
IndexingServiceClient indexingServiceClient
)
{
this.intermediaryPartitionDiscoveryPeriodSec = workerConfig.getIntermediaryPartitionDiscoveryPeriodSec();
this.intermediaryPartitionCleanupPeriodSec = workerConfig.getIntermediaryPartitionCleanupPeriodSec();
this.intermediaryPartitionTimeout = workerConfig.getIntermediaryPartitionTimeout();
this.taskConfig = taskConfig;
this.shuffleDataLocations = taskConfig
.getShuffleDataLocations()
.stream()
.map(config -> new StorageLocation(config.getPath(), config.getMaxSize(), config.getFreeSpacePercent()))
.collect(Collectors.toList());
this.indexingServiceClient = indexingServiceClient;
}
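
/**
* Start the periodic tasks: discover intermediary data already on disk, then periodically discover partitions for
* new supervisorTasks and clean up partitions whose supervisorTasks have finished.
*/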
@LifecycleStart
public void start()
{
discoverSupervisorTaskPartitions();
supervisorTaskChecker = Execs.scheduledSingleThreaded("intermediary-data-manager-%d");
// Discover partitions for new supervisorTasks
supervisorTaskChecker.scheduleAtFixedRate(
() -> {
try {
discoverSupervisorTaskPartitions();
}
catch (Exception e) {
LOG.warn(e, "Error while discovering supervisorTasks");
}
},
intermediaryPartitionDiscoveryPeriodSec,
intermediaryPartitionDiscoveryPeriodSec,
TimeUnit.SECONDS
);
supervisorTaskChecker.scheduleAtFixedRate(
() -> {
try {
deleteExpiredSupervisorTaskPartitionsIfNotRunning();
}
catch (InterruptedException e) {
LOG.error(e, "Error while cleaning up partitions for expired supervisors");
}
catch (Exception e) {
LOG.warn(e, "Error while cleaning up partitions for expired supervisors");
}
},
intermediaryPartitionCleanupPeriodSec,
intermediaryPartitionCleanupPeriodSec,
TimeUnit.SECONDS
);
}
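
/**
* Stop the periodic checker, waiting up to 10 seconds for it to terminate, and clear the tracked check times.
*/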
@LifecycleStop
public void stop() throws InterruptedException
{
if (supervisorTaskChecker != null) {
supervisorTaskChecker.shutdownNow();
supervisorTaskChecker.awaitTermination(10, TimeUnit.SECONDS);
}
supervisorTaskCheckTimes.clear();
}

/**
* Periodically called after startup to search for intermediary data on disk that is not tracked yet, such as
* partitions written before this process was restarted.
*/
private void discoverSupervisorTaskPartitions()
{
for (StorageLocation location : shuffleDataLocations) {
final Path locationPath = location.getPath().toPath().toAbsolutePath();
final MutableInt numDiscovered = new MutableInt(0);
final File[] dirsPerSupervisorTask = location.getPath().listFiles();
if (dirsPerSupervisorTask != null) {
for (File supervisorTaskDir : dirsPerSupervisorTask) {
final String supervisorTaskId = supervisorTaskDir.getName();
supervisorTaskCheckTimes.computeIfAbsent(
supervisorTaskId,
k -> {
for (File eachFile : FileUtils.listFiles(supervisorTaskDir, null, true)) {
final String relativeSegmentPath = locationPath
.relativize(eachFile.toPath().toAbsolutePath())
.toString();
// StorageLocation keeps track of how much storage capacity is being used.
// Newly found files should be known to the StorageLocation to keep it up to date.
final File reservedFile = location.reserve(
relativeSegmentPath,
eachFile.getName(),
eachFile.length()
);
if (reservedFile == null) {
LOG.warn("Can't add a discovered partition[%s]", eachFile.getAbsolutePath());
}
}
numDiscovered.increment();
return getExpiryTimeFromNow();
}
);
}
}
if (numDiscovered.getValue() > 0) {
LOG.info(
"Discovered partitions for [%s] new supervisor tasks under location[%s]",
numDiscovered.getValue(),
location.getPath()
);
}
}
}

/**
* Check the status of each supervisorTask whose partitions have not been accessed within the timeout, and delete
* all partitions for the supervisorTask if it has already finished.
*
* Note that the overlord sends a cleanup request when a supervisorTask finishes. The check below triggers the
* self-cleanup in case that request is missed.
*/
private void deleteExpiredSupervisorTaskPartitionsIfNotRunning() throws InterruptedException
{
final DateTime now = DateTimes.nowUtc();
final Set<String> expiredSupervisorTasks = new HashSet<>();
for (Entry<String, DateTime> entry : supervisorTaskCheckTimes.entrySet()) {
final String supervisorTaskId = entry.getKey();
final DateTime checkTime = entry.getValue();
// The stored time is an expiry time (see getExpiryTimeFromNow); once it has passed, the task is due for a check.
if (checkTime.isBefore(now)) {
expiredSupervisorTasks.add(supervisorTaskId);
}
}
if (!expiredSupervisorTasks.isEmpty()) {
LOG.info("Found [%s] expired supervisor tasks", expiredSupervisorTasks.size());
final Map<String, TaskStatus> taskStatuses = indexingServiceClient.getTaskStatuses(expiredSupervisorTasks);
for (Entry<String, TaskStatus> entry : taskStatuses.entrySet()) {
final String supervisorTaskId = entry.getKey();
final TaskStatus status = entry.getValue();
if (status.getStatusCode().isComplete()) {
// If it's finished, clean up all partitions for the supervisor task.
try {
deletePartitions(supervisorTaskId);
}
catch (IOException e) {
LOG.warn(e, "Failed to delete partitions for task[%s]", supervisorTaskId);
}
} else {
// If it's still running, update last access time.
supervisorTaskCheckTimes.put(supervisorTaskId, getExpiryTimeFromNow());
}
}
}
}

/**
* Write a segment into one of the configured locations. The location to write to is chosen in a round-robin
* manner per supervisorTaskId.
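*
* Returns the unzipped size in bytes of the written segment. A hypothetical call from a phase 1 subtask
* (IDs are illustrative only; {@code segment} and {@code segmentDir} as prepared by the subtask):
* <pre>{@code
* long unzippedSizeBytes = dataManager.addSegment("index_parallel_xyz", "subtask_0", segment, segmentDir);
* }</pre>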
*/
@Override
public long addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir)
throws IOException
{
// Get or create the location iterator for supervisorTask.
final Iterator<StorageLocation> iterator = locationIterators.computeIfAbsent(
supervisorTaskId,
k -> {
final Iterator<StorageLocation> cyclicIterator = Iterators.cycle(shuffleDataLocations);
// Start the cyclic iterator at a random offset so that different supervisorTasks spread across locations
final int random = ThreadLocalRandom.current().nextInt(shuffleDataLocations.size());
IntStream.range(0, random).forEach(i -> cyclicIterator.next());
return cyclicIterator;
}
);
// Create a zipped segment in a temp directory.
final File taskTempDir = taskConfig.getTaskTempDir(subTaskId);
final Closer closer = Closer.create();
closer.register(() -> {
try {
FileUtils.forceDelete(taskTempDir);
}
catch (IOException e) {
LOG.warn(e, "Failed to delete directory[%s]", taskTempDir.getAbsolutePath());
}
});
if (!(segment.getShardSpec() instanceof BucketNumberedShardSpec)) {
throw new IAE(
"Invalid shardSpec type. Expected [%s] but got [%s]",
BucketNumberedShardSpec.class.getName(),
segment.getShardSpec().getClass().getName()
);
}
final BucketNumberedShardSpec<?> bucketNumberedShardSpec = (BucketNumberedShardSpec<?>) segment.getShardSpec();
//noinspection unused
try (final Closer resourceCloser = closer) {
FileUtils.forceMkdir(taskTempDir);
// Temporary compressed file. Will be removed when taskTempDir is deleted.
final File tempZippedFile = new File(taskTempDir, segment.getId().toString());
final long unzippedSizeBytes = CompressionUtils.zip(segmentDir, tempZippedFile);
if (unzippedSizeBytes == 0) {
throw new IOE(
"Read 0 bytes from segmentDir[%s]",
segmentDir.getAbsolutePath()
);
}
// Try copying the zipped segment to one of storage locations
for (int i = 0; i < shuffleDataLocations.size(); i++) {
final StorageLocation location = iterator.next();
final String partitionFilePath = getPartitionFilePath(
supervisorTaskId,
subTaskId,
segment.getInterval(),
bucketNumberedShardSpec.getBucketId() // we must use the bucket ID because the final partition ID is not determined yet
);
final File destFile = location.reserve(partitionFilePath, segment.getId().toString(), tempZippedFile.length());
if (destFile != null) {
try {
FileUtils.forceMkdirParent(destFile);
org.apache.druid.java.util.common.FileUtils.writeAtomically(
destFile,
out -> Files.asByteSource(tempZippedFile).copyTo(out)
);
LOG.info(
"Wrote intermediary segment[%s] for subtask[%s] at [%s]",
segment.getId(),
subTaskId,
destFile
);
return unzippedSizeBytes;
}
catch (Exception e) {
location.release(partitionFilePath, tempZippedFile.length());
FileUtils.deleteQuietly(destFile);
LOG.warn(
e,
"Failed to write segment[%s] at [%s]. Trying again with the next location",
segment.getId(),
destFile
);
}
}
}
throw new ISE("Can't find a location to handle segment[%s]", segment);
}
}
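
/**
* Find the partition file written by the given subTask for the given interval and bucketId, searching all
* configured locations. If the partition directory exists, the supervisorTask's expiry time is refreshed.
* Returns {@link Optional#empty()} if no such file exists.
*/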
@Override
public Optional<ByteSource> findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int bucketId)
{
IdUtils.validateId("supervisorTaskId", supervisorTaskId);
for (StorageLocation location : shuffleDataLocations) {
final File partitionDir = new File(location.getPath(), getPartitionDir(supervisorTaskId, interval, bucketId));
if (partitionDir.exists()) {
supervisorTaskCheckTimes.put(supervisorTaskId, getExpiryTimeFromNow());
final File[] segmentFiles = partitionDir.listFiles();
if (segmentFiles == null) {
return Optional.empty();
} else {
for (File segmentFile : segmentFiles) {
if (segmentFile.getName().equals(subTaskId)) {
return Optional.of(Files.asByteSource(segmentFile));
}
}
return Optional.empty();
}
}
}
return Optional.empty();
}
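
/**
* @return the next expiry time, i.e., now plus {@link WorkerConfig#getIntermediaryPartitionTimeout}.
*/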
private DateTime getExpiryTimeFromNow()
{
return DateTimes.nowUtc().plus(intermediaryPartitionTimeout);
}
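
/**
* Delete all intermediary data of the given supervisorTask from every configured location and stop tracking its
* check time.
*/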
@Override
public void deletePartitions(String supervisorTaskId) throws IOException
{
IdUtils.validateId("supervisorTaskId", supervisorTaskId);
for (StorageLocation location : shuffleDataLocations) {
final File supervisorTaskPath = new File(location.getPath(), supervisorTaskId);
if (supervisorTaskPath.exists()) {
LOG.info("Cleaning up [%s]", supervisorTaskPath);
for (File eachFile : FileUtils.listFiles(supervisorTaskPath, null, true)) {
location.removeFile(eachFile);
}
FileUtils.forceDelete(supervisorTaskPath);
}
}
supervisorTaskCheckTimes.remove(supervisorTaskId);
}
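
/**
* Build the relative path of the partition file written by the given subTask. The file name is the subTaskId,
* placed under the directory from {@link #getPartitionDir}.
*/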
private static String getPartitionFilePath(
String supervisorTaskId,
String subTaskId,
Interval interval,
int bucketId
)
{
return Paths.get(getPartitionDir(supervisorTaskId, interval, bucketId), subTaskId).toString();
}
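
/**
* Build the relative directory path for a partition. For a hypothetical supervisorTaskId "index_parallel_xyz",
* interval 2020-01-01/2020-01-02, and bucketId 0, the result is
* "index_parallel_xyz/2020-01-01T00:00:00.000Z/2020-01-02T00:00:00.000Z/0".
*/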
private static String getPartitionDir(
String supervisorTaskId,
Interval interval,
int bucketId
)
{
return Paths.get(
supervisorTaskId,
interval.getStart().toString(),
interval.getEnd().toString(),
String.valueOf(bucketId)
).toString();
}
}