/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.appenderator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* A BaseAppenderatorDriver drives an {@link Appenderator} to index a stream of data.
* <p/>
* This class helps with doing things that Appenderators don't, including deciding which segments to use (with a
* {@link SegmentAllocator}) and publishing segments to the metadata store (with a
* {@link TransactionalSegmentPublisher}).
* <p/>
* This class has two subclasses, {@link BatchAppenderatorDriver} and {@link StreamAppenderatorDriver}, for batch
* and streaming ingestion, respectively. This class provides fundamental building blocks for the subclasses, such
* as {@link #pushInBackground}, {@link #dropInBackground}, and {@link #publishInBackground}, which the subclasses
* compose into their own ingestion workflows.
* <p/>
* Note that the commit metadata stored by this class via the underlying Appenderator is not the same metadata as
* you pass in. It's wrapped in some extra metadata needed by the driver.
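* <p/>
* A minimal lifecycle sketch as seen from a subclass; the sequence name and row iterator are hypothetical, and the
* push/publish/drop ordering depends on the subclass:
* <pre>{@code
* final Object commitMetadata = startJob(); // restore any previously committed state
* while (rows.hasNext()) {
*   final AppenderatorDriverAddResult result = append(rows.next(), "mySequence", null, false, true);
*   if (!result.isOk()) {
*     throw new ISE("Could not allocate segment");
*   }
* }
* // then push, publish, and drop via pushInBackground, publishInBackground, and dropInBackground
* }</pre>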
*/
public abstract class BaseAppenderatorDriver implements Closeable
{
/**
* Allocated segments for a sequence
*/
static class SegmentsForSequence
{
// Interval start millis -> list of segments for that interval. There may be multiple segments for the same start
// millis: for example, one segment can be in APPENDING state while others are in PUBLISHING state.
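// For example (identifiers and states are hypothetical):
//   1514764800000 (2018-01-01) -> [..._partition_1 (APPENDING), ..._partition_0 (PUBLISHING)]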
private final NavigableMap<Long, LinkedList<SegmentWithState>> intervalToSegmentStates;
// most recently allocated segment
private String lastSegmentId;
SegmentsForSequence()
{
this.intervalToSegmentStates = new TreeMap<>();
}
SegmentsForSequence(
NavigableMap<Long, LinkedList<SegmentWithState>> intervalToSegmentStates,
String lastSegmentId
)
{
this.intervalToSegmentStates = intervalToSegmentStates;
this.lastSegmentId = lastSegmentId;
}
void add(SegmentIdentifier identifier)
{
intervalToSegmentStates.computeIfAbsent(identifier.getInterval().getStartMillis(), k -> new LinkedList<>())
.addFirst(SegmentWithState.newSegment(identifier));
lastSegmentId = identifier.getIdentifierAsString();
}
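// Returns the entry whose interval start is the greatest start millis less than or equal to the given
// timestamp, or null if there is no such entry (TreeMap.floorEntry semantics).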
Entry<Long, LinkedList<SegmentWithState>> floor(long timestamp)
{
return intervalToSegmentStates.floorEntry(timestamp);
}
LinkedList<SegmentWithState> get(long timestamp)
{
return intervalToSegmentStates.get(timestamp);
}
Stream<SegmentWithState> segmentStateStream()
{
return intervalToSegmentStates.values().stream().flatMap(Collection::stream);
}
}
private static final Logger log = new Logger(BaseAppenderatorDriver.class);
private final SegmentAllocator segmentAllocator;
private final UsedSegmentChecker usedSegmentChecker;
protected final Appenderator appenderator;
// sequenceName -> segmentsForSequence
// This map must be locked on itself before it is accessed.
// Note: BatchAppenderatorDriver currently doesn't need to lock this map because it doesn't do anything concurrently.
// However, it would be desirable to run some operations, such as indexing and pushing, at the same time. Locking
// this map will also be required in BatchAppenderatorDriver once that is supported.
protected final Map<String, SegmentsForSequence> segments = new TreeMap<>();
protected final ListeningExecutorService executor;
BaseAppenderatorDriver(
Appenderator appenderator,
SegmentAllocator segmentAllocator,
UsedSegmentChecker usedSegmentChecker
)
{
this.appenderator = Preconditions.checkNotNull(appenderator, "appenderator");
this.segmentAllocator = Preconditions.checkNotNull(segmentAllocator, "segmentAllocator");
this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker, "usedSegmentChecker");
this.executor = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-%d"));
}
@VisibleForTesting
Map<String, SegmentsForSequence> getSegments()
{
return segments;
}
/**
* Perform any initial setup and return currently persisted commit metadata.
* <p>
* Note that this method returns the same metadata you've passed in with your Committers, even though this class
* stores extra metadata on disk.
*
* @return currently persisted commit metadata
*/
@Nullable
public abstract Object startJob();
/**
* Find a segment in the {@link SegmentState#APPENDING} state for the given timestamp and sequenceName.
*
* @return the identifier of an appendable segment, or null if there is none
*/
@Nullable
private SegmentIdentifier getAppendableSegment(final DateTime timestamp, final String sequenceName)
{
synchronized (segments) {
final SegmentsForSequence segmentsForSequence = segments.get(sequenceName);
if (segmentsForSequence == null) {
return null;
}
final Map.Entry<Long, LinkedList<SegmentWithState>> candidateEntry = segmentsForSequence.floor(
timestamp.getMillis()
);
if (candidateEntry != null
&& candidateEntry.getValue().getFirst().getSegmentIdentifier().getInterval().contains(timestamp)
&& candidateEntry.getValue().getFirst().getState() == SegmentState.APPENDING) {
return candidateEntry.getValue().getFirst().getSegmentIdentifier();
} else {
return null;
}
}
}
/**
* Return a segment usable for "timestamp". May return null if no segment can be allocated.
*
* @param row input row
* @param sequenceName sequenceName for potential segment allocation
* @param skipSegmentLineageCheck if false, perform lineage validation using previousSegmentId for this sequence.
* Should be set to false if replica tasks would index events in the same order
*
* @return identifier, or null
*
* @throws IOException if an exception occurs while allocating a segment
*/
@Nullable
private SegmentIdentifier getSegment(
final InputRow row,
final String sequenceName,
final boolean skipSegmentLineageCheck
) throws IOException
{
synchronized (segments) {
final DateTime timestamp = row.getTimestamp();
final SegmentIdentifier existing = getAppendableSegment(timestamp, sequenceName);
if (existing != null) {
return existing;
} else {
// Allocate new segment.
final SegmentsForSequence segmentsForSequence = segments.get(sequenceName);
final SegmentIdentifier newSegment = segmentAllocator.allocate(
row,
sequenceName,
segmentsForSequence == null ? null : segmentsForSequence.lastSegmentId,
// send lastSegmentId irrespective of skipSegmentLineageCheck so that
// unique constraint for sequence_name_prev_id_sha1 does not fail for
// allocatePendingSegment in IndexerSQLMetadataStorageCoordinator
skipSegmentLineageCheck
);
if (newSegment != null) {
for (SegmentIdentifier identifier : appenderator.getSegments()) {
if (identifier.equals(newSegment)) {
throw new ISE(
"WTF?! Allocated segment[%s] which conflicts with existing segment[%s].",
newSegment,
identifier
);
}
}
log.info("New segment[%s] for row[%s] sequenceName[%s].", newSegment, row, sequenceName);
addSegment(sequenceName, newSegment);
} else {
// Well, we tried.
log.warn("Cannot allocate segment for timestamp[%s], sequenceName[%s]. ", timestamp, sequenceName);
}
return newSegment;
}
}
}
private void addSegment(String sequenceName, SegmentIdentifier identifier)
{
synchronized (segments) {
segments.computeIfAbsent(sequenceName, k -> new SegmentsForSequence())
.add(identifier);
}
}
/**
* Add a row. Must not be called concurrently from multiple threads.
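* <p>
* A hedged sketch of how a caller might react to the result; {@code persistAsync} is a hypothetical helper, and
* {@code allowIncrementalPersists} is false here so that persists are deferred to the caller:
* <pre>{@code
* AppenderatorDriverAddResult result = append(row, "mySequence", committerSupplier, false, false);
* if (!result.isOk()) {
*   throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp());
* } else if (result.isPersistRequired()) {
*   persistAsync(committerSupplier.get()); // the appenderator deferred an incremental persist to the caller
* }
* }</pre>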
*
* @param row the row to add
* @param sequenceName sequenceName for this row's segment
* @param committerSupplier supplier of a committer associated with all data that has been added, including this
* row. Not used if {@code allowIncrementalPersists} is false.
* @param skipSegmentLineageCheck if false, perform lineage validation using previousSegmentId for this sequence.
* Should be set to false if replica tasks would index events in the same order
* @param allowIncrementalPersists whether to allow a persist to happen when the maxRowsInMemory or intermediate
* persist period threshold is hit
*
* @return {@link AppenderatorDriverAddResult}
*
* @throws IOException if there is an I/O error while allocating or writing to a segment
*/
protected AppenderatorDriverAddResult append(
final InputRow row,
final String sequenceName,
@Nullable final Supplier<Committer> committerSupplier,
final boolean skipSegmentLineageCheck,
final boolean allowIncrementalPersists
) throws IOException
{
Preconditions.checkNotNull(row, "row");
Preconditions.checkNotNull(sequenceName, "sequenceName");
final SegmentIdentifier identifier = getSegment(row, sequenceName, skipSegmentLineageCheck);
if (identifier != null) {
try {
final Appenderator.AppenderatorAddResult result = appenderator.add(
identifier,
row,
committerSupplier == null ? null : wrapCommitterSupplier(committerSupplier),
allowIncrementalPersists
);
return AppenderatorDriverAddResult.ok(
identifier,
result.getNumRowsInSegment(),
appenderator.getTotalRowCount(),
result.isPersistRequired()
);
}
catch (SegmentNotWritableException e) {
throw new ISE(e, "WTF?! Segment[%s] not writable when it should have been.", identifier);
}
} else {
return AppenderatorDriverAddResult.fail();
}
}
/**
* Returns a stream of {@link SegmentWithState} for the given sequenceNames.
*/
Stream<SegmentWithState> getSegmentWithStates(Collection<String> sequenceNames)
{
synchronized (segments) {
return sequenceNames
.stream()
.map(segments::get)
.filter(Objects::nonNull)
.flatMap(SegmentsForSequence::segmentStateStream);
}
}
/**
* Push the given segments in background.
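* <p>
* A hedged sketch of chaining a push with a drop, assuming a Guava version whose {@code Futures.transform}
* accepts an {@code AsyncFunction} (the identifiers are illustrative):
* <pre>{@code
* ListenableFuture<SegmentsAndMetadata> pushedThenDropped = Futures.transform(
*     pushInBackground(null, identifiers),
*     (AsyncFunction<SegmentsAndMetadata, SegmentsAndMetadata>) this::dropInBackground
* );
* }</pre>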
*
* @param wrappedCommitter should not be null if you want to persist intermediate states
* @param segmentIdentifiers identifiers of the segments to be pushed
*
* @return a future for pushing segments
*/
ListenableFuture<SegmentsAndMetadata> pushInBackground(
@Nullable final WrappedCommitter wrappedCommitter,
final Collection<SegmentIdentifier> segmentIdentifiers
)
{
log.info("Pushing segments in background: [%s]", Joiner.on(", ").join(segmentIdentifiers));
return Futures.transform(
appenderator.push(segmentIdentifiers, wrappedCommitter),
(Function<SegmentsAndMetadata, SegmentsAndMetadata>) segmentsAndMetadata -> {
// Sanity check
final Set<SegmentIdentifier> pushedSegments = segmentsAndMetadata.getSegments().stream()
.map(SegmentIdentifier::fromDataSegment)
.collect(Collectors.toSet());
if (!pushedSegments.equals(Sets.newHashSet(segmentIdentifiers))) {
throw new ISE(
"WTF?! Pushed different segments than requested. Pushed[%s], requested[%s].",
pushedSegments,
segmentIdentifiers
);
}
return segmentsAndMetadata;
},
executor
);
}
/**
* Drop segments in background. The segments should be pushed (in batch ingestion) or published (in streaming
* ingestion) before being dropped.
*
* @param segmentsAndMetadata result of pushing or publishing
*
* @return a future for dropping segments
*/
ListenableFuture<SegmentsAndMetadata> dropInBackground(SegmentsAndMetadata segmentsAndMetadata)
{
log.info("Dropping segments[%s]", segmentsAndMetadata.getSegments());
final ListenableFuture<?> dropFuture = Futures.allAsList(
segmentsAndMetadata
.getSegments()
.stream()
.map(segment -> appenderator.drop(SegmentIdentifier.fromDataSegment(segment)))
.collect(Collectors.toList())
);
return Futures.transform(
dropFuture,
(Function<Object, SegmentsAndMetadata>) x -> {
final Object metadata = segmentsAndMetadata.getCommitMetadata();
return new SegmentsAndMetadata(
segmentsAndMetadata.getSegments(),
metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata()
);
}
);
}
/**
* Publish segments in background. The segments should be dropped (in batch ingestion) or pushed (in streaming
* ingestion) before being published.
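* <p>
* A hedged sketch of a caller, where {@code pushed} is the result of a prior push and the always-succeeding
* publisher is hypothetical; a real publisher writes the segments and the commit metadata to the metadata store
* in a single transaction:
* <pre>{@code
* TransactionalSegmentPublisher publisher = (segmentsToPublish, commitMetadata) -> true;
* ListenableFuture<SegmentsAndMetadata> published = publishInBackground(pushed, publisher);
* }</pre>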
*
* @param segmentsAndMetadata result of dropping or pushing
* @param publisher transactional segment publisher
*
* @return a future for publishing segments
*/
ListenableFuture<SegmentsAndMetadata> publishInBackground(
SegmentsAndMetadata segmentsAndMetadata,
TransactionalSegmentPublisher publisher
)
{
return executor.submit(
() -> {
if (segmentsAndMetadata.getSegments().isEmpty()) {
log.info("Nothing to publish, skipping publish step.");
} else {
log.info(
"Publishing segments with commitMetadata[%s]: [%s]",
segmentsAndMetadata.getCommitMetadata(),
Joiner.on(", ").join(segmentsAndMetadata.getSegments())
);
try {
final Object metadata = segmentsAndMetadata.getCommitMetadata();
final boolean published = publisher.publishSegments(
ImmutableSet.copyOf(segmentsAndMetadata.getSegments()),
metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata()
);
if (published) {
log.info("Published segments.");
} else {
log.info("Transaction failure while publishing segments, checking if someone else beat us to it.");
final Set<SegmentIdentifier> segmentsIdentifiers = segmentsAndMetadata
.getSegments()
.stream()
.map(SegmentIdentifier::fromDataSegment)
.collect(Collectors.toSet());
if (usedSegmentChecker.findUsedSegments(segmentsIdentifiers)
.equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
log.info("Our segments really do exist, awaiting handoff.");
} else {
throw new ISE("Failed to publish segments[%s]", segmentsAndMetadata.getSegments());
}
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
return segmentsAndMetadata;
}
);
}
/**
* Clears out all our state and also calls {@link Appenderator#clear()} on the underlying Appenderator.
*/
public void clear() throws InterruptedException
{
synchronized (segments) {
segments.clear();
}
appenderator.clear();
}
/**
* Closes this driver. Does not close the underlying Appenderator; you should do that yourself.
*/
@Override
public void close()
{
executor.shutdownNow();
}
/**
* Wrapped committer for BaseAppenderatorDriver. Used only in {@link StreamAppenderatorDriver} because batch
* ingestion doesn't need to commit intermediate states.
*/
static class WrappedCommitter implements Committer
{
private final Committer delegate;
private final AppenderatorDriverMetadata metadata;
WrappedCommitter(Committer delegate, AppenderatorDriverMetadata metadata)
{
this.delegate = delegate;
this.metadata = metadata;
}
@Override
public Object getMetadata()
{
return metadata;
}
@Override
public void run()
{
delegate.run();
}
}
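/**
* Wrap the given committer so that the metadata it commits is the caller's metadata plus a snapshot of this
* driver's current segment state (see {@link AppenderatorDriverMetadata}). {@link StreamAppenderatorDriver}
* restores its state from this snapshot on restart.
*/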
WrappedCommitter wrapCommitter(final Committer committer)
{
final AppenderatorDriverMetadata wrappedMetadata;
final Map<String, SegmentsForSequence> snapshot;
synchronized (segments) {
snapshot = ImmutableMap.copyOf(segments);
}
wrappedMetadata = new AppenderatorDriverMetadata(
ImmutableMap.copyOf(
Maps.transformValues(
snapshot,
(Function<SegmentsForSequence, List<SegmentWithState>>) input ->
ImmutableList.copyOf(input.segmentStateStream().collect(Collectors.toList()))
)
),
snapshot.entrySet()
.stream()
.collect(
Collectors.toMap(
Entry::getKey,
e -> e.getValue().lastSegmentId
)
),
committer.getMetadata()
);
return new WrappedCommitter(committer, wrappedMetadata);
}
private Supplier<Committer> wrapCommitterSupplier(final Supplier<Committer> committerSupplier)
{
return () -> wrapCommitter(committerSupplier.get());
}
}