/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.SegmentDescriptor;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
* This class is specialized for streaming ingestion. In streaming ingestion, the segment lifecycle is as follows:
*
* <pre>
* APPENDING -> APPEND_FINISHED -> PUBLISHED
* </pre>
*
* <ul>
* <li>APPENDING: Segment is available for appending.</li>
* <li>APPEND_FINISHED: Segment can no longer accept new data and is waiting to be published.</li>
* <li>PUBLISHED: Segment is pushed to deep storage, its metadata is published to the metadata store, and the
* segment is finally dropped from local storage.</li>
* </ul>
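*
* <p>A minimal lifecycle sketch (hedged, illustrative only: {@code appenderator}, {@code allocator},
* {@code notifierFactory}, {@code usedSegmentChecker}, {@code mapper}, {@code metrics}, {@code publisher},
* {@code committerSupplier}, {@code committer}, and {@code rows} are assumed to be built elsewhere; exception
* handling is omitted):
*
* <pre>{@code
* try (StreamAppenderatorDriver driver = new StreamAppenderatorDriver(
*     appenderator, allocator, notifierFactory, usedSegmentChecker, mapper, metrics)) {
*   driver.startJob(); // restores state from the last commit, if any
*   for (InputRow row : rows) {
*     driver.add(row, "sequence-0", committerSupplier, false, true);
*   }
*   driver.publishAndRegisterHandoff(publisher, committer, ImmutableList.of("sequence-0")).get();
* }
* }</pre>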
*/
public class StreamAppenderatorDriver extends BaseAppenderatorDriver
{
private static final Logger log = new Logger(StreamAppenderatorDriver.class);
private final SegmentHandoffNotifier handoffNotifier;
private final FireDepartmentMetrics metrics;
private final ObjectMapper objectMapper;

/**
* Create a driver.
*
* @param appenderator appenderator
* @param segmentAllocator segment allocator
* @param handoffNotifierFactory handoff notifier factory
* @param usedSegmentChecker used segment checker
* @param objectMapper object mapper, used for serde of commit metadata
* @param metrics FireDepartment metrics
*/
public StreamAppenderatorDriver(
Appenderator appenderator,
SegmentAllocator segmentAllocator,
SegmentHandoffNotifierFactory handoffNotifierFactory,
UsedSegmentChecker usedSegmentChecker,
ObjectMapper objectMapper,
FireDepartmentMetrics metrics
)
{
super(appenderator, segmentAllocator, usedSegmentChecker);
this.handoffNotifier = Preconditions.checkNotNull(handoffNotifierFactory, "handoffNotifierFactory")
.createSegmentHandoffNotifier(appenderator.getDataSource());
this.metrics = Preconditions.checkNotNull(metrics, "metrics");
this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
}
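
/**
* Starts the driver: starts the {@link SegmentHandoffNotifier} and restores the segment state, if any, that the
* {@link Appenderator} persisted at its last commit.
*
* @return the caller metadata stored at the last commit, or null if there is no restored metadata
*/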
@Override
@Nullable
public Object startJob()
{
handoffNotifier.start();
final AppenderatorDriverMetadata metadata = objectMapper.convertValue(
appenderator.startJob(),
AppenderatorDriverMetadata.class
);
log.info("Restored metadata[%s].", metadata);
if (metadata != null) {
synchronized (segments) {
final Map<String, String> lastSegmentIds = metadata.getLastSegmentIds();
Preconditions.checkState(
metadata.getSegments().keySet().equals(lastSegmentIds.keySet()),
"Sequences for segment states and last segment IDs are not same"
);
final Map<String, SegmentsForSequenceBuilder> builders = new TreeMap<>();
for (Entry<String, List<SegmentWithState>> entry : metadata.getSegments().entrySet()) {
final String sequenceName = entry.getKey();
final SegmentsForSequenceBuilder builder = new SegmentsForSequenceBuilder(lastSegmentIds.get(sequenceName));
builders.put(sequenceName, builder);
entry.getValue().forEach(builder::add);
}
builders.forEach((sequence, builder) -> segments.put(sequence, builder.build()));
}
return metadata.getCallerMetadata();
} else {
return null;
}
}

/**
* Add a row. Must not be called concurrently from multiple threads.
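*
* <p>A hedged call-site sketch ({@code driver} is a started driver; {@code row} and {@code committerSupplier} are
* assumed to exist, and {@code isOk()} is taken to report whether a segment could be allocated for the row):
*
* <pre>{@code
* AppenderatorDriverAddResult result = driver.add(row, "sequence-0", committerSupplier, false, true);
* if (!result.isOk()) {
*   throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp());
* }
* }</pre>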
*
* @param row the row to add
* @param sequenceName sequenceName for this row's segment
* @param committerSupplier supplier of a committer associated with all data that has been added, including this
* row; not used if {@code allowIncrementalPersists} is false
* @param skipSegmentLineageCheck if false, perform lineage validation using previousSegmentId for this sequence.
* Should be set to false if replica tasks would index events in the same order
* @param allowIncrementalPersists whether to allow persist to happen when maxRowsInMemory or intermediate persist period
* threshold is hit
*
* @return {@link AppenderatorDriverAddResult}
*
* @throws IOException if there is an I/O error while allocating or writing to a segment
*/
public AppenderatorDriverAddResult add(
final InputRow row,
final String sequenceName,
final Supplier<Committer> committerSupplier,
final boolean skipSegmentLineageCheck,
final boolean allowIncrementalPersists
) throws IOException
{
return append(row, sequenceName, committerSupplier, skipSegmentLineageCheck, allowIncrementalPersists);
}

/**
* Move a set of identifiers out from "active", making way for newer segments.
* This method is to support KafkaIndexTask's legacy mode and will be removed in the future.
* See KafkaIndexTask.runLegacy().
*/
public void moveSegmentOut(final String sequenceName, final List<SegmentIdentifier> identifiers)
{
synchronized (segments) {
final SegmentsForSequence activeSegmentsForSequence = segments.get(sequenceName);
if (activeSegmentsForSequence == null) {
throw new ISE("WTF?! Asked to remove segments for sequenceName[%s] which doesn't exist...", sequenceName);
}
for (final SegmentIdentifier identifier : identifiers) {
log.info("Moving segment[%s] out of active list.", identifier);
final long key = identifier.getInterval().getStartMillis();
if (activeSegmentsForSequence.get(key) == null || activeSegmentsForSequence.get(key).stream().noneMatch(
segmentWithState -> {
if (segmentWithState.getSegmentIdentifier().equals(identifier)) {
segmentWithState.finishAppending();
return true;
} else {
return false;
}
}
)) {
throw new ISE("WTF?! Asked to remove segment[%s] that didn't exist...", identifier);
}
}
}
}

/**
* Persist all data indexed through this driver so far. Blocks until complete.
* <p>
* Should be called after all data has been added through {@link #add(InputRow, String, Supplier, boolean, boolean)}.
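*
* <p>A minimal {@link Committer} sketch (illustrative only; a real committer would checkpoint something durable,
* such as stream offsets, and {@code ImmutableMap} here is just example metadata):
*
* <pre>{@code
* Committer committer = new Committer()
* {
*   @Override
*   public Object getMetadata()
*   {
*     return ImmutableMap.of("offset", 42); // hypothetical checkpoint metadata
*   }
*
*   @Override
*   public void run()
*   {
*     // invoked once the persist succeeds; make the checkpoint durable here
*   }
* };
* }</pre>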
*
* @param committer committer representing all data that has been added so far
*
* @return commitMetadata persisted
*/
public Object persist(final Committer committer) throws InterruptedException
{
try {
log.info("Persisting data.");
final long start = System.currentTimeMillis();
final Object commitMetadata = appenderator.persistAll(wrapCommitter(committer)).get();
log.info("Persisted pending data in %,dms.", System.currentTimeMillis() - start);
return commitMetadata;
}
catch (InterruptedException e) {
throw e;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}

/**
* Persist all data indexed through this driver so far. Returns a future of persisted commitMetadata.
* <p>
* Should be called after all data has been added through {@link #add(InputRow, String, Supplier, boolean, boolean)}.
*
* @param committer committer representing all data that has been added so far
*
* @return future containing commitMetadata persisted
*/
public ListenableFuture<Object> persistAsync(final Committer committer)
{
return appenderator.persistAll(wrapCommitter(committer));
}

/**
* Execute a task in background to publish all segments corresponding to the given sequence names. The task
* internally pushes the segments to the deep storage first, and then publishes the metadata to the metadata storage.
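*
* <p>A hedged usage sketch ({@code driver}, {@code publisher}, and {@code committer} are assumed; blocking on the
* returned future is for illustration only):
*
* <pre>{@code
* SegmentsAndMetadata published = driver.publish(publisher, committer, ImmutableList.of("sequence-0")).get();
* }</pre>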
*
* @param publisher segment publisher
* @param committer committer
* @param sequenceNames a collection of sequence names to be published
*
* @return a {@link ListenableFuture} for the submitted task, which removes the published {@code sequenceNames} from
* the segment state map ({@code segments})
*/
public ListenableFuture<SegmentsAndMetadata> publish(
final TransactionalSegmentPublisher publisher,
final Committer committer,
final Collection<String> sequenceNames
)
{
final List<SegmentIdentifier> theSegments = getSegmentWithStates(sequenceNames)
.map(SegmentWithState::getSegmentIdentifier)
.collect(Collectors.toList());
final ListenableFuture<SegmentsAndMetadata> publishFuture = Futures.transform(
pushInBackground(wrapCommitter(committer), theSegments),
(AsyncFunction<SegmentsAndMetadata, SegmentsAndMetadata>) segmentsAndMetadata -> publishInBackground(
segmentsAndMetadata,
publisher
)
);
return Futures.transform(
publishFuture,
(Function<SegmentsAndMetadata, SegmentsAndMetadata>) segmentsAndMetadata -> {
synchronized (segments) {
sequenceNames.forEach(segments::remove);
}
return segmentsAndMetadata;
}
);
}

/**
* Register the segments in the given {@link SegmentsAndMetadata} to be handed off and execute a background task which
* waits until the hand off completes.
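*
* <p>Most callers chain this after {@link #publish} via {@link #publishAndRegisterHandoff}; a hedged direct-use
* sketch ({@code published} is the result of a prior {@code publish(...).get()} call):
*
* <pre>{@code
* SegmentsAndMetadata handedOff = driver.registerHandoff(published).get();
* }</pre>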
*
* @param segmentsAndMetadata the result segments and metadata of
* {@link #publish(TransactionalSegmentPublisher, Committer, Collection)}
*
* @return a future resolving to null if the input segmentsAndMetadata is null. Otherwise, a {@link ListenableFuture}
* for the submitted task which returns {@link SegmentsAndMetadata} containing the segments successfully
* handed off and the caller metadata of {@link AppenderatorDriverMetadata}
*/
public ListenableFuture<SegmentsAndMetadata> registerHandoff(SegmentsAndMetadata segmentsAndMetadata)
{
if (segmentsAndMetadata == null) {
return Futures.immediateFuture(null);
} else {
final List<SegmentIdentifier> waitingSegmentIdList = segmentsAndMetadata.getSegments().stream()
.map(SegmentIdentifier::fromDataSegment)
.collect(Collectors.toList());
final Object metadata = Preconditions.checkNotNull(segmentsAndMetadata.getCommitMetadata(), "commitMetadata");
if (waitingSegmentIdList.isEmpty()) {
return Futures.immediateFuture(
new SegmentsAndMetadata(
segmentsAndMetadata.getSegments(),
((AppenderatorDriverMetadata) metadata).getCallerMetadata()
)
);
}
log.info("Register handoff of segments: [%s]", waitingSegmentIdList);
final SettableFuture<SegmentsAndMetadata> resultFuture = SettableFuture.create();
final AtomicInteger numRemainingHandoffSegments = new AtomicInteger(waitingSegmentIdList.size());
for (final SegmentIdentifier segmentIdentifier : waitingSegmentIdList) {
handoffNotifier.registerSegmentHandoffCallback(
new SegmentDescriptor(
segmentIdentifier.getInterval(),
segmentIdentifier.getVersion(),
segmentIdentifier.getShardSpec().getPartitionNum()
),
MoreExecutors.sameThreadExecutor(),
() -> {
log.info("Segment[%s] successfully handed off, dropping.", segmentIdentifier);
metrics.incrementHandOffCount();
final ListenableFuture<?> dropFuture = appenderator.drop(segmentIdentifier);
Futures.addCallback(
dropFuture,
new FutureCallback<Object>()
{
@Override
public void onSuccess(Object result)
{
if (numRemainingHandoffSegments.decrementAndGet() == 0) {
log.info("Successfully handed off [%d] segments.", segmentsAndMetadata.getSegments().size());
resultFuture.set(
new SegmentsAndMetadata(
segmentsAndMetadata.getSegments(),
((AppenderatorDriverMetadata) metadata).getCallerMetadata()
)
);
}
}
@Override
public void onFailure(Throwable e)
{
log.warn(e, "Failed to drop segment[%s]?!", segmentIdentifier);
numRemainingHandoffSegments.decrementAndGet();
resultFuture.setException(e);
}
}
);
}
);
}
return resultFuture;
}
}
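
/**
* Publish segments for the given sequence names and then register them to be handed off, by chaining
* {@link #publish(TransactionalSegmentPublisher, Committer, Collection)} and
* {@link #registerHandoff(SegmentsAndMetadata)}.
*
* @return a {@link ListenableFuture} containing the segments successfully published and handed off, and the caller
* metadata
*/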
public ListenableFuture<SegmentsAndMetadata> publishAndRegisterHandoff(
final TransactionalSegmentPublisher publisher,
final Committer committer,
final Collection<String> sequenceNames
)
{
return Futures.transform(
publish(publisher, committer, sequenceNames),
this::registerHandoff
);
}

@Override
public void close()
{
super.close();
handoffNotifier.close();
}
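
/**
* Builder used by {@link #startJob()} to reconstruct per-sequence segment state ({@link SegmentsForSequence})
* from restored {@link AppenderatorDriverMetadata}.
*/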
private static class SegmentsForSequenceBuilder
{
private final NavigableMap<Long, LinkedList<SegmentWithState>> intervalToSegmentStates;
private final String lastSegmentId;
SegmentsForSequenceBuilder(String lastSegmentId)
{
this.intervalToSegmentStates = new TreeMap<>();
this.lastSegmentId = lastSegmentId;
}
void add(SegmentWithState segmentWithState)
{
final SegmentIdentifier identifier = segmentWithState.getSegmentIdentifier();
final LinkedList<SegmentWithState> segmentsInInterval = intervalToSegmentStates.computeIfAbsent(
identifier.getInterval().getStartMillis(),
k -> new LinkedList<>()
);
// always keep APPENDING segments at the front of the list for each interval start millis
if (segmentWithState.getState() == SegmentState.APPENDING) {
segmentsInInterval.addFirst(segmentWithState);
} else {
segmentsInInterval.addLast(segmentWithState);
}
}
SegmentsForSequence build()
{
return new SegmentsForSequence(intervalToSegmentStates, lastSegmentId);
}
}
}