/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.BatchAppenderators;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
import org.apache.druid.indexing.common.task.SegmentAllocators;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.DataSegmentsWithSchemas;
import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
/**
* The worker task of {@link SinglePhaseParallelIndexTaskRunner}. Similar to {@link IndexTask}, but this task
* generates and pushes segments, and reports them to the {@link SinglePhaseParallelIndexTaskRunner} instead of
* publishing on its own.
*/
public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHandler, PendingSegmentAllocatingTask
{
public static final String TYPE = "single_phase_sub_task";
public static final String OLD_TYPE_NAME = "index_sub";
private static final Logger LOG = new Logger(SinglePhaseSubTask.class);
private final int numAttempts;
private final ParallelIndexIngestionSpec ingestionSchema;
private final String subtaskSpecId;
/**
* If intervals are missing in the granularitySpec, the parallel index task runs in "dynamic locking mode".
* In this mode, sub tasks request new locks whenever they see a new row that is not covered by existing locks.
* If this task is overwriting existing segments, we should know in advance whether it is changing the segment
* granularity, so that we can decide what type of lock to use. However, if intervals are missing, we can't know
* the segment granularity of the existing segments until the task has read all data, because we don't know which
* segments are going to be overwritten. As a result, we assume that the segment granularity is going to change
* whenever intervals are missing and force the use of the timeChunk lock.
* <p>
* This variable is initialized in the constructor and used in {@link #runTask} to log that the timeChunk lock
* was enforced.
*/
private final boolean missingIntervalsInOverwriteMode;
@MonotonicNonNull
private AuthorizerMapper authorizerMapper;
@MonotonicNonNull
private RowIngestionMeters rowIngestionMeters;
@MonotonicNonNull
private ParseExceptionHandler parseExceptionHandler;
@Nullable
private String errorMsg;
private IngestionState ingestionState;
@JsonCreator
public SinglePhaseSubTask(
// id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
@JsonProperty("id") @Nullable final String id,
@JsonProperty("groupId") final String groupId,
@JsonProperty("resource") final TaskResource taskResource,
@JsonProperty("supervisorTaskId") final String supervisorTaskId,
// subtaskSpecId can be null only for old task versions.
@JsonProperty("subtaskSpecId") @Nullable final String subtaskSpecId,
@JsonProperty("numAttempts") final int numAttempts, // zero-based counting
@JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
@JsonProperty("context") final Map<String, Object> context
)
{
super(
getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
groupId,
taskResource,
ingestionSchema.getDataSchema().getDataSource(),
context,
AbstractTask.computeBatchIngestionMode(ingestionSchema.getIOConfig()),
supervisorTaskId
);
if (ingestionSchema.getTuningConfig().isForceGuaranteedRollup()) {
throw new UnsupportedOperationException("Guaranteed rollup is not supported");
}
this.subtaskSpecId = subtaskSpecId;
this.numAttempts = numAttempts;
this.ingestionSchema = ingestionSchema;
this.missingIntervalsInOverwriteMode = !ingestionSchema.getIOConfig().isAppendToExisting()
&& ingestionSchema.getDataSchema()
.getGranularitySpec()
.inputIntervals()
.isEmpty();
if (missingIntervalsInOverwriteMode) {
addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
}
this.ingestionState = IngestionState.NOT_STARTED;
}
@Override
public String getType()
{
return TYPE;
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceResources()
{
if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
throw getInputSecurityOnFirehoseUnsupportedError();
}
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
.map(i -> new ResourceAction(new Resource(i, ResourceType.EXTERNAL), Action.READ))
.collect(Collectors.toSet()) :
ImmutableSet.of();
}
@Override
public boolean isReady(TaskActionClient taskActionClient) throws IOException
{
return determineLockGranularityAndTryLock(
new SurrogateTaskActionClient(getSupervisorTaskId(), taskActionClient),
ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals()
);
}
@JsonProperty
public int getNumAttempts()
{
return numAttempts;
}
@JsonProperty("spec")
public ParallelIndexIngestionSpec getIngestionSchema()
{
return ingestionSchema;
}
@Override
@JsonProperty
public String getSubtaskSpecId()
{
return subtaskSpecId;
}
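// Use the group id as the task allocator id so that pending segments allocated by any sub task of the same
// supervisor are tracked under a single allocator.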
@Override
public String getTaskAllocatorId()
{
return getGroupId();
}
@Override
public TaskStatus runTask(final TaskToolbox toolbox) throws Exception
{
try {
if (missingIntervalsInOverwriteMode) {
LOG.warn(
"Intervals are missing in granularitySpec while this task is potentially overwriting existing segments. "
+ "Forced to use timeChunk lock."
);
}
this.authorizerMapper = toolbox.getAuthorizerMapper();
toolbox.getChatHandlerProvider().register(getId(), this, false);
rowIngestionMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
parseExceptionHandler = new ParseExceptionHandler(
rowIngestionMeters,
ingestionSchema.getTuningConfig().isLogParseExceptions(),
ingestionSchema.getTuningConfig().getMaxParseExceptions(),
ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
);
final InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(toolbox);
final ParallelIndexSupervisorTaskClient taskClient = toolbox.getSupervisorTaskClientProvider().build(
getSupervisorTaskId(),
ingestionSchema.getTuningConfig().getChatHandlerTimeout(),
ingestionSchema.getTuningConfig().getChatHandlerNumRetries()
);
ingestionState = IngestionState.BUILD_SEGMENTS;
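// Read the input and push the generated segments to deep storage. The segments are not published here;
// the supervisor task publishes the segments reported by all of its sub tasks.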
final DataSegmentsWithSchemas dataSegmentsWithSchemas = generateAndPushSegments(
toolbox,
taskClient,
inputSource,
toolbox.getIndexingTmpDir()
);
// Find inputSegments overshadowed by pushedSegments
final Set<DataSegment> allSegments = new HashSet<>(getTaskLockHelper().getLockedExistingSegments());
allSegments.addAll(dataSegmentsWithSchemas.getSegments());
final SegmentTimeline timeline = SegmentTimeline.forSegments(allSegments);
final Set<DataSegment> oldSegments = FluentIterable.from(timeline.findFullyOvershadowed())
.transformAndConcat(TimelineObjectHolder::getObject)
.transform(PartitionChunk::getObject)
.toSet();
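// Report the pushed segments, together with the existing segments they fully overshadow, to the supervisor
// task, which publishes the results of all sub tasks instead of this task publishing on its own.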
TaskReport.ReportMap taskReport = getTaskCompletionReports();
taskClient.report(new PushedSegmentsReport(getId(), oldSegments, dataSegmentsWithSchemas.getSegments(), taskReport, dataSegmentsWithSchemas.getSegmentSchemaMapping()));
toolbox.getTaskReportFileWriter().write(getId(), taskReport);
return TaskStatus.success(getId());
}
catch (Exception e) {
LOG.error(e, "Encountered exception in parallel sub task.");
errorMsg = Throwables.getStackTraceAsString(e);
toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
return TaskStatus.failure(
getId(),
errorMsg
);
}
finally {
toolbox.getChatHandlerProvider().unregister(getId());
}
}
@Override
public boolean requireLockExistingSegments()
{
return getIngestionMode() != IngestionMode.APPEND;
}
@Override
public List<DataSegment> findSegmentsToLock(TaskActionClient taskActionClient, List<Interval> intervals)
throws IOException
{
return findInputSegments(
getDataSource(),
taskActionClient,
intervals
);
}
@Override
public boolean isPerfectRollup()
{
return false;
}
@Nullable
@Override
public Granularity getSegmentGranularity()
{
final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
if (granularitySpec instanceof ArbitraryGranularitySpec) {
return null;
} else {
return granularitySpec.getSegmentGranularity();
}
}
/**
* This method reads the input data row by row and adds each row to a proper segment using {@link BaseAppenderatorDriver}.
* If there is no segment for the row, a new one is created. Segments can be pushed in the middle of reading inputs
* if one of the conditions below is satisfied.
*
* <ul>
* <li>
* The number of rows in a segment exceeds {@link DynamicPartitionsSpec#maxRowsPerSegment}
* </li>
* <li>
* The number of rows added to {@link BaseAppenderatorDriver} so far exceeds {@link DynamicPartitionsSpec#maxTotalRows}
* </li>
* </ul>
* <p>
* At the end of this method, all remaining segments are pushed. Note that this task only pushes segments;
* publishing is done by the supervisor task once the sub tasks report their pushed segments.
*
* @return the pushed segments and, if available, their schema mapping
*/
private DataSegmentsWithSchemas generateAndPushSegments(
final TaskToolbox toolbox,
final ParallelIndexSupervisorTaskClient taskClient,
final InputSource inputSource,
final File tmpDir
) throws IOException, InterruptedException
{
final DataSchema dataSchema = ingestionSchema.getDataSchema();
final GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
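// The FireDepartment here is used purely as a container for ingestion metrics; it does not drive ingestion.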
final FireDepartment fireDepartmentForMetrics =
new FireDepartment(dataSchema, new RealtimeIOConfig(null, null), null);
final FireDepartmentMetrics fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
TaskRealtimeMetricsMonitor metricsMonitor = TaskRealtimeMetricsMonitorBuilder.build(
this,
fireDepartmentForMetrics,
rowIngestionMeters
);
toolbox.addMonitor(metricsMonitor);
final ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
final DynamicPartitionsSpec partitionsSpec = (DynamicPartitionsSpec) tuningConfig.getGivenOrDefaultPartitionsSpec();
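// Guaranteed rollup is rejected in the constructor, so the partitions spec should always be dynamic here.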
final long pushTimeout = tuningConfig.getPushTimeout();
final boolean useLineageBasedSegmentAllocation = getContextValue(
SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY,
SinglePhaseParallelIndexTaskRunner.LEGACY_DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION
);
// subtaskSpecId is used as the sequenceName, so that retry tasks for the same spec
// can allocate the same set of segments.
final String sequenceName = useLineageBasedSegmentAllocation
? Preconditions.checkNotNull(subtaskSpecId, "subtaskSpecId")
: getId();
final SegmentAllocatorForBatch segmentAllocator = SegmentAllocators.forLinearPartitioning(
toolbox,
sequenceName,
new SupervisorTaskAccess(getSupervisorTaskId(), taskClient),
getIngestionSchema().getDataSchema(),
getTaskLockHelper(),
getIngestionMode(),
partitionsSpec,
useLineageBasedSegmentAllocation
);
final boolean useMaxMemoryEstimates = getContextValue(
Tasks.USE_MAX_MEMORY_ESTIMATES,
Tasks.DEFAULT_USE_MAX_MEMORY_ESTIMATES
);
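// The appenderator holds the in-memory segment data; the driver created below coordinates segment
// allocation, persisting, and pushing on top of it.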
final Appenderator appenderator = BatchAppenderators.newAppenderator(
getId(),
toolbox.getAppenderatorsManager(),
fireDepartmentMetrics,
toolbox,
dataSchema,
tuningConfig,
rowIngestionMeters,
parseExceptionHandler,
useMaxMemoryEstimates
);
boolean exceptionOccurred = false;
try (
final BatchAppenderatorDriver driver = BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator);
final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
tmpDir,
dataSchema,
inputSource,
inputSource.needsFormat() ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema) : null,
allowNonNullRowsWithinInputIntervalsOf(granularitySpec),
rowIngestionMeters,
parseExceptionHandler
)
) {
driver.startJob();
final Set<DataSegment> pushedSegments = new HashSet<>();
final SegmentSchemaMapping segmentSchemaMapping = new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
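// Main ingestion loop: add each input row to the driver and push segments whenever the dynamic
// partitioning thresholds (maxRowsPerSegment / maxTotalRows) are reached.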
while (inputRowIterator.hasNext()) {
final InputRow inputRow = inputRowIterator.next();
// Segments are created as needed, using a single sequence name. They may be allocated from the overlord
// (in append mode) or may be created on our own authority (in overwrite mode).
final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName);
if (addResult.isOk()) {
final boolean isPushRequired = addResult.isPushRequired(
partitionsSpec.getMaxRowsPerSegment(),
partitionsSpec.getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS)
);
if (isPushRequired) {
// There can be segments to which no more rows will be added but which have not been pushed yet.
// If those segments are not pushed here, they keep occupying space in the appenderator, which limits
// the room left for new segments and makes the resulting segments smaller than they need to be.
final SegmentsAndCommitMetadata pushed = driver.pushAllAndClear(pushTimeout);
pushedSegments.addAll(pushed.getSegments());
segmentSchemaMapping.merge(pushed.getSegmentSchemaMapping());
LOG.info("Pushed [%s] segments and [%s] schemas", pushed.getSegments().size(), segmentSchemaMapping.getSchemaCount());
LOG.infoSegments(pushed.getSegments(), "Pushed segments");
LOG.info("SegmentSchema is [%s]", segmentSchemaMapping);
}
} else {
throw new ISE("Failed to add a row with timestamp[%s]", inputRow.getTimestamp());
}
fireDepartmentMetrics.incrementProcessed();
}
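// Push all remaining segments that never reached the dynamic partitioning thresholds.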
final SegmentsAndCommitMetadata pushed = driver.pushAllAndClear(pushTimeout);
pushedSegments.addAll(pushed.getSegments());
segmentSchemaMapping.merge(pushed.getSegmentSchemaMapping());
LOG.info("Pushed [%s] segments and [%s] schemas", pushed.getSegments().size(), segmentSchemaMapping.getSchemaCount());
LOG.infoSegments(pushed.getSegments(), "Pushed segments");
LOG.info("SegmentSchema is [%s]", segmentSchemaMapping);
appenderator.close();
return new DataSegmentsWithSchemas(pushedSegments, segmentSchemaMapping.isNonEmpty() ? segmentSchemaMapping : null);
}
catch (TimeoutException | ExecutionException e) {
exceptionOccurred = true;
throw new RuntimeException(e);
}
catch (Exception e) {
exceptionOccurred = true;
throw e;
}
finally {
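// If an exception occurred, close the appenderator abruptly and drop in-flight work; otherwise close it
// gracefully so that pending persists and pushes can finish.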
if (exceptionOccurred) {
appenderator.closeNow();
} else {
appenderator.close();
}
toolbox.removeMonitor(metricsMonitor);
}
}
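// The HTTP endpoints below expose live ingestion stats (unparseable events, row stats, live reports)
// for this sub task via the chat handler, mirroring the corresponding endpoints of IndexTask.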
@GET
@Path("/unparseableEvents")
@Produces(MediaType.APPLICATION_JSON)
public Response getUnparseableEvents(
@Context final HttpServletRequest req,
@QueryParam("full") String full
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
Map<String, List<ParseExceptionReport>> events = new HashMap<>();
if (addBuildSegmentStatsToReport(full != null, ingestionState)) {
events.put(
RowIngestionMeters.BUILD_SEGMENTS,
IndexTaskUtils.getReportListFromSavedParseExceptions(
parseExceptionHandler.getSavedParseExceptionReports()
)
);
}
return Response.ok(events).build();
}
private Map<String, Object> doGetRowStats(boolean isFullReport)
{
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> totalsMap = new HashMap<>();
Map<String, Object> averagesMap = new HashMap<>();
if (addBuildSegmentStatsToReport(isFullReport, ingestionState)) {
totalsMap.put(
RowIngestionMeters.BUILD_SEGMENTS,
rowIngestionMeters.getTotals()
);
averagesMap.put(
RowIngestionMeters.BUILD_SEGMENTS,
rowIngestionMeters.getMovingAverages()
);
}
returnMap.put("totals", totalsMap);
returnMap.put("movingAverages", averagesMap);
return returnMap;
}
@GET
@Path("/rowStats")
@Produces(MediaType.APPLICATION_JSON)
public Response getRowStats(
@Context final HttpServletRequest req,
@QueryParam("full") String full
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
return Response.ok(doGetRowStats(full != null)).build();
}
TaskReport.ReportMap doGetLiveReports(boolean isFullReport)
{
return buildLiveIngestionStatsReport(
ingestionState,
getTaskCompletionUnparseableEvents(),
doGetRowStats(isFullReport)
);
}
@GET
@Path("/liveReports")
@Produces(MediaType.APPLICATION_JSON)
public Response getLiveReports(
@Context final HttpServletRequest req,
@QueryParam("full") String full
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
return Response.ok(doGetLiveReports(full != null)).build();
}
@Override
protected Map<String, Object> getTaskCompletionRowStats()
{
return Collections.singletonMap(
RowIngestionMeters.BUILD_SEGMENTS,
rowIngestionMeters.getTotals()
);
}
/**
* Generate an IngestionStatsAndErrorsTaskReport for the task.
*/
private TaskReport.ReportMap getTaskCompletionReports()
{
return buildIngestionStatsReport(IngestionState.COMPLETED, errorMsg, null, null);
}
@Override
protected Map<String, Object> getTaskCompletionUnparseableEvents()
{
Map<String, Object> unparseableEventsMap = new HashMap<>();
List<ParseExceptionReport> parseExceptionMessages = IndexTaskUtils.getReportListFromSavedParseExceptions(
parseExceptionHandler.getSavedParseExceptionReports()
);
if (parseExceptionMessages != null) {
unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, parseExceptionMessages);
} else {
unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, ImmutableList.of());
}
return unparseableEventsMap;
}
}