blob: e0b3dd6ff17a940a6fa325b8ad7680fdf5106ea4 [file] [log] [blame]
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.common.TaskLock;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.LockListAction;
import com.metamx.druid.merger.common.actions.SegmentInsertAction;
import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.AlertEvent;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
*/
public abstract class MergeTaskBase extends AbstractTask
{
private final List<DataSegment> segments;
private static final EmittingLogger log = new EmittingLogger(MergeTaskBase.class);
protected MergeTaskBase(final String dataSource, final List<DataSegment> segments)
{
super(
// _not_ the version, just something uniqueish
String.format("merge_%s_%s", computeProcessingID(dataSource, segments), new DateTime().toString()),
dataSource,
computeMergedInterval(segments)
);
// Verify segment list is nonempty
Preconditions.checkArgument(segments.size() > 0, "segments nonempty");
// Verify segments are all in the correct datasource
Preconditions.checkArgument(
Iterables.size(
Iterables.filter(
segments,
new Predicate<DataSegment>()
{
@Override
public boolean apply(@Nullable DataSegment segment)
{
return segment == null || !segment.getDataSource().equalsIgnoreCase(dataSource);
}
}
)
) == 0, "segments in the wrong datasource"
);
// Verify segments are all unsharded
Preconditions.checkArgument(
Iterables.size(
Iterables.filter(
segments,
new Predicate<DataSegment>()
{
@Override
public boolean apply(@Nullable DataSegment segment)
{
return segment == null || !(segment.getShardSpec() instanceof NoneShardSpec);
}
}
)
) == 0, "segments without NoneShardSpec"
);
this.segments = segments;
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
final ServiceEmitter emitter = toolbox.getEmitter();
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
final DataSegment mergedSegment = computeMergedSegment(getDataSource(), myLock.getVersion(), segments);
final File taskDir = toolbox.getTaskDir();
try {
final long startTime = System.currentTimeMillis();
log.info(
"Starting merge of id[%s], segments: %s",
getId(),
Lists.transform(
segments,
new Function<DataSegment, String>()
{
@Override
public String apply(@Nullable DataSegment input)
{
return input.getIdentifier();
}
}
)
);
// download segments to merge
final Map<DataSegment, File> gettedSegments = toolbox.getSegments(segments);
// merge files together
final File fileToUpload = merge(gettedSegments, new File(taskDir, "merged"));
emitter.emit(builder.build("merger/numMerged", segments.size()));
emitter.emit(builder.build("merger/mergeTime", System.currentTimeMillis() - startTime));
log.info(
"[%s] : Merged %d segments in %,d millis",
mergedSegment.getDataSource(),
segments.size(),
System.currentTimeMillis() - startTime
);
long uploadStart = System.currentTimeMillis();
// Upload file
final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, mergedSegment);
emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart));
emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize()));
toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
return TaskStatus.success(getId());
}
catch (Exception e) {
log.makeAlert(e, "Exception merging[%s]", mergedSegment.getDataSource())
.addData("interval", mergedSegment.getInterval())
.emit();
return TaskStatus.failure(getId());
}
}
/**
* Checks pre-existing segments in "context" to confirm that this merge query is valid. Specifically, confirm that
* we are operating on every segment that overlaps the chosen interval.
*/
@Override
public TaskStatus preflight(TaskToolbox toolbox)
{
final Function<DataSegment, String> toIdentifier = new Function<DataSegment, String>()
{
@Override
public String apply(DataSegment dataSegment)
{
return dataSegment.getIdentifier();
}
};
final Set<String> current = ImmutableSet.copyOf(
Iterables.transform(toolbox.getTaskActionClientFactory().submit(defaultListUsedAction()), toIdentifier)
);
final Set<String> requested = ImmutableSet.copyOf(Iterables.transform(segments, toIdentifier));
final Set<String> missingFromRequested = Sets.difference(current, requested);
if (!missingFromRequested.isEmpty()) {
throw new ISE(
"Merge is invalid: current segment(s) are not in the requested set: %s",
Joiner.on(", ").join(missingFromRequested)
);
}
final Set<String> missingFromCurrent = Sets.difference(requested, current);
if (!missingFromCurrent.isEmpty()) {
throw new ISE(
"Merge is invalid: requested segment(s) are not in the current set: %s",
Joiner.on(", ").join(missingFromCurrent)
);
}
return TaskStatus.running(getId());
}
protected abstract File merge(Map<DataSegment, File> segments, File outDir)
throws Exception;
@JsonProperty
public List<DataSegment> getSegments()
{
return segments;
}
@Override
public String toString()
{
return Objects.toStringHelper(this)
.add("id", getId())
.add("dataSource", getDataSource())
.add("interval", getImplicitLockInterval())
.add("segments", segments)
.toString();
}
private static String computeProcessingID(final String dataSource, final List<DataSegment> segments)
{
final String segmentIDs = Joiner.on("_").join(
Iterables.transform(
Ordering.natural().sortedCopy(segments), new Function<DataSegment, String>()
{
@Override
public String apply(DataSegment x)
{
return String.format(
"%s_%s_%s_%s",
x.getInterval().getStart(),
x.getInterval().getEnd(),
x.getVersion(),
x.getShardSpec().getPartitionNum()
);
}
}
)
);
return String.format(
"%s_%s",
dataSource,
Hashing.sha1().hashString(segmentIDs, Charsets.UTF_8).toString().toLowerCase()
);
}
private static Interval computeMergedInterval(final List<DataSegment> segments)
{
Preconditions.checkArgument(segments.size() > 0, "segments.size() > 0");
DateTime start = null;
DateTime end = null;
for(final DataSegment segment : segments) {
if(start == null || segment.getInterval().getStart().isBefore(start)) {
start = segment.getInterval().getStart();
}
if(end == null || segment.getInterval().getEnd().isAfter(end)) {
end = segment.getInterval().getEnd();
}
}
return new Interval(start, end);
}
private static DataSegment computeMergedSegment(
final String dataSource,
final String version,
final List<DataSegment> segments
)
{
final Interval mergedInterval = computeMergedInterval(segments);
final Set<String> mergedDimensions = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
final Set<String> mergedMetrics = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
for (DataSegment segment : segments) {
mergedDimensions.addAll(segment.getDimensions());
mergedMetrics.addAll(segment.getMetrics());
}
return DataSegment.builder()
.dataSource(dataSource)
.interval(mergedInterval)
.version(version)
.shardSpec(new NoneShardSpec())
.dimensions(Lists.newArrayList(mergedDimensions))
.metrics(Lists.newArrayList(mergedMetrics))
.build();
}
}