/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common.actions;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.LockRequestForNewSegment;
import org.apache.druid.indexing.overlord.LockResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
import javax.annotation.Nullable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

/**
 * Allocates a pending segment for a given timestamp.
 * If a visible chunk of used segments contains the query-granularity interval for the timestamp,
 * the pending segment is allocated with that chunk's interval.
 * Otherwise, if the interval at the preferred segment granularity containing the timestamp does not overlap
 * any existing used segments, the preferred segment granularity is used.
 * Otherwise, the coarsest segment granularity that contains the query-granularity interval for the timestamp
 * and does not overlap any existing used segments is used, if such a granularity exists.
 * <p/>
 * This action implicitly acquires some task locks when it allocates segments. You do not have to acquire them
 * beforehand, although you *do* have to release them yourself. (Note that task locks are automatically released when
 * the task finishes.)
 * <p/>
 * If this action cannot acquire an appropriate task lock, or if it cannot expand an existing segment set, it returns
 * null.
 * <p/>
 * Note that WEEK granularity segments are never allocated unless the preferred segment granularity is WEEK.
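 * <p/>
 * A minimal usage sketch from inside a task. The values below are hypothetical; a real task derives them
 * from its ingestion spec:
 * <pre>{@code
 * final SegmentAllocateAction action = new SegmentAllocateAction(
 *     "wikipedia",                         // dataSource
 *     DateTimes.of("2024-01-01T12:34"),    // timestamp of the row being ingested
 *     Granularities.MINUTE,                // queryGranularity
 *     Granularities.HOUR,                  // preferredSegmentGranularity
 *     "sequence_0",                        // sequenceName
 *     null,                                // previousSegmentId
 *     true,                                // skipSegmentLineageCheck
 *     NumberedPartialShardSpec.instance(), // partialShardSpec
 *     LockGranularity.TIME_CHUNK,          // lockGranularity
 *     TaskLockType.EXCLUSIVE               // taskLockType
 * );
 * // Returns null if no suitable lock or segment could be acquired.
 * final SegmentIdWithShardSpec segmentId = toolbox.getTaskActionClient().submit(action);
 * }</pre>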
*/
public class SegmentAllocateAction implements TaskAction<SegmentIdWithShardSpec>
{
public static final String TYPE = "segmentAllocate";
private static final Logger log = new Logger(SegmentAllocateAction.class);
// Prevent spinning forever in situations where the segment list just won't stop changing.
private static final int MAX_ATTEMPTS = 90;
private final String dataSource;
private final DateTime timestamp;
private final Granularity queryGranularity;
private final Granularity preferredSegmentGranularity;
private final String sequenceName;
private final String previousSegmentId;
private final boolean skipSegmentLineageCheck;
private final PartialShardSpec partialShardSpec;
private final LockGranularity lockGranularity;
private final TaskLockType taskLockType;
@JsonCreator
public SegmentAllocateAction(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("timestamp") DateTime timestamp,
@JsonProperty("queryGranularity") Granularity queryGranularity,
@JsonProperty("preferredSegmentGranularity") Granularity preferredSegmentGranularity,
@JsonProperty("sequenceName") String sequenceName,
@JsonProperty("previousSegmentId") String previousSegmentId,
@JsonProperty("skipSegmentLineageCheck") boolean skipSegmentLineageCheck,
// nullable for backward compatibility
@JsonProperty("shardSpecFactory") @Nullable PartialShardSpec partialShardSpec,
@JsonProperty("lockGranularity") @Nullable LockGranularity lockGranularity,
@JsonProperty("taskLockType") @Nullable TaskLockType taskLockType
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.timestamp = Preconditions.checkNotNull(timestamp, "timestamp");
this.queryGranularity = Preconditions.checkNotNull(queryGranularity, "queryGranularity");
this.preferredSegmentGranularity = Preconditions.checkNotNull(
preferredSegmentGranularity,
"preferredSegmentGranularity"
);
this.sequenceName = Preconditions.checkNotNull(sequenceName, "sequenceName");
this.previousSegmentId = previousSegmentId;
this.skipSegmentLineageCheck = skipSegmentLineageCheck;
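    // The defaults below preserve pre-existing behavior for older payloads that omit these fields.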
this.partialShardSpec = partialShardSpec == null ? NumberedPartialShardSpec.instance() : partialShardSpec;
this.lockGranularity = lockGranularity == null ? LockGranularity.TIME_CHUNK : lockGranularity;
this.taskLockType = taskLockType == null ? TaskLockType.EXCLUSIVE : taskLockType;
}
@JsonProperty
public String getDataSource()
{
return dataSource;
}
@JsonProperty
public DateTime getTimestamp()
{
return timestamp;
}
@JsonProperty
public Granularity getQueryGranularity()
{
return queryGranularity;
}
@JsonProperty
public Granularity getPreferredSegmentGranularity()
{
return preferredSegmentGranularity;
}
@JsonProperty
public String getSequenceName()
{
return sequenceName;
}
@JsonProperty
public String getPreviousSegmentId()
{
return previousSegmentId;
}
@JsonProperty
public boolean isSkipSegmentLineageCheck()
{
return skipSegmentLineageCheck;
}
@JsonProperty("shardSpecFactory")
public PartialShardSpec getPartialShardSpec()
{
return partialShardSpec;
}
@JsonProperty
public LockGranularity getLockGranularity()
{
return lockGranularity;
}
@JsonProperty
public TaskLockType getTaskLockType()
{
return taskLockType;
}
@Override
public TypeReference<SegmentIdWithShardSpec> getReturnTypeReference()
{
return new TypeReference<SegmentIdWithShardSpec>()
{
};
}
@Override
public boolean canPerformAsync(Task task, TaskActionToolbox toolbox)
{
return toolbox.canBatchSegmentAllocation();
}
@Override
public Future<SegmentIdWithShardSpec> performAsync(Task task, TaskActionToolbox toolbox)
{
if (!toolbox.canBatchSegmentAllocation()) {
throw new ISE("Batched segment allocation is disabled");
}
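    // The overlord-side SegmentAllocationQueue batches compatible allocation requests for the same
    // datasource, so they can be served with fewer metadata store transactions.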
return toolbox.getSegmentAllocationQueue().add(
new SegmentAllocateRequest(task, this, MAX_ATTEMPTS)
);
}
@Override
public SegmentIdWithShardSpec perform(
final Task task,
final TaskActionToolbox toolbox
)
{
if (!(task instanceof PendingSegmentAllocatingTask)) {
throw DruidException.defensive(
"Task[%s] of type[%s] cannot allocate segments as it does not implement PendingSegmentAllocatingTask.",
task.getId(), task.getType()
);
}
int attempt = 0;
while (true) {
attempt++;
if (!task.getDataSource().equals(dataSource)) {
throw new IAE("Task dataSource must match action dataSource, [%s] != [%s].", task.getDataSource(), dataSource);
}
final IndexerMetadataStorageCoordinator msc = toolbox.getIndexerMetadataStorageCoordinator();
      // 1) if something overlaps our timestamp, use that
      // 2) otherwise, try the preferredSegmentGranularity, then progressively finer granularities
final Interval rowInterval = queryGranularity.bucket(timestamp).withChronology(ISOChronology.getInstanceUTC());
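      // Only visible chunks of used segments matter here; overshadowed segments do not constrain
      // a new allocation.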
final Set<DataSegment> usedSegmentsForRow =
new HashSet<>(msc.retrieveUsedSegmentsForInterval(dataSource, rowInterval, Segments.ONLY_VISIBLE));
final SegmentIdWithShardSpec identifier;
if (usedSegmentsForRow.isEmpty()) {
identifier = tryAllocateFirstSegment(toolbox, task, rowInterval);
} else {
identifier = tryAllocateSubsequentSegment(toolbox, task, rowInterval, usedSegmentsForRow.iterator().next());
}
if (identifier != null) {
return identifier;
}
// Could not allocate a pending segment. There's a chance that this is because someone else inserted a segment
// overlapping with this row between when we called "msc.retrieveUsedSegmentsForInterval" and now. Check it again,
// and if it's different, repeat.
Set<DataSegment> newUsedSegmentsForRow =
new HashSet<>(msc.retrieveUsedSegmentsForInterval(dataSource, rowInterval, Segments.ONLY_VISIBLE));
if (!newUsedSegmentsForRow.equals(usedSegmentsForRow)) {
if (attempt < MAX_ATTEMPTS) {
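        // Sleep for a random duration in the range [50, 500) ms to add jitter between retries.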
final long shortRandomSleep = 50 + (long) (ThreadLocalRandom.current().nextDouble() * 450);
log.debug(
"Used segment set changed for rowInterval[%s]. Retrying segment allocation in %,dms (attempt = %,d).",
rowInterval,
shortRandomSleep,
attempt
);
try {
Thread.sleep(shortRandomSleep);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
} else {
log.error(
"Used segment set changed for rowInterval[%s]. Not trying again (attempt = %,d).",
rowInterval,
attempt
);
return null;
}
} else {
return null;
}
}
}
private SegmentIdWithShardSpec tryAllocateFirstSegment(TaskActionToolbox toolbox, Task task, Interval rowInterval)
{
// No existing segments for this row, but there might still be nearby ones that conflict with our preferred
// segment granularity. Try that first, and then progressively smaller ones if it fails.
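    // For example, if preferredSegmentGranularity is DAY, the candidates are the DAY bucket containing
    // the timestamp, then successively finer buckets (HOUR, MINUTE, etc.) that still contain rowInterval.
    // Per the class-level note, WEEK buckets are never candidates unless WEEK is itself preferred.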
final List<Interval> tryIntervals = Granularity.granularitiesFinerThan(preferredSegmentGranularity)
.stream()
.map(granularity -> granularity.bucket(timestamp))
.collect(Collectors.toList());
for (Interval tryInterval : tryIntervals) {
if (tryInterval.contains(rowInterval)) {
final SegmentIdWithShardSpec identifier = tryAllocate(toolbox, task, tryInterval, rowInterval, false);
if (identifier != null) {
return identifier;
}
}
}
return null;
}
private SegmentIdWithShardSpec tryAllocateSubsequentSegment(
TaskActionToolbox toolbox,
Task task,
Interval rowInterval,
DataSegment usedSegment
)
{
    // One or more used segments already overlap this row; allocate using the interval of the given segment.
if (!usedSegment.getInterval().contains(rowInterval)) {
log.error(
"The interval of existing segment[%s] doesn't contain rowInterval[%s]",
usedSegment.getId(),
rowInterval
);
return null;
} else {
      // A failure here is most likely an unrecoverable error, so log at error level (logOnFail = true)
      // for easier debugging.
return tryAllocate(toolbox, task, usedSegment.getInterval(), rowInterval, true);
}
}
private SegmentIdWithShardSpec tryAllocate(
TaskActionToolbox toolbox,
Task task,
Interval tryInterval,
Interval rowInterval,
boolean logOnFail
)
{
    // This action is always used by appending tasks. If the lock is a time-chunk lock, it may therefore
    // be shared with other appending tasks.
final LockResult lockResult = toolbox.getTaskLockbox().tryLock(
task,
new LockRequestForNewSegment(
lockGranularity,
taskLockType,
task.getGroupId(),
dataSource,
tryInterval,
partialShardSpec,
task.getPriority(),
sequenceName,
previousSegmentId,
skipSegmentLineageCheck
)
);
if (lockResult.isRevoked()) {
// We had acquired a lock but it was preempted by other locks
throw new ISE("The lock for interval[%s] is preempted and no longer valid", tryInterval);
}
if (lockResult.isOk()) {
final SegmentIdWithShardSpec identifier = lockResult.getNewSegmentId();
if (identifier != null) {
return identifier;
} else {
final String msg = StringUtils.format(
"Could not allocate pending segment for rowInterval[%s], segmentInterval[%s].",
rowInterval,
tryInterval
);
if (logOnFail) {
log.error(msg);
} else {
log.debug(msg);
}
return null;
}
} else {
final String msg = StringUtils.format(
"Could not acquire lock for rowInterval[%s], segmentInterval[%s].",
rowInterval,
tryInterval
);
if (logOnFail) {
log.error(msg);
} else {
log.debug(msg);
}
return null;
}
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{
return "SegmentAllocateAction{" +
"dataSource='" + dataSource + '\'' +
", timestamp=" + timestamp +
", queryGranularity=" + queryGranularity +
", preferredSegmentGranularity=" + preferredSegmentGranularity +
", sequenceName='" + sequenceName + '\'' +
", previousSegmentId='" + previousSegmentId + '\'' +
", skipSegmentLineageCheck=" + skipSegmentLineageCheck +
", partialShardSpec=" + partialShardSpec +
           ", lockGranularity=" + lockGranularity +
           ", taskLockType=" + taskLockType +
           '}';
}
}