/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment.realtime.plumber;

import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Overshadowable;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
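
/**
 * Holds the data for one interval/shard of a realtime datasource as a list of {@link FireHydrant}s.
 * New rows are added to the current in-memory {@link IncrementalIndex}; {@link #swap()} seals that
 * index into its hydrant and opens a fresh one.
 *
 * Illustrative usage (a minimal sketch; in practice a plumber/appenderator drives these calls and
 * persists swapped-out hydrants elsewhere, and variable names below are only placeholders):
 * <pre>{@code
 * Sink sink = new Sink(interval, schema, shardSpec, version, appendableIndexSpec,
 *                      maxRowsInMemory, maxBytesInMemory, null);
 * IncrementalIndexAddResult result = sink.add(row, false);
 * if (!sink.canAppendRow()) {
 *   FireHydrant sealed = sink.swap(); // persist the sealed hydrant elsewhere
 * }
 * }</pre>
 */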
public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
{
private static final IncrementalIndexAddResult ALREADY_SWAPPED =
new IncrementalIndexAddResult(-1, -1, "write after index swapped");
private final Object hydrantLock = new Object();
private final Interval interval;
private final DataSchema schema;
private final ShardSpec shardSpec;
private final String version;
private final AppendableIndexSpec appendableIndexSpec;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<>();
private final LinkedHashSet<String> dimOrder = new LinkedHashSet<>();
private final AtomicInteger numRowsExcludingCurrIndex = new AtomicInteger();
private final String dedupColumn;
private final Set<Long> dedupSet = new HashSet<>();
private volatile FireHydrant currHydrant;
private volatile boolean writable = true;
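
/**
 * Creates a writable Sink with no pre-existing hydrants.
 */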
public Sink(
Interval interval,
DataSchema schema,
ShardSpec shardSpec,
String version,
AppendableIndexSpec appendableIndexSpec,
int maxRowsInMemory,
long maxBytesInMemory,
String dedupColumn
)
{
this(
interval,
schema,
shardSpec,
version,
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
dedupColumn,
Collections.emptyList()
);
}
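
/**
 * Creates a Sink and registers any previously created hydrants, which must be ordered by
 * ascending hydrant count. A new in-memory index is then opened as the current hydrant.
 */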
public Sink(
Interval interval,
DataSchema schema,
ShardSpec shardSpec,
String version,
AppendableIndexSpec appendableIndexSpec,
int maxRowsInMemory,
long maxBytesInMemory,
String dedupColumn,
List<FireHydrant> hydrants
)
{
this.schema = schema;
this.shardSpec = shardSpec;
this.interval = interval;
this.version = version;
this.appendableIndexSpec = appendableIndexSpec;
this.maxRowsInMemory = maxRowsInMemory;
this.maxBytesInMemory = maxBytesInMemory;
this.dedupColumn = dedupColumn;
int maxCount = -1;
for (int i = 0; i < hydrants.size(); ++i) {
final FireHydrant hydrant = hydrants.get(i);
if (hydrant.getCount() <= maxCount) {
throw new ISE("hydrant[%s] not the right count[%s]", hydrant, i);
}
maxCount = hydrant.getCount();
ReferenceCountingSegment segment = hydrant.getIncrementedSegment();
try {
numRowsExcludingCurrIndex.addAndGet(segment.asQueryableIndex().getNumRows());
}
finally {
segment.decrement();
}
}
this.hydrants.addAll(hydrants);
makeNewCurrIndex(interval.getStartMillis(), schema);
}
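
/**
 * Clears the in-memory set of hashed dedup-column values used by {@link #add}.
 */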
public void clearDedupCache()
{
dedupSet.clear();
}
public Interval getInterval()
{
return interval;
}
public FireHydrant getCurrHydrant()
{
return currHydrant;
}
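
/**
 * Adds a row to the current in-memory index. Returns {@link Plumber#NOT_WRITABLE} if the sink has
 * finished writing, {@link Plumber#DUPLICATE} if the row's dedup-column value was already seen, or
 * a sentinel result if the current hydrant's index has already been swapped out.
 */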
public IncrementalIndexAddResult add(InputRow row, boolean skipMaxRowsInMemoryCheck) throws IndexSizeExceededException
{
if (currHydrant == null) {
throw new IAE("No currHydrant but given row[%s]", row);
}
synchronized (hydrantLock) {
if (!writable) {
return Plumber.NOT_WRITABLE;
}
IncrementalIndex index = currHydrant.getIndex();
if (index == null) {
return ALREADY_SWAPPED; // the hydrant was swapped without being replaced
}
if (checkInDedupSet(row)) {
return Plumber.DUPLICATE;
}
return index.add(row, skipMaxRowsInMemoryCheck);
}
}
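
/**
 * @return true if the sink is writable and the current in-memory index can accept another row
 */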
public boolean canAppendRow()
{
synchronized (hydrantLock) {
return writable && currHydrant != null && currHydrant.getIndex().canAppendRow();
}
}
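
/**
 * @return true if this sink has only its current hydrant and that hydrant's index holds no rows
 */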
public boolean isEmpty()
{
synchronized (hydrantLock) {
return hydrants.size() == 1 && currHydrant.getIndex().isEmpty();
}
}
public boolean isWritable()
{
return writable;
}

/**
 * If currHydrant is A, creates a new index B, sets currHydrant to B and returns A.
 *
 * @return the old hydrant (A) that was current before the swap
 */
public FireHydrant swap()
{
return makeNewCurrIndex(interval.getStartMillis(), schema);
}
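
/**
 * @return true if the sink is writable and the current in-memory index contains at least one row
 */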
public boolean swappable()
{
synchronized (hydrantLock) {
return writable && currHydrant.getIndex() != null && currHydrant.getIndex().size() != 0;
}
}
public boolean finished()
{
return !writable;
}

/**
 * Marks the sink as 'finished', preventing further writes.
 *
 * @return true if the sink was successfully finished, false if it was already finished
 */
public boolean finishWriting()
{
synchronized (hydrantLock) {
if (!writable) {
return false;
}
writable = false;
clearDedupCache();
}
return true;
}
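
/**
 * Builds a {@link DataSegment} descriptor for this sink (empty load spec and dimensions, zero
 * size); within this class it is used to derive the segment id for new hydrants.
 */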
public DataSegment getSegment()
{
return new DataSegment(
schema.getDataSource(),
interval,
version,
ImmutableMap.of(),
Collections.emptyList(),
Lists.transform(Arrays.asList(schema.getAggregators()), AggregatorFactory::getName),
shardSpec,
null,
0
);
}
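
/**
 * @return the total number of rows in this sink: rows in already-swapped hydrants plus rows in
 *         the current in-memory index
 */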
public int getNumRows()
{
synchronized (hydrantLock) {
return numRowsExcludingCurrIndex.get() + getNumRowsInMemory();
}
}
public int getNumRowsInMemory()
{
synchronized (hydrantLock) {
IncrementalIndex index = currHydrant.getIndex();
if (index == null) {
return 0;
}
return currHydrant.getIndex().size();
}
}
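
/**
 * @return the estimated number of bytes held by the current in-memory index, or 0 if it has
 *         already been swapped out
 */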
public long getBytesInMemory()
{
synchronized (hydrantLock) {
IncrementalIndex index = currHydrant.getIndex();
if (index == null) {
return 0;
}
return currHydrant.getIndex().getBytesInMemory().get();
}
}
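
/**
 * Returns true if the row's dedup-column value has been seen before. Otherwise records the
 * value's hash and returns false. Always returns false when no dedup column is configured.
 */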
private boolean checkInDedupSet(InputRow row)
{
if (dedupColumn != null) {
Object value = row.getRaw(dedupColumn);
if (value != null) {
if (value instanceof List) {
throw new IAE("Dedup on multi-value field not support");
}
Long pk;
if (value instanceof Long || value instanceof Integer) {
pk = ((Number) value).longValue();
} else {
// Use a long hash code to reduce heap cost.
// Hash collisions are possible, but avoiding OOM matters more here.
pk = pkHash(String.valueOf(value));
}
if (dedupSet.contains(pk)) {
return true;
}
dedupSet.add(pk);
}
}
return false;
}
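
/**
 * BKDR-style string hash, stored as a long to keep the dedup set's heap footprint small.
 */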
private long pkHash(String s)
{
long seed = 131; // BKDR hash; typical seeds are 31, 131, 1313, 13131, 131313, etc.
long hash = 0;
for (int i = 0; i < s.length(); i++) {
hash = (hash * seed) + s.charAt(i);
}
return hash;
}
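
/**
 * Creates a new in-memory index, carrying over the dimension order and column capabilities of the
 * most recent hydrant, and installs it as the current hydrant. Returns the previously current
 * hydrant, or null if this is the sink's first index. Throws if the sink is no longer writable.
 */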
private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema)
{
final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(minTimestamp)
.withTimestampSpec(schema.getTimestampSpec())
.withQueryGranularity(schema.getGranularitySpec().getQueryGranularity())
.withDimensionsSpec(schema.getDimensionsSpec())
.withMetrics(schema.getAggregators())
.withRollup(schema.getGranularitySpec().isRollup())
.build();
// Build the incremental-index according to the spec that was chosen by the user
final IncrementalIndex newIndex = appendableIndexSpec.builder()
.setIndexSchema(indexSchema)
.setMaxRowCount(maxRowsInMemory)
.setMaxBytesInMemory(maxBytesInMemory)
.build();
final FireHydrant old;
synchronized (hydrantLock) {
if (writable) {
old = currHydrant;
int newCount = 0;
int numHydrants = hydrants.size();
if (numHydrants > 0) {
FireHydrant lastHydrant = hydrants.get(numHydrants - 1);
newCount = lastHydrant.getCount() + 1;
if (!indexSchema.getDimensionsSpec().hasCustomDimensions()) {
Map<String, ColumnCapabilities> oldCapabilities;
if (lastHydrant.hasSwapped()) {
oldCapabilities = new HashMap<>();
ReferenceCountingSegment segment = lastHydrant.getIncrementedSegment();
try {
QueryableIndex oldIndex = segment.asQueryableIndex();
for (String dim : oldIndex.getAvailableDimensions()) {
dimOrder.add(dim);
oldCapabilities.put(dim, oldIndex.getColumnHolder(dim).getCapabilities());
}
}
finally {
segment.decrement();
}
} else {
IncrementalIndex oldIndex = lastHydrant.getIndex();
dimOrder.addAll(oldIndex.getDimensionOrder());
oldCapabilities = oldIndex.getColumnCapabilities();
}
newIndex.loadDimensionIterable(dimOrder, oldCapabilities);
}
}
currHydrant = new FireHydrant(newIndex, newCount, getSegment().getId());
if (old != null) {
numRowsExcludingCurrIndex.addAndGet(old.getIndex().size());
}
hydrants.add(currHydrant);
} else {
// Oops, someone called finishWriting while we were making this new index.
newIndex.close();
throw new ISE("finishWriting() called during swap");
}
}
return old;
}
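
/**
 * Iterates over this sink's hydrants, skipping in-memory hydrants whose index is empty.
 */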
@Override
public Iterator<FireHydrant> iterator()
{
return Iterators.filter(
hydrants.iterator(),
new Predicate<FireHydrant>()
{
@Override
public boolean apply(FireHydrant input)
{
final IncrementalIndex index = input.getIndex();
return index == null || index.size() != 0;
}
}
);
}
@Override
public String toString()
{
return "Sink{" +
"interval=" + interval +
", schema=" + schema +
'}';
}
@Override
public boolean overshadows(Sink other)
{
// Sink is currently used in timeline only for querying stream data.
// In this case, sinks never overshadow each other.
return false;
}
@Override
public int getStartRootPartitionId()
{
return shardSpec.getStartRootPartitionId();
}
@Override
public int getEndRootPartitionId()
{
return shardSpec.getEndRootPartitionId();
}
@Override
public String getVersion()
{
return version;
}
@Override
public short getMinorVersion()
{
return shardSpec.getMinorVersion();
}
@Override
public short getAtomicUpdateGroupSize()
{
return shardSpec.getAtomicUpdateGroupSize();
}
}