/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package com.metamx.druid.master;

import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Ordering;
import com.metamx.common.Pair;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.TimelineObjectHolder;
import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.partition.PartitionChunk;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import com.metamx.http.client.response.ToStringResponseHandler;
import com.netflix.curator.x.discovery.ServiceProvider;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.List;
import java.util.Map;

/**
 * Master helper that scans each datasource's timeline for runs of small adjacent segments and
 * issues merge requests for them through a {@link MergerClient}.
 */
public class DruidMasterSegmentMerger implements DruidMasterHelper
{
private static final Logger log = new Logger(DruidMasterSegmentMerger.class);
private final MergerClient mergerClient;
public DruidMasterSegmentMerger(MergerClient mergerClient)
{
this.mergerClient = mergerClient;
}
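
  // Convenience constructor: builds an HttpMergerClient on top of a freshly initialized,
  // single-connection HTTP client with its own Lifecycle.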
public DruidMasterSegmentMerger(ObjectMapper jsonMapper, ServiceProvider serviceProvider)
{
this.mergerClient = new HttpMergerClient(
HttpClientInit.createClient(
HttpClientConfig.builder().withNumConnections(1).build(),
new Lifecycle()
),
new ToStringResponseHandler(Charsets.UTF_8),
jsonMapper,
serviceProvider
);
  }

@Override
public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
{
MasterStats stats = new MasterStats();
Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources = Maps.newHashMap();
    // Build a timeline for each datasource from its served segments
for (DataSegment dataSegment : params.getAvailableSegments()) {
VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(dataSegment.getDataSource());
if (timeline == null) {
timeline = new VersionedIntervalTimeline<String, DataSegment>(Ordering.<String>natural());
dataSources.put(dataSegment.getDataSource(), timeline);
}
timeline.add(
dataSegment.getInterval(),
dataSegment.getVersion(),
dataSegment.getShardSpec().createChunk(dataSegment)
);
}
// Find segments to merge
for (final Map.Entry<String, VersionedIntervalTimeline<String, DataSegment>> entry : dataSources.entrySet()) {
      // Get served segments from the timeline
VersionedIntervalTimeline<String, DataSegment> timeline = entry.getValue();
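      // Look up every visible chunk; the lookup interval runs from the epoch to the year 3000,
      // which effectively covers all segments in the timeline.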
List<TimelineObjectHolder<String, DataSegment>> timelineObjects =
timeline.lookup(new Interval(new DateTime(0), new DateTime("3000-01-01")));
// Accumulate timelineObjects greedily until we reach our limits, then backtrack to the maximum complete set
SegmentsToMerge segmentsToMerge = new SegmentsToMerge();
      for (int i = 0; i < timelineObjects.size(); i++) {
segmentsToMerge.add(timelineObjects.get(i));
if (segmentsToMerge.getByteCount() > params.getMergeBytesLimit()
|| segmentsToMerge.getSegmentCount() >= params.getMergeSegmentsLimit())
{
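          // Drop timeline objects until we have a complete set under the byte limit, and rewind
          // the loop index so the dropped objects are revisited on later iterations.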
i -= segmentsToMerge.backtrack(params.getMergeBytesLimit());
if (segmentsToMerge.getSegmentCount() > 1) {
stats.addToGlobalStat("mergedCount", mergeSegments(segmentsToMerge, entry.getKey()));
}
if (segmentsToMerge.getSegmentCount() == 0) {
// Backtracked all the way to zero. Increment by one so we continue to make progress.
i++;
}
segmentsToMerge = new SegmentsToMerge();
}
}
      // Flush any remaining accumulated segments that never hit the limits
segmentsToMerge.backtrack(params.getMergeBytesLimit());
if (segmentsToMerge.getSegmentCount() > 1) {
stats.addToGlobalStat("mergedCount", mergeSegments(segmentsToMerge, entry.getKey()));
}
}
return params.buildFromExisting()
.withMasterStats(stats)
.build();
  }

/**
* Issue merge request for some list of segments.
* @return number of segments merged
*/
private int mergeSegments(SegmentsToMerge segmentsToMerge, String dataSource)
{
final List<DataSegment> segments = segmentsToMerge.getSegments();
final List<String> segmentNames = Lists.transform(
segments,
new Function<DataSegment, String>()
{
@Override
public String apply(DataSegment input)
{
return input.getIdentifier();
}
}
);
log.info("[%s] Found %d segments to merge %s", dataSource, segments.size(), segmentNames);
try {
mergerClient.runRequest(dataSource, segments);
}
catch (Exception e) {
log.error(
e,
"[%s] Merging error for segments [%s]",
dataSource,
segmentNames
);
}
return segments.size();
  }

private static class SegmentsToMerge
{
// set of already-included segments (to help us keep track of bytes accumulated)
private final Multiset<DataSegment> segments;
// (timeline object, union interval of underlying segments up to this point in the list)
private final List<Pair<TimelineObjectHolder<String, DataSegment>, Interval>> timelineObjects;
    private long byteCount;

private SegmentsToMerge()
{
this.timelineObjects = Lists.newArrayList();
this.segments = HashMultiset.create();
this.byteCount = 0;
}
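
    /**
     * Returns the distinct underlying segments across all accumulated timeline objects.
     */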
public List<DataSegment> getSegments()
{
return ImmutableSet.copyOf(
FunctionalIterable.create(timelineObjects).transformCat(
new Function<Pair<TimelineObjectHolder<String, DataSegment>, Interval>, Iterable<DataSegment>>()
{
@Override
public Iterable<DataSegment> apply(Pair<TimelineObjectHolder<String, DataSegment>, Interval> input)
{
return Iterables.transform(
input.lhs.getObject(),
new Function<PartitionChunk<DataSegment>, DataSegment>()
{
@Override
public DataSegment apply(PartitionChunk<DataSegment> input)
{
return input.getObject();
}
}
);
}
}
)
).asList();
}
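
    /**
     * Adds a timeline object, which must arrive in chronological order, and extends the running
     * union of underlying segment intervals. A segment's bytes are counted only the first time it
     * is seen, since the same segment can back more than one timeline object.
     */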
public void add(TimelineObjectHolder<String, DataSegment> timelineObject)
{
final Interval timelineObjectInterval = timelineObject.getInterval();
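      // The underlying segment's interval can be wider than the timeline interval, since a
      // timeline object may represent only the visible slice of a partially overshadowed segment.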
final Interval underlyingInterval = timelineObject.getObject().getChunk(0).getObject().getInterval();
if (timelineObjects.size() > 0) {
Preconditions.checkArgument(
timelineObjectInterval.getStart().getMillis() >=
timelineObjects.get(timelineObjects.size() - 1).lhs.getInterval().getEnd().getMillis(),
"timeline objects must be provided in order"
);
}
      for (final PartitionChunk<DataSegment> segment : timelineObject.getObject()) {
        segments.add(segment.getObject());
        if (segments.count(segment.getObject()) == 1) {
byteCount += segment.getObject().getSize();
}
}
// Compute new underlying merged interval
final Interval mergedUnderlyingInterval = getMergedUnderlyingInterval();
      if (mergedUnderlyingInterval == null) {
timelineObjects.add(Pair.of(timelineObject, underlyingInterval));
} else {
final DateTime start = underlyingInterval.getStart().isBefore(mergedUnderlyingInterval.getStart())
? underlyingInterval.getStart()
: mergedUnderlyingInterval.getStart();
final DateTime end = underlyingInterval.getEnd().isAfter(mergedUnderlyingInterval.getEnd())
? underlyingInterval.getEnd()
: mergedUnderlyingInterval.getEnd();
timelineObjects.add(Pair.of(timelineObject, new Interval(start, end)));
}
}
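
    /**
     * Returns the interval spanning the first timeline object's start through the last one's end,
     * or null if nothing has been added.
     */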
public Interval getMergedTimelineInterval()
{
      if (timelineObjects.isEmpty()) {
return null;
} else {
return new Interval(
timelineObjects.get(0).lhs.getInterval().getStart(),
timelineObjects.get(timelineObjects.size() - 1).lhs.getInterval().getEnd()
);
}
}
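
    /**
     * Returns the union of the underlying segment intervals accumulated so far, or null if
     * nothing has been added.
     */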
public Interval getMergedUnderlyingInterval()
{
      if (timelineObjects.isEmpty()) {
return null;
} else {
return timelineObjects.get(timelineObjects.size() - 1).rhs;
}
    }

public long getByteCount()
{
return byteCount;
    }

public int getSegmentCount()
{
return timelineObjects.size();
    }

/**
* Does this set of segments fully cover union(all segment intervals)?
* @return true if this set is complete
*/
public boolean isComplete()
{
      return timelineObjects.isEmpty() || getMergedTimelineInterval().equals(getMergedUnderlyingInterval());
    }

/**
* Remove timelineObjects from this holder until we have a complete set with total size <= maxSize.
* @return number of timeline object holders removed
*/
public int backtrack(long maxSize)
{
Preconditions.checkArgument(maxSize >= 0, "maxSize >= 0");
int removed = 0;
while (!isComplete() || byteCount > maxSize) {
        removed++;
        final TimelineObjectHolder<String, DataSegment> removedHolder = timelineObjects.remove(timelineObjects.size() - 1).lhs;
        for (final PartitionChunk<DataSegment> segment : removedHolder.getObject()) {
          segments.remove(segment.getObject());
          if (segments.count(segment.getObject()) == 0) {
byteCount -= segment.getObject().getSize();
}
}
}
return removed;
}
}
}