/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.metadata;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.inject.Inject;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionIds;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.util.ByteArrayMapper;
import org.skife.jdbi.v2.util.StringMapper;
import javax.annotation.Nullable;
import java.io.IOException;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * SQL-based implementation of {@link IndexerMetadataStorageCoordinator}. Segment, pending-segment, and
 * dataSource metadata are stored in the tables described by {@link MetadataStorageTablesConfig} and accessed
 * through JDBI via {@link SQLMetadataConnector}.
 */
public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
{
private static final Logger log = new Logger(IndexerSQLMetadataStorageCoordinator.class);
private final ObjectMapper jsonMapper;
private final MetadataStorageTablesConfig dbTables;
private final SQLMetadataConnector connector;
@Inject
public IndexerSQLMetadataStorageCoordinator(
ObjectMapper jsonMapper,
MetadataStorageTablesConfig dbTables,
SQLMetadataConnector connector
)
{
this.jsonMapper = jsonMapper;
this.dbTables = dbTables;
this.connector = connector;
}
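  /**
   * Result of a dataSource metadata update attempt: SUCCESS means the row was inserted or updated; FAILURE means
   * the given start metadata did not match the stored state, so retrying cannot help; TRY_AGAIN means a
   * concurrent writer likely won a compare-and-swap race and the transaction may be retried.
   */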
enum DataSourceMetadataUpdateResult
{
SUCCESS,
FAILURE,
TRY_AGAIN
}
@LifecycleStart
public void start()
{
connector.createDataSourceTable();
connector.createPendingSegmentsTable();
connector.createSegmentTable();
}
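  /**
   * Returns used segments for the given dataSource that overlap any of the given intervals. A minimal caller
   * sketch (the dataSource name and interval below are illustrative only):
   *
   * <pre>{@code
   * Collection<DataSegment> segments = coordinator.retrieveUsedSegmentsForIntervals(
   *     "wikipedia",
   *     ImmutableList.of(Intervals.of("2020-01-01/2020-01-02")),
   *     Segments.ONLY_VISIBLE
   * );
   * }</pre>
   */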
@Override
public Collection<DataSegment> retrieveUsedSegmentsForIntervals(
final String dataSource,
final List<Interval> intervals,
final Segments visibility
)
{
if (intervals == null || intervals.isEmpty()) {
throw new IAE("null/empty intervals");
}
return doRetrieveUsedSegments(dataSource, intervals, visibility);
}
@Override
public Collection<DataSegment> retrieveAllUsedSegments(String dataSource, Segments visibility)
{
return doRetrieveUsedSegments(dataSource, Collections.emptyList(), visibility);
}
/**
 * @param intervals an empty list means an unrestricted interval.
*/
private Collection<DataSegment> doRetrieveUsedSegments(
final String dataSource,
final List<Interval> intervals,
final Segments visibility
)
{
return connector.retryWithHandle(
handle -> {
if (visibility == Segments.ONLY_VISIBLE) {
final VersionedIntervalTimeline<String, DataSegment> timeline =
getTimelineForIntervalsWithHandle(handle, dataSource, intervals);
return timeline.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE);
} else {
return retrieveAllUsedSegmentsForIntervalsWithHandle(handle, dataSource, intervals);
}
}
);
}
@Override
public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource)
{
String rawQueryString = "SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource AND used = true";
final String queryString = StringUtils.format(rawQueryString, dbTables.getSegmentsTable());
return connector.retryWithHandle(
handle -> {
Query<Map<String, Object>> query = handle
.createQuery(queryString)
.bind("dataSource", dataSource);
return query
.map((int index, ResultSet r, StatementContext ctx) ->
new Pair<>(
JacksonUtils.readValue(jsonMapper, r.getBytes("payload"), DataSegment.class),
r.getString("created_date")
)
)
.list();
}
);
}
@Override
public List<DataSegment> retrieveUnusedSegmentsForInterval(final String dataSource, final Interval interval)
{
List<DataSegment> matchingSegments = connector.inReadOnlyTransaction(
(handle, status) -> {
        // Two range conditions are used on different columns, but not all SQL databases optimize this well; some
        // can only use an index on one of the columns. The additional "start <= :end" condition makes explicit
        // that 'start' can never be greater than 'end'.
return handle
.createQuery(
StringUtils.format(
"SELECT payload FROM %1$s WHERE dataSource = :dataSource and start >= :start "
+ "and start <= :end and %2$send%2$s <= :end and used = false",
dbTables.getSegmentsTable(),
connector.getQuoteString()
)
)
.setFetchSize(connector.getStreamingFetchSize())
.bind("dataSource", dataSource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.map(ByteArrayMapper.FIRST)
.fold(
new ArrayList<>(),
(Folder3<List<DataSegment>, byte[]>) (accumulator, payload, foldController, statementContext) -> {
accumulator.add(JacksonUtils.readValue(jsonMapper, payload, DataSegment.class));
return accumulator;
}
);
}
);
log.info("Found %,d segments for %s for interval %s.", matchingSegments.size(), dataSource, interval);
return matchingSegments;
}
  private List<SegmentIdWithShardSpec> getPendingSegmentsForIntervalWithHandle(
      final Handle handle,
      final String dataSource,
      final Interval interval
  ) throws IOException
  {
    final List<SegmentIdWithShardSpec> identifiers = new ArrayList<>();
    // try-with-resources ensures the iterator's underlying statement is closed even if deserialization throws.
    try (final ResultIterator<byte[]> dbSegments =
             handle.createQuery(
                 StringUtils.format(
                     "SELECT payload FROM %1$s WHERE dataSource = :dataSource AND start <= :end and %2$send%2$s >= :start",
                     dbTables.getPendingSegmentsTable(), connector.getQuoteString()
                 )
             )
                   .bind("dataSource", dataSource)
                   .bind("start", interval.getStart().toString())
                   .bind("end", interval.getEnd().toString())
                   .map(ByteArrayMapper.FIRST)
                   .iterator()) {
      while (dbSegments.hasNext()) {
        final byte[] payload = dbSegments.next();
        final SegmentIdWithShardSpec identifier = jsonMapper.readValue(payload, SegmentIdWithShardSpec.class);
        if (interval.overlaps(identifier.getInterval())) {
          identifiers.add(identifier);
        }
      }
    }
    return identifiers;
  }
private VersionedIntervalTimeline<String, DataSegment> getTimelineForIntervalsWithHandle(
final Handle handle,
final String dataSource,
final List<Interval> intervals
)
{
Query<Map<String, Object>> sql = createUsedSegmentsSqlQueryForIntervals(handle, dataSource, intervals);
try (final ResultIterator<byte[]> dbSegments = sql.map(ByteArrayMapper.FIRST).iterator()) {
return VersionedIntervalTimeline.forSegments(
Iterators.transform(dbSegments, payload -> JacksonUtils.readValue(jsonMapper, payload, DataSegment.class))
);
}
}
private Collection<DataSegment> retrieveAllUsedSegmentsForIntervalsWithHandle(
final Handle handle,
final String dataSource,
final List<Interval> intervals
)
{
return createUsedSegmentsSqlQueryForIntervals(handle, dataSource, intervals)
.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes("payload"), DataSegment.class))
.list();
}
/**
   * Creates a query for the metadata store which selects the payload from the segments table for all segments
   * that are marked as used and whose interval intersects (not merely abuts) any of the given intervals.
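   *
   * <p>Illustrative shape of the generated SQL for two intervals (the actual table name comes from
   * {@code dbTables.getSegmentsTable()} and the quote string from the connector):
   * <pre>{@code
   * SELECT payload FROM druid_segments WHERE used = true AND dataSource = ?
   *   AND ((start < ? AND "end" > ?) OR (start < ? AND "end" > ?))
   * }</pre>
   * Parameters are bound positionally: the dataSource first, then the end and start of each interval in turn.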
*/
private Query<Map<String, Object>> createUsedSegmentsSqlQueryForIntervals(
Handle handle,
String dataSource,
List<Interval> intervals
)
{
final StringBuilder sb = new StringBuilder();
sb.append("SELECT payload FROM %s WHERE used = true AND dataSource = ?");
if (!intervals.isEmpty()) {
sb.append(" AND (");
for (int i = 0; i < intervals.size(); i++) {
sb.append(
StringUtils.format("(start < ? AND %1$send%1$s > ?)", connector.getQuoteString())
);
if (i == intervals.size() - 1) {
sb.append(")");
} else {
sb.append(" OR ");
}
}
}
Query<Map<String, Object>> sql = handle
.createQuery(StringUtils.format(sb.toString(), dbTables.getSegmentsTable()))
.bind(0, dataSource);
for (int i = 0; i < intervals.size(); i++) {
Interval interval = intervals.get(i);
sql = sql
.bind(2 * i + 1, interval.getEnd().toString())
.bind(2 * i + 2, interval.getStart().toString());
}
return sql;
}
@Override
public Set<DataSegment> announceHistoricalSegments(final Set<DataSegment> segments) throws IOException
{
final SegmentPublishResult result = announceHistoricalSegments(segments, null, null);
    // The metadata transaction cannot fail because we are not performing one.
if (!result.isSuccess()) {
throw new ISE("announceHistoricalSegments failed with null metadata, should not happen.");
}
return result.getSegments();
}
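  /**
   * Commits the given segments and, when start/end metadata are provided, atomically compare-and-swaps the
   * dataSource's committed metadata in the same transaction. A minimal caller sketch (the metadata objects here
   * are illustrative; any {@link DataSourceMetadata} implementation works):
   *
   * <pre>{@code
   * SegmentPublishResult result = coordinator.announceHistoricalSegments(segments, startMetadata, endMetadata);
   * if (!result.isSuccess()) {
   *   // The transaction definitely did not commit; it is safe to retry or abort.
   * }
   * }</pre>
   */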
@Override
public SegmentPublishResult announceHistoricalSegments(
final Set<DataSegment> segments,
@Nullable final DataSourceMetadata startMetadata,
@Nullable final DataSourceMetadata endMetadata
) throws IOException
{
if (segments.isEmpty()) {
throw new IllegalArgumentException("segment set must not be empty");
}
final String dataSource = segments.iterator().next().getDataSource();
for (DataSegment segment : segments) {
if (!dataSource.equals(segment.getDataSource())) {
throw new IllegalArgumentException("segments must all be from the same dataSource");
}
}
if ((startMetadata == null && endMetadata != null) || (startMetadata != null && endMetadata == null)) {
throw new IllegalArgumentException("start/end metadata pair must be either null or non-null");
}
// Find which segments are used (i.e. not overshadowed).
final Set<DataSegment> usedSegments = new HashSet<>();
List<TimelineObjectHolder<String, DataSegment>> segmentHolders =
VersionedIntervalTimeline.forSegments(segments).lookupWithIncompletePartitions(Intervals.ETERNITY);
for (TimelineObjectHolder<String, DataSegment> holder : segmentHolders) {
for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
usedSegments.add(chunk.getObject());
}
}
final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
try {
return connector.retryTransaction(
new TransactionCallback<SegmentPublishResult>()
{
@Override
public SegmentPublishResult inTransaction(
final Handle handle,
final TransactionStatus transactionStatus
) throws Exception
{
// Set definitelyNotUpdated back to false upon retrying.
definitelyNotUpdated.set(false);
final Set<DataSegment> inserted = new HashSet<>();
if (startMetadata != null) {
final DataSourceMetadataUpdateResult result = updateDataSourceMetadataWithHandle(
handle,
dataSource,
startMetadata,
endMetadata
);
if (result != DataSourceMetadataUpdateResult.SUCCESS) {
// Metadata was definitely not updated.
transactionStatus.setRollbackOnly();
definitelyNotUpdated.set(true);
if (result == DataSourceMetadataUpdateResult.FAILURE) {
throw new RuntimeException("Aborting transaction!");
} else if (result == DataSourceMetadataUpdateResult.TRY_AGAIN) {
throw new RetryTransactionException("Aborting transaction!");
}
}
}
for (final DataSegment segment : segments) {
if (announceHistoricalSegment(handle, segment, usedSegments.contains(segment))) {
inserted.add(segment);
}
}
return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted));
}
},
3,
SQLMetadataConnector.DEFAULT_MAX_TRIES
);
}
catch (CallbackFailedException e) {
if (definitelyNotUpdated.get()) {
return SegmentPublishResult.fail(e.getMessage());
} else {
// Must throw exception if we are not sure if we updated or not.
throw e;
}
}
}
@Override
public SegmentPublishResult commitMetadataOnly(
String dataSource,
DataSourceMetadata startMetadata,
DataSourceMetadata endMetadata
)
{
if (dataSource == null) {
throw new IllegalArgumentException("datasource name cannot be null");
}
if (startMetadata == null) {
throw new IllegalArgumentException("start metadata cannot be null");
}
if (endMetadata == null) {
throw new IllegalArgumentException("end metadata cannot be null");
}
final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
try {
return connector.retryTransaction(
new TransactionCallback<SegmentPublishResult>()
{
@Override
public SegmentPublishResult inTransaction(
final Handle handle,
final TransactionStatus transactionStatus
) throws Exception
{
// Set definitelyNotUpdated back to false upon retrying.
definitelyNotUpdated.set(false);
final DataSourceMetadataUpdateResult result = updateDataSourceMetadataWithHandle(
handle,
dataSource,
startMetadata,
endMetadata
);
if (result != DataSourceMetadataUpdateResult.SUCCESS) {
// Metadata was definitely not updated.
transactionStatus.setRollbackOnly();
definitelyNotUpdated.set(true);
if (result == DataSourceMetadataUpdateResult.FAILURE) {
throw new RuntimeException("Aborting transaction!");
} else if (result == DataSourceMetadataUpdateResult.TRY_AGAIN) {
throw new RetryTransactionException("Aborting transaction!");
}
}
return SegmentPublishResult.ok(ImmutableSet.of());
}
},
3,
SQLMetadataConnector.DEFAULT_MAX_TRIES
);
}
catch (CallbackFailedException e) {
if (definitelyNotUpdated.get()) {
return SegmentPublishResult.fail(e.getMessage());
} else {
// Must throw exception if we are not sure if we updated or not.
throw e;
}
}
}
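  /**
   * Allocates a new pending segment id for the given interval. When {@code skipSegmentLineageCheck} is true the
   * request is identified by (sequenceName, interval); otherwise by (sequenceName, previousSegmentId). A minimal
   * caller sketch (all argument values below are illustrative):
   *
   * <pre>{@code
   * SegmentIdWithShardSpec id = coordinator.allocatePendingSegment(
   *     "wikipedia",                        // dataSource
   *     "seq_0",                            // sequenceName
   *     null,                               // previousSegmentId; null for the first segment in the sequence
   *     Intervals.of("2020-01-01/2020-01-02"),
   *     NumberedPartialShardSpec.instance(),
   *     "2020-01-01T00:00:00.000Z",         // maxVersion
   *     true                                // skipSegmentLineageCheck
   * );
   * }</pre>
   */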
@Override
public SegmentIdWithShardSpec allocatePendingSegment(
final String dataSource,
final String sequenceName,
@Nullable final String previousSegmentId,
final Interval interval,
final PartialShardSpec partialShardSpec,
final String maxVersion,
final boolean skipSegmentLineageCheck
)
{
Preconditions.checkNotNull(dataSource, "dataSource");
Preconditions.checkNotNull(sequenceName, "sequenceName");
Preconditions.checkNotNull(interval, "interval");
Preconditions.checkNotNull(maxVersion, "version");
Interval allocateInterval = interval.withChronology(ISOChronology.getInstanceUTC());
return connector.retryWithHandle(
handle -> {
if (skipSegmentLineageCheck) {
return allocatePendingSegment(
handle,
dataSource,
sequenceName,
allocateInterval,
partialShardSpec,
maxVersion
);
} else {
return allocatePendingSegmentWithSegmentLineageCheck(
handle,
dataSource,
sequenceName,
previousSegmentId,
allocateInterval,
partialShardSpec,
maxVersion
);
}
}
);
}
@Nullable
private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck(
final Handle handle,
final String dataSource,
final String sequenceName,
@Nullable final String previousSegmentId,
final Interval interval,
final PartialShardSpec partialShardSpec,
final String maxVersion
) throws IOException
{
final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId;
final CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId(
handle.createQuery(
StringUtils.format(
"SELECT payload FROM %s WHERE "
+ "dataSource = :dataSource AND "
+ "sequence_name = :sequence_name AND "
+ "sequence_prev_id = :sequence_prev_id",
dbTables.getPendingSegmentsTable()
)
),
interval,
sequenceName,
previousSegmentIdNotNull,
Pair.of("dataSource", dataSource),
Pair.of("sequence_name", sequenceName),
Pair.of("sequence_prev_id", previousSegmentIdNotNull)
);
if (result.found) {
      // The existing segment identifier can be null if its interval doesn't match the given interval.
return result.segmentIdentifier;
}
final SegmentIdWithShardSpec newIdentifier = createNewSegment(
handle,
dataSource,
interval,
partialShardSpec,
maxVersion
);
if (newIdentifier == null) {
return null;
}
// SELECT -> INSERT can fail due to races; callers must be prepared to retry.
// Avoiding ON DUPLICATE KEY since it's not portable.
// Avoiding try/catch since it may cause inadvertent transaction-splitting.
// UNIQUE key for the row, ensuring sequences do not fork in two directions.
// Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines
// have difficulty with large unique keys (see https://github.com/apache/druid/issues/2319)
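    // Hash input layout: utf8(sequenceName) || 0xff || utf8(previousSegmentId). The 0xff byte can never occur in
    // valid UTF-8, so it unambiguously delimits the two fields before the SHA-1 is hex-encoded below.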
final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode(
Hashing.sha1()
.newHasher()
.putBytes(StringUtils.toUtf8(sequenceName))
.putByte((byte) 0xff)
.putBytes(StringUtils.toUtf8(previousSegmentIdNotNull))
.hash()
.asBytes()
);
insertToMetastore(
handle,
newIdentifier,
dataSource,
interval,
previousSegmentIdNotNull,
sequenceName,
sequenceNamePrevIdSha1
);
return newIdentifier;
}
@Nullable
private SegmentIdWithShardSpec allocatePendingSegment(
final Handle handle,
final String dataSource,
final String sequenceName,
final Interval interval,
final PartialShardSpec partialShardSpec,
final String maxVersion
) throws IOException
{
final CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId(
handle.createQuery(
StringUtils.format(
"SELECT payload FROM %s WHERE "
+ "dataSource = :dataSource AND "
+ "sequence_name = :sequence_name AND "
+ "start = :start AND "
+ "%2$send%2$s = :end",
dbTables.getPendingSegmentsTable(),
connector.getQuoteString()
)
),
interval,
sequenceName,
null,
Pair.of("dataSource", dataSource),
Pair.of("sequence_name", sequenceName),
Pair.of("start", interval.getStart().toString()),
Pair.of("end", interval.getEnd().toString())
);
if (result.found) {
      // The existing segment identifier can be null if its interval doesn't match the given interval.
return result.segmentIdentifier;
}
final SegmentIdWithShardSpec newIdentifier = createNewSegment(
handle,
dataSource,
interval,
partialShardSpec,
maxVersion
);
if (newIdentifier == null) {
return null;
}
// SELECT -> INSERT can fail due to races; callers must be prepared to retry.
// Avoiding ON DUPLICATE KEY since it's not portable.
// Avoiding try/catch since it may cause inadvertent transaction-splitting.
// UNIQUE key for the row, ensuring we don't have more than one segment per sequence per interval.
// Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines
// have difficulty with large unique keys (see https://github.com/apache/druid/issues/2319)
final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode(
Hashing.sha1()
.newHasher()
.putBytes(StringUtils.toUtf8(sequenceName))
.putByte((byte) 0xff)
.putLong(interval.getStartMillis())
.putLong(interval.getEndMillis())
.hash()
.asBytes()
);
// always insert empty previous sequence id
insertToMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1);
log.info("Allocated pending segment [%s] for sequence[%s] in DB", newIdentifier, sequenceName);
return newIdentifier;
}
private CheckExistingSegmentIdResult checkAndGetExistingSegmentId(
final Query<Map<String, Object>> query,
final Interval interval,
final String sequenceName,
final @Nullable String previousSegmentId,
final Pair<String, String>... queryVars
) throws IOException
{
Query<Map<String, Object>> boundQuery = query;
for (Pair<String, String> var : queryVars) {
boundQuery = boundQuery.bind(var.lhs, var.rhs);
}
final List<byte[]> existingBytes = boundQuery.map(ByteArrayMapper.FIRST).list();
if (!existingBytes.isEmpty()) {
final SegmentIdWithShardSpec existingIdentifier = jsonMapper.readValue(
Iterables.getOnlyElement(existingBytes),
SegmentIdWithShardSpec.class
);
if (existingIdentifier.getInterval().getStartMillis() == interval.getStartMillis()
&& existingIdentifier.getInterval().getEndMillis() == interval.getEndMillis()) {
if (previousSegmentId == null) {
log.info("Found existing pending segment [%s] for sequence[%s] in DB", existingIdentifier, sequenceName);
} else {
log.info(
"Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB",
existingIdentifier,
sequenceName,
previousSegmentId
);
}
return new CheckExistingSegmentIdResult(true, existingIdentifier);
} else {
if (previousSegmentId == null) {
log.warn(
"Cannot use existing pending segment [%s] for sequence[%s] in DB, "
+ "does not match requested interval[%s]",
existingIdentifier,
sequenceName,
interval
);
} else {
log.warn(
"Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, "
+ "does not match requested interval[%s]",
existingIdentifier,
sequenceName,
previousSegmentId,
interval
);
}
return new CheckExistingSegmentIdResult(true, null);
}
}
return new CheckExistingSegmentIdResult(false, null);
}
private static class CheckExistingSegmentIdResult
{
private final boolean found;
@Nullable
private final SegmentIdWithShardSpec segmentIdentifier;
CheckExistingSegmentIdResult(boolean found, @Nullable SegmentIdWithShardSpec segmentIdentifier)
{
this.found = found;
this.segmentIdentifier = segmentIdentifier;
}
}
private void insertToMetastore(
Handle handle,
SegmentIdWithShardSpec newIdentifier,
String dataSource,
Interval interval,
String previousSegmentId,
String sequenceName,
String sequenceNamePrevIdSha1
) throws JsonProcessingException
{
handle.createStatement(
StringUtils.format(
"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, "
+ "sequence_name_prev_id_sha1, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, "
+ ":sequence_name_prev_id_sha1, :payload)",
dbTables.getPendingSegmentsTable(),
connector.getQuoteString()
)
)
.bind("id", newIdentifier.toString())
.bind("dataSource", dataSource)
.bind("created_date", DateTimes.nowUtc().toString())
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.bind("sequence_name", sequenceName)
.bind("sequence_prev_id", previousSegmentId)
.bind("sequence_name_prev_id_sha1", sequenceNamePrevIdSha1)
.bind("payload", jsonMapper.writeValueAsBytes(newIdentifier))
.execute();
}
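  /**
   * Computes the id of the next segment to allocate in the given interval, considering both committed used
   * segments and pending segments. Returns null when allocation is impossible (more than one existing chunk, a
   * version conflict, or an unknown core partition count).
   *
   * <p>Illustrative example (the numbers are assumptions, not taken from any real datasource): if the interval
   * already holds used segments with partition numbers 0..2 at version V plus a pending segment with partition
   * number 3 at the same version, the allocated id receives partition number 4 and version V.
   */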
@Nullable
private SegmentIdWithShardSpec createNewSegment(
final Handle handle,
final String dataSource,
final Interval interval,
final PartialShardSpec partialShardSpec,
final String maxVersion
) throws IOException
{
final List<TimelineObjectHolder<String, DataSegment>> existingChunks = getTimelineForIntervalsWithHandle(
handle,
dataSource,
ImmutableList.of(interval)
).lookup(interval);
if (existingChunks.size() > 1) {
// Not possible to expand more than one chunk with a single segment.
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s]: already have [%,d] chunks.",
dataSource,
interval,
existingChunks.size()
);
return null;
} else {
      // Find the max partitionId among the shardSpecs which share the same partition space.
SegmentIdWithShardSpec maxId = null;
if (!existingChunks.isEmpty()) {
TimelineObjectHolder<String, DataSegment> existingHolder = Iterables.getOnlyElement(existingChunks);
//noinspection ConstantConditions
for (DataSegment segment : FluentIterable
.from(existingHolder.getObject())
.transform(PartitionChunk::getObject)
            // Here we check only segments whose shardSpec shares the same partition space as the given
            // partialShardSpec. Note that OverwriteShardSpec doesn't share its partition space with others.
            // See PartitionIds.
.filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
// Don't use the stream API for performance.
if (maxId == null || maxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) {
maxId = SegmentIdWithShardSpec.fromDataSegment(segment);
}
}
}
final List<SegmentIdWithShardSpec> pendings = getPendingSegmentsForIntervalWithHandle(
handle,
dataSource,
interval
);
if (maxId != null) {
pendings.add(maxId);
}
maxId = pendings.stream()
.filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec))
.max((id1, id2) -> {
final int versionCompare = id1.getVersion().compareTo(id2.getVersion());
if (versionCompare != 0) {
return versionCompare;
} else {
return Integer.compare(id1.getShardSpec().getPartitionNum(), id2.getShardSpec().getPartitionNum());
}
})
.orElse(null);
// Find the major version of existing segments
@Nullable final String versionOfExistingChunks;
if (!existingChunks.isEmpty()) {
versionOfExistingChunks = existingChunks.get(0).getVersion();
} else if (!pendings.isEmpty()) {
versionOfExistingChunks = pendings.get(0).getVersion();
} else {
versionOfExistingChunks = null;
}
if (maxId == null) {
        // This code is executed when the Overlord coordinates segment allocation, which happens when segments are
        // appended or segment lock is used. When appending segments, a null maxId means we are allocating the
        // very first segment for this time chunk. Since the core partition set is not yet determined for appended
        // segments, we set it to 0. Segment lock does not work with the core partition set either, so we likewise
        // set it to 0 and let the OvershadowableManager handle the atomic segment update.
final int newPartitionId = partialShardSpec.useNonRootGenerationPartitionSpace()
? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID
: PartitionIds.ROOT_GEN_START_PARTITION_ID;
String version = versionOfExistingChunks == null ? maxVersion : versionOfExistingChunks;
return new SegmentIdWithShardSpec(
dataSource,
interval,
version,
partialShardSpec.complete(jsonMapper, newPartitionId, 0)
);
} else if (!maxId.getInterval().equals(interval) || maxId.getVersion().compareTo(maxVersion) > 0) {
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: conflicting segment[%s].",
dataSource,
interval,
maxVersion,
maxId
);
return null;
} else if (maxId.getShardSpec().getNumCorePartitions() == SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) {
log.warn(
"Cannot allocate new segment because of unknown core partition size of segment[%s], shardSpec[%s]",
maxId,
maxId.getShardSpec()
);
return null;
} else {
return new SegmentIdWithShardSpec(
dataSource,
maxId.getInterval(),
Preconditions.checkNotNull(versionOfExistingChunks, "versionOfExistingChunks"),
partialShardSpec.complete(
jsonMapper,
maxId.getShardSpec().getPartitionNum() + 1,
maxId.getShardSpec().getNumCorePartitions()
)
);
}
}
}
@Override
public int deletePendingSegmentsCreatedInInterval(String dataSource, Interval deleteInterval)
{
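    // created_date values are written as ISO-8601 UTC strings (DateTimes.nowUtc().toString()), so the
    // lexicographic comparisons below match chronological order, assuming the interval endpoints are UTC too.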
return connector.getDBI().inTransaction(
(handle, status) -> handle
.createStatement(
StringUtils.format(
"DELETE FROM %s WHERE datasource = :dataSource AND created_date >= :start AND created_date < :end",
dbTables.getPendingSegmentsTable()
)
)
.bind("dataSource", dataSource)
.bind("start", deleteInterval.getStart().toString())
.bind("end", deleteInterval.getEnd().toString())
.execute()
);
}
@Override
public int deletePendingSegments(String dataSource)
{
return connector.getDBI().inTransaction(
(handle, status) -> handle
.createStatement(
StringUtils.format("DELETE FROM %s WHERE datasource = :dataSource", dbTables.getPendingSegmentsTable())
)
.bind("dataSource", dataSource)
.execute()
);
}
/**
   * Attempts to insert a single segment into the database. If the segment already exists, does nothing. The
   * existence check is imperfect, however, so callers must be prepared to retry their entire transaction on
   * exceptions.
*
* @return true if the segment was added, false if it already existed
*/
private boolean announceHistoricalSegment(
final Handle handle,
final DataSegment segment,
final boolean used
) throws IOException
{
try {
if (segmentExists(handle, segment)) {
log.info("Found [%s] in DB, not updating DB", segment.getId());
return false;
}
// SELECT -> INSERT can fail due to races; callers must be prepared to retry.
// Avoiding ON DUPLICATE KEY since it's not portable.
// Avoiding try/catch since it may cause inadvertent transaction-splitting.
final int numRowsInserted = handle.createStatement(
StringUtils.format(
"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, "
+ "payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
dbTables.getSegmentsTable(),
connector.getQuoteString()
)
)
.bind("id", segment.getId().toString())
.bind("dataSource", segment.getDataSource())
.bind("created_date", DateTimes.nowUtc().toString())
.bind("start", segment.getInterval().getStart().toString())
.bind("end", segment.getInterval().getEnd().toString())
.bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true)
.bind("version", segment.getVersion())
.bind("used", used)
.bind("payload", jsonMapper.writeValueAsBytes(segment))
.execute();
if (numRowsInserted == 1) {
log.info(
"Published segment [%s] to DB with used flag [%s], json[%s]",
segment.getId(),
used,
jsonMapper.writeValueAsString(segment)
);
} else if (numRowsInserted == 0) {
throw new ISE(
"Failed to publish segment[%s] to DB with used flag[%s], json[%s]",
segment.getId(),
used,
jsonMapper.writeValueAsString(segment)
);
} else {
throw new ISE(
"numRowsInserted[%s] is larger than 1 after inserting segment[%s] with used flag[%s], json[%s]",
numRowsInserted,
segment.getId(),
used,
jsonMapper.writeValueAsString(segment)
);
}
}
catch (Exception e) {
log.error(e, "Exception inserting segment [%s] with used flag [%s] into DB", segment.getId(), used);
throw e;
}
return true;
}
private boolean segmentExists(final Handle handle, final DataSegment segment)
{
return !handle
.createQuery(StringUtils.format("SELECT id FROM %s WHERE id = :identifier", dbTables.getSegmentsTable()))
.bind("identifier", segment.getId().toString())
.map(StringMapper.FIRST)
.list()
.isEmpty();
}
/**
* Read dataSource metadata. Returns null if there is no metadata.
*/
@Override
public @Nullable DataSourceMetadata retrieveDataSourceMetadata(final String dataSource)
{
final byte[] bytes = connector.lookup(
dbTables.getDataSourceTable(),
"dataSource",
"commit_metadata_payload",
dataSource
);
if (bytes == null) {
return null;
}
return JacksonUtils.readValue(jsonMapper, bytes, DataSourceMetadata.class);
}
/**
* Read dataSource metadata as bytes, from a specific handle. Returns null if there is no metadata.
*/
private @Nullable byte[] retrieveDataSourceMetadataWithHandleAsBytes(
final Handle handle,
final String dataSource
)
{
return connector.lookupWithHandle(
handle,
dbTables.getDataSourceTable(),
"dataSource",
"commit_metadata_payload",
dataSource
);
}
/**
   * Compare-and-swap dataSource metadata in a transaction. This will only modify dataSource metadata if its
   * current state matches the given startMetadata (per {@link DataSourceMetadata#matches(DataSourceMetadata)})
   * when this method is called. The method is idempotent in that if the stored metadata already equals the
   * merged newCommitMetadata, it will return {@code SUCCESS}.
*
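   * <p>Flow sketch: read the current payload and compute its SHA-1; verify startMetadata matches it; then either
   * INSERT a new row (when no prior metadata exists) or issue a conditional
   * {@code UPDATE ... WHERE commit_metadata_sha1 = :old_commit_metadata_sha1}, treating zero updated rows as a
   * lost race (TRY_AGAIN).
   *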
* @param handle database handle
* @param dataSource druid dataSource
* @param startMetadata dataSource metadata pre-insert must match this startMetadata according to
* {@link DataSourceMetadata#matches(DataSourceMetadata)}
* @param endMetadata dataSource metadata post-insert will have this endMetadata merged in with
* {@link DataSourceMetadata#plus(DataSourceMetadata)}
*
* @return SUCCESS if dataSource metadata was updated from matching startMetadata to matching endMetadata, FAILURE or
* TRY_AGAIN if it definitely was not updated. This guarantee is meant to help
* {@link #announceHistoricalSegments(Set, DataSourceMetadata, DataSourceMetadata)}
* achieve its own guarantee.
*
* @throws RuntimeException if state is unknown after this call
*/
protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(
final Handle handle,
final String dataSource,
final DataSourceMetadata startMetadata,
final DataSourceMetadata endMetadata
) throws IOException
{
Preconditions.checkNotNull(dataSource, "dataSource");
Preconditions.checkNotNull(startMetadata, "startMetadata");
Preconditions.checkNotNull(endMetadata, "endMetadata");
final byte[] oldCommitMetadataBytesFromDb = retrieveDataSourceMetadataWithHandleAsBytes(handle, dataSource);
final String oldCommitMetadataSha1FromDb;
final DataSourceMetadata oldCommitMetadataFromDb;
if (oldCommitMetadataBytesFromDb == null) {
oldCommitMetadataSha1FromDb = null;
oldCommitMetadataFromDb = null;
} else {
oldCommitMetadataSha1FromDb = BaseEncoding.base16().encode(
Hashing.sha1().hashBytes(oldCommitMetadataBytesFromDb).asBytes()
);
oldCommitMetadataFromDb = jsonMapper.readValue(oldCommitMetadataBytesFromDb, DataSourceMetadata.class);
}
final boolean startMetadataMatchesExisting;
if (oldCommitMetadataFromDb == null) {
startMetadataMatchesExisting = startMetadata.isValidStart();
} else {
      // Check against the last committed metadata, converting it into start metadata first since only metadata
      // of the same type can be matched. Even though the kafka/kinesis indexing services use different
      // sequenceNumber types for start and end, this conversion is fine because the new start sequenceNumbers
      // are supposed to be the same as the end sequenceNumbers of the last commit.
startMetadataMatchesExisting = startMetadata.asStartMetadata().matches(oldCommitMetadataFromDb.asStartMetadata());
}
if (!startMetadataMatchesExisting) {
// Not in the desired start state.
log.error(
"Not updating metadata, existing state[%s] in metadata store doesn't match to the new start state[%s].",
oldCommitMetadataFromDb,
startMetadata
);
return DataSourceMetadataUpdateResult.FAILURE;
}
// Only endOffsets should be stored in metadata store
final DataSourceMetadata newCommitMetadata = oldCommitMetadataFromDb == null
? endMetadata
: oldCommitMetadataFromDb.plus(endMetadata);
final byte[] newCommitMetadataBytes = jsonMapper.writeValueAsBytes(newCommitMetadata);
final String newCommitMetadataSha1 = BaseEncoding.base16().encode(
Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes()
);
final DataSourceMetadataUpdateResult retVal;
if (oldCommitMetadataBytesFromDb == null) {
// SELECT -> INSERT can fail due to races; callers must be prepared to retry.
final int numRows = handle.createStatement(
StringUtils.format(
"INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) "
+ "VALUES (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)",
dbTables.getDataSourceTable()
)
)
.bind("dataSource", dataSource)
.bind("created_date", DateTimes.nowUtc().toString())
.bind("commit_metadata_payload", newCommitMetadataBytes)
.bind("commit_metadata_sha1", newCommitMetadataSha1)
.execute();
retVal = numRows == 1 ? DataSourceMetadataUpdateResult.SUCCESS : DataSourceMetadataUpdateResult.TRY_AGAIN;
} else {
// Expecting a particular old metadata; use the SHA1 in a compare-and-swap UPDATE
final int numRows = handle.createStatement(
StringUtils.format(
"UPDATE %s SET "
+ "commit_metadata_payload = :new_commit_metadata_payload, "
+ "commit_metadata_sha1 = :new_commit_metadata_sha1 "
+ "WHERE dataSource = :dataSource AND commit_metadata_sha1 = :old_commit_metadata_sha1",
dbTables.getDataSourceTable()
)
)
.bind("dataSource", dataSource)
.bind("old_commit_metadata_sha1", oldCommitMetadataSha1FromDb)
.bind("new_commit_metadata_payload", newCommitMetadataBytes)
.bind("new_commit_metadata_sha1", newCommitMetadataSha1)
.execute();
retVal = numRows == 1 ? DataSourceMetadataUpdateResult.SUCCESS : DataSourceMetadataUpdateResult.TRY_AGAIN;
}
if (retVal == DataSourceMetadataUpdateResult.SUCCESS) {
log.info("Updated metadata from[%s] to[%s].", oldCommitMetadataFromDb, newCommitMetadata);
} else {
log.info("Not updating metadata, compare-and-swap failure.");
}
return retVal;
}
@Override
public boolean deleteDataSourceMetadata(final String dataSource)
{
return connector.retryWithHandle(
new HandleCallback<Boolean>()
{
@Override
public Boolean withHandle(Handle handle)
{
int rows = handle.createStatement(
StringUtils.format("DELETE from %s WHERE dataSource = :dataSource", dbTables.getDataSourceTable())
)
.bind("dataSource", dataSource)
.execute();
return rows > 0;
}
}
);
}
@Override
public boolean resetDataSourceMetadata(final String dataSource, final DataSourceMetadata dataSourceMetadata)
throws IOException
{
final byte[] newCommitMetadataBytes = jsonMapper.writeValueAsBytes(dataSourceMetadata);
final String newCommitMetadataSha1 = BaseEncoding.base16().encode(
Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes()
);
return connector.retryWithHandle(
new HandleCallback<Boolean>()
{
@Override
public Boolean withHandle(Handle handle)
{
final int numRows = handle.createStatement(
StringUtils.format(
"UPDATE %s SET "
+ "commit_metadata_payload = :new_commit_metadata_payload, "
+ "commit_metadata_sha1 = :new_commit_metadata_sha1 "
+ "WHERE dataSource = :dataSource",
dbTables.getDataSourceTable()
)
)
.bind("dataSource", dataSource)
.bind("new_commit_metadata_payload", newCommitMetadataBytes)
.bind("new_commit_metadata_sha1", newCommitMetadataSha1)
.execute();
return numRows == 1;
}
}
);
}
@Override
public void updateSegmentMetadata(final Set<DataSegment> segments)
{
connector.getDBI().inTransaction(
new TransactionCallback<Void>()
{
@Override
public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
{
for (final DataSegment segment : segments) {
updatePayload(handle, segment);
}
return null;
}
}
);
}
@Override
public void deleteSegments(final Set<DataSegment> segments)
{
connector.getDBI().inTransaction(
new TransactionCallback<Void>()
{
@Override
public Void inTransaction(Handle handle, TransactionStatus transactionStatus)
{
for (final DataSegment segment : segments) {
deleteSegment(handle, segment);
}
return null;
}
}
);
}
private void deleteSegment(final Handle handle, final DataSegment segment)
{
handle.createStatement(StringUtils.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable()))
.bind("id", segment.getId().toString())
.execute();
}
private void updatePayload(final Handle handle, final DataSegment segment) throws IOException
{
try {
handle
.createStatement(
StringUtils.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable())
)
.bind("id", segment.getId().toString())
.bind("payload", jsonMapper.writeValueAsBytes(segment))
.execute();
}
catch (IOException e) {
log.error(e, "Exception inserting into DB");
throw e;
}
}
@Override
public boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata metadata)
{
return 1 == connector.getDBI().inTransaction(
(handle, status) -> handle
.createStatement(
StringUtils.format(
"INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) VALUES" +
" (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)",
dbTables.getDataSourceTable()
)
)
.bind("dataSource", dataSource)
.bind("created_date", DateTimes.nowUtc().toString())
.bind("commit_metadata_payload", jsonMapper.writeValueAsBytes(metadata))
.bind("commit_metadata_sha1", BaseEncoding.base16().encode(
Hashing.sha1().hashBytes(jsonMapper.writeValueAsBytes(metadata)).asBytes()))
.execute()
);
}
}