blob: fc990e107dd3aeb1a9d093d74a9d2493e80958c0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.UnmodifiableIterator;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.server.http.DataSegmentPlus;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.PreparedBatch;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.SQLStatement;
import org.skife.jdbi.v2.Update;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* An object that helps {@link SqlSegmentsMetadataManager} and {@link IndexerSQLMetadataStorageCoordinator} make
* queries to the metadata store segments table. Each instance of this class is scoped to a single handle and is meant
* to be short-lived.
*/
public class SqlSegmentsMetadataQuery
{
private static final Logger log = new Logger(SqlSegmentsMetadataQuery.class);
/**
* Maximum number of intervals to consider for a batch.
* This is similar to {@link IndexerSQLMetadataStorageCoordinator#MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE}, but imposed
* on the intervals size.
*/
private static final int MAX_INTERVALS_PER_BATCH = 100;
private final Handle handle;
private final SQLMetadataConnector connector;
private final MetadataStorageTablesConfig dbTables;
private final ObjectMapper jsonMapper;
private SqlSegmentsMetadataQuery(
final Handle handle,
final SQLMetadataConnector connector,
final MetadataStorageTablesConfig dbTables,
final ObjectMapper jsonMapper
)
{
this.handle = handle;
this.connector = connector;
this.dbTables = dbTables;
this.jsonMapper = jsonMapper;
}
/**
* Create a query object. This instance is scoped to a single handle and is meant to be short-lived. It is okay
* to use it for more than one query, though.
*/
public static SqlSegmentsMetadataQuery forHandle(
final Handle handle,
final SQLMetadataConnector connector,
final MetadataStorageTablesConfig dbTables,
final ObjectMapper jsonMapper
)
{
return new SqlSegmentsMetadataQuery(handle, connector, dbTables, jsonMapper);
}
/**
* Retrieves segments for a given datasource that are marked used (i.e. published) in the metadata store, and that
* *overlap* any interval in a particular collection of intervals. If the collection of intervals is empty, this
* method will retrieve all used segments.
*
* You cannot assume that segments returned by this call are actually active. Because there is some delay between
* new segment publishing and the marking-unused of older segments, it is possible that some segments returned
* by this call are overshadowed by other segments. To check for this, use
* {@link org.apache.druid.timeline.SegmentTimeline#forSegments(Iterable)}.
*
* This call does not return any information about realtime segments.
*
* @return a closeable iterator. You should close it when you are done.
*/
public CloseableIterator<DataSegment> retrieveUsedSegments(
final String dataSource,
final Collection<Interval> intervals
)
{
return retrieveUsedSegments(dataSource, intervals, null);
}
/**
* Similar to {@link #retrieveUsedSegments}, but with an additional {@code versions} argument. When {@code versions}
* is specified, all used segments in the specified {@code intervals} and {@code versions} are retrieved.
*/
public CloseableIterator<DataSegment> retrieveUsedSegments(
final String dataSource,
final Collection<Interval> intervals,
final List<String> versions
)
{
return retrieveSegments(
dataSource,
intervals,
versions,
IntervalMode.OVERLAPS,
true,
null,
null,
null,
null
);
}
/**
* Retrieves segments for a given datasource that are marked unused and that are <b>fully contained by</b> any interval
* in a particular collection of intervals. If the collection of intervals is empty, this method will retrieve all
* unused segments.
* <p>
* This call does not return any information about realtime segments.
* </p>
*
* @param dataSource The name of the datasource
* @param intervals The intervals to search over
* @param versions An optional list of unused segment versions to retrieve in the given {@code intervals}.
* If unspecified, all versions of unused segments in the {@code intervals} must be retrieved. If an
* empty list is passed, no segments are retrieved.
* @param limit The limit of segments to return
* @param lastSegmentId the last segment id from which to search for results. All segments returned are >
* this segment lexigraphically if sortOrder is null or ASC, or < this segment
* lexigraphically if sortOrder is DESC.
* @param sortOrder Specifies the order with which to return the matching segments by start time, end time.
* A null value indicates that order does not matter.
* @param maxUsedStatusLastUpdatedTime The maximum {@code used_status_last_updated} time. Any unused segment in {@code intervals}
* with {@code used_status_last_updated} no later than this time will be included in the
* iterator. Segments without {@code used_status_last_updated} time (due to an upgrade
* from legacy Druid) will have {@code maxUsedStatusLastUpdatedTime} ignored
*
* @return a closeable iterator. You should close it when you are done.
*
*/
public CloseableIterator<DataSegment> retrieveUnusedSegments(
final String dataSource,
final Collection<Interval> intervals,
@Nullable final List<String> versions,
@Nullable final Integer limit,
@Nullable final String lastSegmentId,
@Nullable final SortOrder sortOrder,
@Nullable final DateTime maxUsedStatusLastUpdatedTime
)
{
return retrieveSegments(
dataSource,
intervals,
versions,
IntervalMode.CONTAINS,
false,
limit,
lastSegmentId,
sortOrder,
maxUsedStatusLastUpdatedTime
);
}
/**
* Similar to {@link #retrieveUnusedSegments}, but also retrieves associated metadata for the segments for a given
* datasource that are marked unused and that are <b>fully contained by</b> any interval in a particular collection of
* intervals. If the collection of intervals is empty, this method will retrieve all unused segments.
*
* This call does not return any information about realtime segments.
*
* @param dataSource The name of the datasource
* @param intervals The intervals to search over
* @param limit The limit of segments to return
* @param lastSegmentId the last segment id from which to search for results. All segments returned are >
* this segment lexigraphically if sortOrder is null or ASC, or < this segment
* lexigraphically if sortOrder is DESC.
* @param sortOrder Specifies the order with which to return the matching segments by start time, end time.
* A null value indicates that order does not matter.
* @param maxUsedStatusLastUpdatedTime The maximum {@code used_status_last_updated} time. Any unused segment in {@code intervals}
* with {@code used_status_last_updated} no later than this time will be included in the
* iterator. Segments without {@code used_status_last_updated} time (due to an upgrade
* from legacy Druid) will have {@code maxUsedStatusLastUpdatedTime} ignored
* @return a closeable iterator. You should close it when you are done.
*/
public CloseableIterator<DataSegmentPlus> retrieveUnusedSegmentsPlus(
final String dataSource,
final Collection<Interval> intervals,
@Nullable final List<String> versions,
@Nullable final Integer limit,
@Nullable final String lastSegmentId,
@Nullable final SortOrder sortOrder,
@Nullable final DateTime maxUsedStatusLastUpdatedTime
)
{
return retrieveSegmentsPlus(
dataSource,
intervals,
versions,
IntervalMode.CONTAINS,
false,
limit,
lastSegmentId,
sortOrder,
maxUsedStatusLastUpdatedTime
);
}
public List<DataSegmentPlus> retrieveSegmentsById(
String datasource,
Set<String> segmentIds
)
{
final List<List<String>> partitionedSegmentIds
= Lists.partition(new ArrayList<>(segmentIds), 100);
final List<DataSegmentPlus> fetchedSegments = new ArrayList<>(segmentIds.size());
for (List<String> partition : partitionedSegmentIds) {
fetchedSegments.addAll(retrieveSegmentBatchById(datasource, partition, false));
}
return fetchedSegments;
}
public List<DataSegmentPlus> retrieveSegmentsWithSchemaById(
String datasource,
Set<String> segmentIds
)
{
final List<List<String>> partitionedSegmentIds
= Lists.partition(new ArrayList<>(segmentIds), 100);
final List<DataSegmentPlus> fetchedSegments = new ArrayList<>(segmentIds.size());
for (List<String> partition : partitionedSegmentIds) {
fetchedSegments.addAll(retrieveSegmentBatchById(datasource, partition, true));
}
return fetchedSegments;
}
private List<DataSegmentPlus> retrieveSegmentBatchById(
String datasource,
List<String> segmentIds,
boolean includeSchemaInfo
)
{
if (segmentIds.isEmpty()) {
return Collections.emptyList();
}
ResultIterator<DataSegmentPlus> resultIterator;
if (includeSchemaInfo) {
final Query<Map<String, Object>> query = handle.createQuery(
StringUtils.format(
"SELECT payload, used, schema_fingerprint, num_rows FROM %s WHERE dataSource = :dataSource %s",
dbTables.getSegmentsTable(), getParameterizedInConditionForColumn("id", segmentIds)
)
);
bindColumnValuesToQueryWithInCondition("id", segmentIds, query);
resultIterator = query
.bind("dataSource", datasource)
.setFetchSize(connector.getStreamingFetchSize())
.map(
(index, r, ctx) -> {
String schemaFingerprint = (String) r.getObject(3);
Long numRows = (Long) r.getObject(4);
return new DataSegmentPlus(
JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class),
null,
null,
r.getBoolean(2),
schemaFingerprint,
numRows
);
}
)
.iterator();
} else {
final Query<Map<String, Object>> query = handle.createQuery(
StringUtils.format(
"SELECT payload, used FROM %s WHERE dataSource = :dataSource %s",
dbTables.getSegmentsTable(), getParameterizedInConditionForColumn("id", segmentIds)
)
);
bindColumnValuesToQueryWithInCondition("id", segmentIds, query);
resultIterator = query
.bind("dataSource", datasource)
.setFetchSize(connector.getStreamingFetchSize())
.map(
(index, r, ctx) -> new DataSegmentPlus(
JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class),
null,
null,
r.getBoolean(2),
null,
null
)
)
.iterator();
}
return Lists.newArrayList(resultIterator);
}
/**
* Marks the provided segments as either used or unused.
*
* For better performance, please try to
* 1) ensure that the caller passes only used segments to this method when marking them as unused.
* 2) Similarly, please try to call this method only on unused segments when marking segments as used with this method.
*
* @return the number of segments actually modified.
*/
public int markSegments(final Collection<SegmentId> segmentIds, final boolean used)
{
final String dataSource;
if (segmentIds.isEmpty()) {
return 0;
} else {
dataSource = segmentIds.iterator().next().getDataSource();
if (segmentIds.stream().anyMatch(segment -> !dataSource.equals(segment.getDataSource()))) {
throw new IAE("Segments to drop must all be part of the same datasource");
}
}
final PreparedBatch batch =
handle.prepareBatch(
StringUtils.format(
"UPDATE %s SET used = ?, used_status_last_updated = ? WHERE datasource = ? AND id = ?",
dbTables.getSegmentsTable()
)
);
for (SegmentId segmentId : segmentIds) {
batch.add(used, DateTimes.nowUtc().toString(), dataSource, segmentId.toString());
}
final int[] segmentChanges = batch.execute();
return computeNumChangedSegments(
segmentIds.stream().map(SegmentId::toString).collect(Collectors.toList()),
segmentChanges
);
}
/**
* Marks all used segments that are <b>fully contained by</b> a particular interval as unused.
*
* @return Number of segments updated.
*/
public int markSegmentsUnused(final String dataSource, final Interval interval)
{
return markSegmentsUnused(dataSource, interval, null);
}
/**
* Marks all used segments that are <b>fully contained by</b> a particular interval filtered by an optional list of versions
* as unused.
*
* @return Number of segments updated.
*/
public int markSegmentsUnused(final String dataSource, final Interval interval, @Nullable final List<String> versions)
{
if (versions != null && versions.isEmpty()) {
return 0;
}
if (Intervals.isEternity(interval)) {
final StringBuilder sb = new StringBuilder();
sb.append(
StringUtils.format(
"UPDATE %s SET used=:used, used_status_last_updated = :used_status_last_updated "
+ "WHERE dataSource = :dataSource AND used = true",
dbTables.getSegmentsTable()
)
);
if (versions != null) {
sb.append(getParameterizedInConditionForColumn("version", versions));
}
final Update stmt = handle
.createStatement(sb.toString())
.bind("dataSource", dataSource)
.bind("used", false)
.bind("used_status_last_updated", DateTimes.nowUtc().toString());
if (versions != null) {
bindColumnValuesToQueryWithInCondition("version", versions, stmt);
}
return stmt.execute();
} else if (Intervals.canCompareEndpointsAsStrings(interval)
&& interval.getStart().getYear() == interval.getEnd().getYear()) {
// Safe to write a WHERE clause with this interval. Note that it is unsafe if the years are different, because
// that means extra characters can sneak in. (Consider a query interval like "2000-01-01/2001-01-01" and a
// segment interval like "20001/20002".)
final StringBuilder sb = new StringBuilder();
sb.append(
StringUtils.format(
"UPDATE %s SET used=:used, used_status_last_updated = :used_status_last_updated "
+ "WHERE dataSource = :dataSource AND used = true AND %s",
dbTables.getSegmentsTable(),
IntervalMode.CONTAINS.makeSqlCondition(connector.getQuoteString(), ":start", ":end")
)
);
if (versions != null) {
sb.append(getParameterizedInConditionForColumn("version", versions));
}
final Update stmt = handle
.createStatement(sb.toString())
.bind("dataSource", dataSource)
.bind("used", false)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.bind("used_status_last_updated", DateTimes.nowUtc().toString());
if (versions != null) {
bindColumnValuesToQueryWithInCondition("version", versions, stmt);
}
return stmt.execute();
} else {
// Retrieve, then drop, since we can't write a WHERE clause directly.
final List<SegmentId> segments = ImmutableList.copyOf(
Iterators.transform(
retrieveSegments(
dataSource,
Collections.singletonList(interval),
versions,
IntervalMode.CONTAINS,
true,
null,
null,
null,
null
),
DataSegment::getId
)
);
return markSegments(segments, false);
}
}
/**
* Retrieve the used segment for a given id if it exists in the metadata store and null otherwise
*/
public DataSegment retrieveUsedSegmentForId(String id)
{
final String query = "SELECT payload FROM %s WHERE used = true AND id = :id";
final Query<Map<String, Object>> sql = handle
.createQuery(StringUtils.format(query, dbTables.getSegmentsTable()))
.bind("id", id);
final ResultIterator<DataSegment> resultIterator =
sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class))
.iterator();
if (resultIterator.hasNext()) {
return resultIterator.next();
}
return null;
}
/**
* Retrieve the segment for a given id if it exists in the metadata store and null otherwise
*/
public DataSegment retrieveSegmentForId(String id)
{
final String query = "SELECT payload FROM %s WHERE id = :id";
final Query<Map<String, Object>> sql = handle
.createQuery(StringUtils.format(query, dbTables.getSegmentsTable()))
.bind("id", id);
final ResultIterator<DataSegment> resultIterator =
sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class))
.iterator();
if (resultIterator.hasNext()) {
return resultIterator.next();
}
return null;
}
/**
* Get the condition for the interval and match mode.
* @param intervals - intervals to fetch the segments for
* @param matchMode - Interval match mode - overlaps or contains
* @param quoteString - the connector-specific quote string
*/
public static String getConditionForIntervalsAndMatchMode(
final Collection<Interval> intervals,
final IntervalMode matchMode,
final String quoteString
)
{
if (intervals.isEmpty()) {
return "";
}
final StringBuilder sb = new StringBuilder();
sb.append(" AND (");
for (int i = 0; i < intervals.size(); i++) {
sb.append(
matchMode.makeSqlCondition(
quoteString,
StringUtils.format(":start%d", i),
StringUtils.format(":end%d", i)
)
);
// Add a special check for a segment which have one end at eternity and the other at some finite value. Since
// we are using string comparison, a segment with this start or end will not be returned otherwise.
if (matchMode.equals(IntervalMode.OVERLAPS)) {
sb.append(StringUtils.format(
" OR (start = '%s' AND \"end\" != '%s' AND \"end\" > :start%d)",
Intervals.ETERNITY.getStart(), Intervals.ETERNITY.getEnd(), i
));
sb.append(StringUtils.format(
" OR (start != '%s' AND \"end\" = '%s' AND start < :end%d)",
Intervals.ETERNITY.getStart(), Intervals.ETERNITY.getEnd(), i
));
}
if (i != intervals.size() - 1) {
sb.append(" OR ");
}
}
// Add a special check for a single segment with eternity. Since we are using string comparison, a segment with
// this start and end will not be returned otherwise.
// Known Issue: https://github.com/apache/druid/issues/12860
if (matchMode.equals(IntervalMode.OVERLAPS)) {
sb.append(StringUtils.format(
" OR (start = '%s' AND \"end\" = '%s')", Intervals.ETERNITY.getStart(), Intervals.ETERNITY.getEnd()
));
}
sb.append(")");
return sb.toString();
}
/**
* Bind the supplied {@code intervals} to {@code query}.
* @see #getConditionForIntervalsAndMatchMode(Collection, IntervalMode, String)
*/
public static void bindIntervalsToQuery(final Query<Map<String, Object>> query, final Collection<Interval> intervals)
{
if (intervals.isEmpty()) {
return;
}
final Iterator<Interval> iterator = intervals.iterator();
for (int i = 0; iterator.hasNext(); i++) {
Interval interval = iterator.next();
query.bind(StringUtils.format("start%d", i), interval.getStart().toString())
.bind(StringUtils.format("end%d", i), interval.getEnd().toString());
}
}
private CloseableIterator<DataSegment> retrieveSegments(
final String dataSource,
final Collection<Interval> intervals,
@Nullable final List<String> versions,
final IntervalMode matchMode,
final boolean used,
@Nullable final Integer limit,
@Nullable final String lastSegmentId,
@Nullable final SortOrder sortOrder,
@Nullable final DateTime maxUsedStatusLastUpdatedTime
)
{
if (versions != null && versions.isEmpty()) {
return CloseableIterators.withEmptyBaggage(Collections.emptyIterator());
}
if (intervals.isEmpty() || intervals.size() <= MAX_INTERVALS_PER_BATCH) {
return CloseableIterators.withEmptyBaggage(
retrieveSegmentsInIntervalsBatch(dataSource, intervals, versions, matchMode, used, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)
);
} else {
final List<List<Interval>> intervalsLists = Lists.partition(new ArrayList<>(intervals), MAX_INTERVALS_PER_BATCH);
final List<Iterator<DataSegment>> resultingIterators = new ArrayList<>();
Integer limitPerBatch = limit;
for (final List<Interval> intervalList : intervalsLists) {
final UnmodifiableIterator<DataSegment> iterator = retrieveSegmentsInIntervalsBatch(
dataSource,
intervalList,
versions,
matchMode,
used,
limitPerBatch,
lastSegmentId,
sortOrder,
maxUsedStatusLastUpdatedTime
);
if (limitPerBatch != null) {
// If limit is provided, we need to shrink the limit for subsequent batches or circuit break if
// we have reached what was requested for.
final List<DataSegment> dataSegments = ImmutableList.copyOf(iterator);
resultingIterators.add(dataSegments.iterator());
if (dataSegments.size() >= limitPerBatch) {
break;
}
limitPerBatch -= dataSegments.size();
} else {
resultingIterators.add(iterator);
}
}
return CloseableIterators.withEmptyBaggage(Iterators.concat(resultingIterators.iterator()));
}
}
private CloseableIterator<DataSegmentPlus> retrieveSegmentsPlus(
final String dataSource,
final Collection<Interval> intervals,
@Nullable final List<String> versions,
final IntervalMode matchMode,
final boolean used,
@Nullable final Integer limit,
@Nullable final String lastSegmentId,
@Nullable final SortOrder sortOrder,
@Nullable final DateTime maxUsedStatusLastUpdatedTime
)
{
if (intervals.isEmpty() || intervals.size() <= MAX_INTERVALS_PER_BATCH) {
return CloseableIterators.withEmptyBaggage(
retrieveSegmentsPlusInIntervalsBatch(dataSource, intervals, versions, matchMode, used, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)
);
} else {
final List<List<Interval>> intervalsLists = Lists.partition(new ArrayList<>(intervals), MAX_INTERVALS_PER_BATCH);
final List<Iterator<DataSegmentPlus>> resultingIterators = new ArrayList<>();
Integer limitPerBatch = limit;
for (final List<Interval> intervalList : intervalsLists) {
final UnmodifiableIterator<DataSegmentPlus> iterator = retrieveSegmentsPlusInIntervalsBatch(
dataSource,
intervalList,
versions,
matchMode,
used,
limitPerBatch,
lastSegmentId,
sortOrder,
maxUsedStatusLastUpdatedTime
);
if (limitPerBatch != null) {
// If limit is provided, we need to shrink the limit for subsequent batches or circuit break if
// we have reached what was requested for.
final List<DataSegmentPlus> dataSegments = ImmutableList.copyOf(iterator);
resultingIterators.add(dataSegments.iterator());
if (dataSegments.size() >= limitPerBatch) {
break;
}
limitPerBatch -= dataSegments.size();
} else {
resultingIterators.add(iterator);
}
}
return CloseableIterators.withEmptyBaggage(Iterators.concat(resultingIterators.iterator()));
}
}
private UnmodifiableIterator<DataSegment> retrieveSegmentsInIntervalsBatch(
final String dataSource,
final Collection<Interval> intervals,
@Nullable final List<String> versions,
final IntervalMode matchMode,
final boolean used,
@Nullable final Integer limit,
@Nullable final String lastSegmentId,
@Nullable final SortOrder sortOrder,
@Nullable final DateTime maxUsedStatusLastUpdatedTime
)
{
final Query<Map<String, Object>> sql = buildSegmentsTableQuery(
dataSource,
intervals,
versions,
matchMode,
used,
limit,
lastSegmentId,
sortOrder,
maxUsedStatusLastUpdatedTime,
false
);
final ResultIterator<DataSegment> resultIterator = getDataSegmentResultIterator(sql);
return filterDataSegmentIteratorByInterval(resultIterator, intervals, matchMode);
}
private UnmodifiableIterator<DataSegmentPlus> retrieveSegmentsPlusInIntervalsBatch(
final String dataSource,
final Collection<Interval> intervals,
@Nullable final List<String> versions,
final IntervalMode matchMode,
final boolean used,
@Nullable final Integer limit,
@Nullable final String lastSegmentId,
@Nullable final SortOrder sortOrder,
@Nullable final DateTime maxUsedStatusLastUpdatedTime
)
{
final Query<Map<String, Object>> sql = buildSegmentsTableQuery(
dataSource,
intervals,
versions,
matchMode,
used,
limit,
lastSegmentId,
sortOrder,
maxUsedStatusLastUpdatedTime,
true
);
final ResultIterator<DataSegmentPlus> resultIterator = getDataSegmentPlusResultIterator(sql);
return filterDataSegmentPlusIteratorByInterval(resultIterator, intervals, matchMode);
}
private Query<Map<String, Object>> buildSegmentsTableQuery(
final String dataSource,
final Collection<Interval> intervals,
@Nullable final List<String> versions,
final IntervalMode matchMode,
final boolean used,
@Nullable final Integer limit,
@Nullable final String lastSegmentId,
@Nullable final SortOrder sortOrder,
@Nullable final DateTime maxUsedStatusLastUpdatedTime,
final boolean includeExtraInfo
)
{
// Check if the intervals all support comparing as strings. If so, bake them into the SQL.
final boolean compareAsString = intervals.stream().allMatch(Intervals::canCompareEndpointsAsStrings);
final StringBuilder sb = new StringBuilder();
if (includeExtraInfo) {
sb.append("SELECT payload, created_date, used_status_last_updated FROM %s WHERE used = :used AND dataSource = :dataSource");
} else {
sb.append("SELECT payload FROM %s WHERE used = :used AND dataSource = :dataSource");
}
if (compareAsString) {
sb.append(getConditionForIntervalsAndMatchMode(intervals, matchMode, connector.getQuoteString()));
}
if (versions != null) {
sb.append(getParameterizedInConditionForColumn("version", versions));
}
// Add the used_status_last_updated time filter only for unused segments when maxUsedStatusLastUpdatedTime is non-null.
final boolean addMaxUsedLastUpdatedTimeFilter = !used && maxUsedStatusLastUpdatedTime != null;
if (addMaxUsedLastUpdatedTimeFilter) {
sb.append(" AND (used_status_last_updated IS NOT NULL AND used_status_last_updated <= :used_status_last_updated)");
}
if (lastSegmentId != null) {
sb.append(
StringUtils.format(
" AND id %s :id",
(sortOrder == null || sortOrder == SortOrder.ASC)
? ">"
: "<"
)
);
}
if (sortOrder != null) {
sb.append(StringUtils.format(" ORDER BY id %2$s, start %2$s, %1$send%1$s %2$s",
connector.getQuoteString(),
sortOrder.toString()));
}
final Query<Map<String, Object>> sql = handle
.createQuery(StringUtils.format(
sb.toString(),
dbTables.getSegmentsTable()
))
.setFetchSize(connector.getStreamingFetchSize())
.bind("used", used)
.bind("dataSource", dataSource);
if (addMaxUsedLastUpdatedTimeFilter) {
sql.bind("used_status_last_updated", maxUsedStatusLastUpdatedTime.toString());
}
if (lastSegmentId != null) {
sql.bind("id", lastSegmentId);
}
if (limit != null) {
sql.setMaxRows(limit);
}
if (compareAsString) {
bindIntervalsToQuery(sql, intervals);
}
if (versions != null) {
bindColumnValuesToQueryWithInCondition("version", versions, sql);
}
return sql;
}
private ResultIterator<DataSegment> getDataSegmentResultIterator(Query<Map<String, Object>> sql)
{
return sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class))
.iterator();
}
private ResultIterator<DataSegmentPlus> getDataSegmentPlusResultIterator(Query<Map<String, Object>> sql)
{
return sql.map((index, r, ctx) -> new DataSegmentPlus(
JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class),
DateTimes.of(r.getString(2)),
DateTimes.of(r.getString(3)),
null,
null,
null
))
.iterator();
}
private UnmodifiableIterator<DataSegment> filterDataSegmentIteratorByInterval(
ResultIterator<DataSegment> resultIterator,
final Collection<Interval> intervals,
final IntervalMode matchMode
)
{
return Iterators.filter(
resultIterator,
dataSegment -> {
if (intervals.isEmpty()) {
return true;
} else {
// Must re-check that the interval matches, even if comparing as string, because the *segment interval*
// might not be string-comparable. (Consider a query interval like "2000-01-01/3000-01-01" and a
// segment interval like "20010/20011".)
for (Interval interval : intervals) {
if (matchMode.apply(interval, dataSegment.getInterval())) {
return true;
}
}
return false;
}
}
);
}
private UnmodifiableIterator<DataSegmentPlus> filterDataSegmentPlusIteratorByInterval(
ResultIterator<DataSegmentPlus> resultIterator,
final Collection<Interval> intervals,
final IntervalMode matchMode
)
{
return Iterators.filter(
resultIterator,
dataSegment -> {
if (intervals.isEmpty()) {
return true;
} else {
// Must re-check that the interval matches, even if comparing as string, because the *segment interval*
// might not be string-comparable. (Consider a query interval like "2000-01-01/3000-01-01" and a
// segment interval like "20010/20011".)
for (Interval interval : intervals) {
if (matchMode.apply(interval, dataSegment.getDataSegment().getInterval())) {
return true;
}
}
return false;
}
}
);
}
private static int computeNumChangedSegments(List<String> segmentIds, int[] segmentChanges)
{
int numChangedSegments = 0;
for (int i = 0; i < segmentChanges.length; i++) {
int numUpdatedRows = segmentChanges[i];
if (numUpdatedRows < 0) {
log.error(
"ASSERTION_ERROR: Negative number of rows updated for segment id [%s]: %d",
segmentIds.get(i),
numUpdatedRows
);
} else if (numUpdatedRows > 1) {
log.error(
"More than one row updated for segment id [%s]: %d, "
+ "there may be more than one row for the segment id in the database",
segmentIds.get(i),
numUpdatedRows
);
}
if (numUpdatedRows > 0) {
numChangedSegments += 1;
}
}
return numChangedSegments;
}
/**
* @return a parameterized {@code IN} clause for the specified {@code columnName}. The column values need to be bound
* to a query by calling {@link #bindColumnValuesToQueryWithInCondition(String, List, SQLStatement)}.
*
* @implNote JDBI 3.x has better support for binding {@code IN} clauses directly.
*/
private static String getParameterizedInConditionForColumn(final String columnName, final List<String> values)
{
if (values == null) {
return "";
}
final StringBuilder sb = new StringBuilder();
sb.append(StringUtils.format(" AND %s IN (", columnName));
for (int i = 0; i < values.size(); i++) {
sb.append(StringUtils.format(":%s%d", columnName, i));
if (i != values.size() - 1) {
sb.append(",");
}
}
sb.append(")");
return sb.toString();
}
/**
* Binds the provided list of {@code values} to the specified {@code columnName} in the given SQL {@code query} that
* contains an {@code IN} clause.
*
* @see #getParameterizedInConditionForColumn(String, List)
*/
private static void bindColumnValuesToQueryWithInCondition(
final String columnName,
final List<String> values,
final SQLStatement<?> query
)
{
if (values == null) {
return;
}
for (int i = 0; i < values.size(); i++) {
query.bind(StringUtils.format("%s%d", columnName, i), values.get(i));
}
}
enum IntervalMode
{
CONTAINS {
@Override
public String makeSqlCondition(String quoteString, String startPlaceholder, String endPlaceholder)
{
// 2 range conditions are used on different columns, but not all SQL databases properly optimize it.
// Some databases can only use an index on one of the columns. An additional condition provides
// explicit knowledge that 'start' cannot be greater than 'end'.
return StringUtils.format(
"(start >= %2$s and start <= %3$s and %1$send%1$s <= %3$s)",
quoteString,
startPlaceholder,
endPlaceholder
);
}
@Override
public boolean apply(Interval a, Interval b)
{
return a.contains(b);
}
},
OVERLAPS {
@Override
public String makeSqlCondition(String quoteString, String startPlaceholder, String endPlaceholder)
{
return StringUtils.format(
"(start < %3$s AND %1$send%1$s > %2$s)",
quoteString,
startPlaceholder,
endPlaceholder
);
}
@Override
public boolean apply(Interval a, Interval b)
{
return a.overlaps(b);
}
};
public abstract String makeSqlCondition(String quoteString, String startPlaceholder, String endPlaceholder);
public abstract boolean apply(Interval a, Interval b);
}
}