blob: d0452c57a37a4b396f31c74aadc125668d012dfb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import com.google.common.collect.PeekingIterator;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.column.BaseColumn;
import org.apache.druid.segment.column.ColumnDescriptor;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexAdapter;
import org.apache.druid.segment.nested.CompressedNestedDataComplexColumn;
import org.apache.druid.segment.nested.GlobalDictionarySortedCollector;
import org.apache.druid.segment.nested.NestedDataColumnSerializer;
import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
import org.apache.druid.segment.nested.NestedLiteralTypeInfo;
import org.apache.druid.segment.serde.ComplexColumnPartSerde;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.IntBuffer;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
/**
 * {@link DimensionMergerV9} for nested data columns. It gathers the field paths (with their literal type sets) and
 * the string, long and double value dictionaries from every adapter being merged, and hands them to a
 * {@link NestedDataColumnSerializer} which performs the actual writing.
 */
public class NestedDataColumnMerger implements DimensionMergerV9
{
  private static final Logger log = new Logger(NestedDataColumnMerger.class);

  public static final Comparator<Pair<Integer, PeekingIterator<Long>>> LONG_MERGING_COMPARATOR =
      DictionaryMergingIterator.makePeekingComparator();
  public static final Comparator<Pair<Integer, PeekingIterator<Double>>> DOUBLE_MERGING_COMPARATOR =
      DictionaryMergingIterator.makePeekingComparator();

  private final String name;
  private final Closer closer;
  private final NestedDataColumnSerializer serializer;

  /**
   * @param name                  name of the column being merged
   * @param indexSpec             passed through to the serializer
   * @param segmentWriteOutMedium passed through to the serializer
   * @param progressIndicator     passed through to the serializer
   * @param closer                passed to the serializer; columns read from queryable indexes while merging are
   *                              also registered with it
   */
  public NestedDataColumnMerger(
      String name,
      IndexSpec indexSpec,
      SegmentWriteOutMedium segmentWriteOutMedium,
      ProgressIndicator progressIndicator,
      Closer closer
  )
  {
    this.name = name;
    this.serializer = new NestedDataColumnSerializer(name, indexSpec, segmentWriteOutMedium, progressIndicator, closer);
    this.closer = closer;
  }

  /**
   * Collects the fields and value dictionaries of this column from every adapter, then opens the serializer and
   * writes the merged field list followed by the string, long and double dictionaries.
   *
   * Adapters that do not contain this column, or whose dictionaries hold nothing but nulls, contribute no
   * dictionary. If exactly one adapter contributes, its dictionaries are written directly; if several contribute,
   * they are combined through {@link DictionaryMergingIterator}; if none contribute, no dictionary is written.
   *
   * @param adapters adapters of the segments being merged
   * @throws ISE         if an adapter is neither an {@link IncrementalIndexAdapter} nor a
   *                     {@link QueryableIndexIndexableAdapter}
   * @throws IOException declared by the serializer calls
   */
  @Override
  public void writeMergedValueDictionary(List<IndexableAdapter> adapters) throws IOException
  {
    long dimStartTime = System.currentTimeMillis();

    int numMergeIndex = 0;
    // the dictionaries of the most recent adapter that actually has values; used when only one adapter contributes
    GlobalDictionarySortedCollector sortedLookup = null;
    final Indexed[] sortedLookups = new Indexed[adapters.size()];
    final Indexed[] sortedLongLookups = new Indexed[adapters.size()];
    final Indexed[] sortedDoubleLookups = new Indexed[adapters.size()];

    final SortedMap<String, NestedLiteralTypeInfo.MutableTypeSet> mergedFields = new TreeMap<>();

    for (int i = 0; i < adapters.size(); i++) {
      final IndexableAdapter adapter = adapters.get(i);
      final GlobalDictionarySortedCollector dimValues;
      if (adapter instanceof IncrementalIndexAdapter) {
        dimValues = getSortedIndexFromIncrementalAdapter((IncrementalIndexAdapter) adapter, mergedFields);
      } else if (adapter instanceof QueryableIndexIndexableAdapter) {
        dimValues = getSortedIndexesFromQueryableAdapter((QueryableIndexIndexableAdapter) adapter, mergedFields);
      } else {
        throw new ISE("Unable to merge columns of unsupported adapter %s", adapter.getClass());
      }

      if (dimValues == null) {
        // the helpers return null when this adapter has no usable nested column with this name; leave its slots in
        // the lookup arrays null, exactly like an adapter whose dictionaries are all null
        continue;
      }

      boolean allNulls = allNull(dimValues.getSortedStrings()) &&
                         allNull(dimValues.getSortedLongs()) &&
                         allNull(dimValues.getSortedDoubles());
      if (!allNulls) {
        // only remember collectors that contribute values, otherwise the single-dictionary branch below could
        // end up writing an all-null dictionary in place of the one that was counted
        sortedLookup = dimValues;
        sortedLookups[i] = dimValues.getSortedStrings();
        sortedLongLookups[i] = dimValues.getSortedLongs();
        sortedDoubleLookups[i] = dimValues.getSortedDoubles();
        numMergeIndex++;
      }
    }

    serializer.open();
    serializer.serializeFields(mergedFields);

    int cardinality = 0;
    if (numMergeIndex > 1) {
      DictionaryMergingIterator<String> dictionaryMergeIterator = new DictionaryMergingIterator<>(
          sortedLookups,
          StringDimensionMergerV9.DICTIONARY_MERGING_COMPARATOR,
          true
      );
      DictionaryMergingIterator<Long> longDictionaryMergeIterator = new DictionaryMergingIterator<>(
          sortedLongLookups,
          LONG_MERGING_COMPARATOR,
          true
      );
      DictionaryMergingIterator<Double> doubleDictionaryMergeIterator = new DictionaryMergingIterator<>(
          sortedDoubleLookups,
          DOUBLE_MERGING_COMPARATOR,
          true
      );
      serializer.serializeStringDictionary(() -> dictionaryMergeIterator);
      serializer.serializeLongDictionary(() -> longDictionaryMergeIterator);
      serializer.serializeDoubleDictionary(() -> doubleDictionaryMergeIterator);
      cardinality = dictionaryMergeIterator.getCardinality();
    } else if (numMergeIndex == 1) {
      serializer.serializeStringDictionary(sortedLookup.getSortedStrings());
      serializer.serializeLongDictionary(sortedLookup.getSortedLongs());
      serializer.serializeDoubleDictionary(sortedLookup.getSortedDoubles());
      cardinality = sortedLookup.size();
    }

    log.debug(
        "Completed dim[%s] conversions with cardinality[%,d] in %,d millis.",
        name,
        cardinality,
        System.currentTimeMillis() - dimStartTime
    );
  }

  /**
   * Adds the non-empty fields of this column's {@link NestedDataColumnIndexer} to {@code mergedFields} and returns
   * the indexer's sorted global dictionary.
   *
   * @return the sorted dictionaries, or null if the incremental index has no dimension with this name or the
   *         dimension is not backed by a {@link NestedDataColumnIndexer}
   */
  @Nullable
  private GlobalDictionarySortedCollector getSortedIndexFromIncrementalAdapter(
      IncrementalIndexAdapter adapter,
      SortedMap<String, NestedLiteralTypeInfo.MutableTypeSet> mergedFields
  )
  {
    final IncrementalIndex index = adapter.getIncrementalIndex();
    final IncrementalIndex.DimensionDesc dim = index.getDimension(name);
    if (dim == null || !(dim.getIndexer() instanceof NestedDataColumnIndexer)) {
      return null;
    }
    final NestedDataColumnIndexer indexer = (NestedDataColumnIndexer) dim.getIndexer();
    for (Map.Entry<String, NestedDataColumnIndexer.LiteralFieldIndexer> entry : indexer.fieldIndexers.entrySet()) {
      // skip adding the field if no types are in the set, meaning only null values have been processed
      if (!entry.getValue().getTypes().isEmpty()) {
        // NOTE(review): put() replaces any type set an earlier adapter recorded for the same field instead of
        // merging with it - confirm this is intended when more than one adapter carries the field
        mergedFields.put(entry.getKey(), entry.getValue().getTypes());
      }
    }
    return indexer.globalDictionary.getSortedCollector();
  }

  /**
   * Looks up this column in the adapter's queryable index, registers the column with the closer and, when it is a
   * {@link CompressedNestedDataComplexColumn}, merges its fields into {@code mergedFields} and returns its
   * dictionaries.
   *
   * @return the sorted dictionaries, or null if the index has no column with this name or the column is of another
   *         type
   */
  @Nullable
  private GlobalDictionarySortedCollector getSortedIndexesFromQueryableAdapter(
      QueryableIndexIndexableAdapter adapter,
      SortedMap<String, NestedLiteralTypeInfo.MutableTypeSet> mergedFields
  )
  {
    final ColumnHolder columnHolder = adapter.getQueryableIndex().getColumnHolder(name);
    if (columnHolder == null) {
      return null;
    }

    final BaseColumn col = columnHolder.getColumn();
    // registered exactly once here, whatever the concrete column type turns out to be
    closer.register(col);

    if (col instanceof CompressedNestedDataComplexColumn) {
      return getSortedIndexFromV1QueryableAdapter(mergedFields, col);
    }
    return null;
  }

  /**
   * Merges the type set of every field of the given column into {@code mergedFields} and wraps the column's string,
   * long and double dictionaries in a {@link GlobalDictionarySortedCollector}.
   *
   * The caller is responsible for registering {@code col} with the closer; it is not registered again here.
   *
   * @param col must be a {@link CompressedNestedDataComplexColumn}
   */
  private GlobalDictionarySortedCollector getSortedIndexFromV1QueryableAdapter(
      SortedMap<String, NestedLiteralTypeInfo.MutableTypeSet> mergedFields,
      BaseColumn col
  )
  {
    @SuppressWarnings("unchecked")
    CompressedNestedDataComplexColumn column = (CompressedNestedDataComplexColumn) col;

    for (int i = 0; i < column.getFields().size(); i++) {
      String fieldPath = column.getFields().get(i);
      NestedLiteralTypeInfo.TypeSet types = column.getFieldInfo().getTypes(i);
      mergedFields.compute(fieldPath, (k, v) -> {
        if (v == null) {
          return new NestedLiteralTypeInfo.MutableTypeSet(types.getByteValue());
        }
        return v.merge(types.getByteValue());
      });
    }
    return new GlobalDictionarySortedCollector(
        column.getStringDictionary(),
        column.getLongDictionary(),
        column.getDoubleDictionary()
    );
  }

  /**
   * Returns {@code source} unchanged; no per-segment conversion is applied by this merger.
   */
  @Override
  public ColumnValueSelector convertSortedSegmentRowValuesToMergedRowValues(
      int segmentIndex,
      ColumnValueSelector source
  )
  {
    return source;
  }

  /**
   * Passes the selector for the current merged row to the serializer.
   */
  @Override
  public void processMergedRow(ColumnValueSelector selector) throws IOException
  {
    serializer.serialize(selector);
  }

  /**
   * Intentionally a no-op for this merger.
   */
  @Override
  public void writeIndexes(@Nullable List<IntBuffer> segmentRowNumConversions)
  {
    // fields write their own indexes
  }

  /**
   * Always reports false.
   */
  @Override
  public boolean hasOnlyNulls()
  {
    return false;
  }

  /**
   * Builds a single-valued {@link ValueType#COMPLEX} column descriptor whose complex serde uses the nested data type
   * name and delegates to this merger's serializer.
   */
  @Override
  public ColumnDescriptor makeColumnDescriptor()
  {
    return new ColumnDescriptor.Builder()
        .setValueType(ValueType.COMPLEX)
        .setHasMultipleValues(false)
        .addSerde(ComplexColumnPartSerde.serializerBuilder()
                                        .withTypeName(NestedDataComplexTypeSerde.TYPE_NAME)
                                        .withDelegate(serializer)
                                        .build()
        )
        .build();
  }

  /**
   * Returns true when every entry of {@code dimValues} is null, which includes the case of an empty dictionary.
   */
  private static <T> boolean allNull(Indexed<T> dimValues)
  {
    for (int i = 0, size = dimValues.size(); i < size; i++) {
      if (dimValues.get(i) != null) {
        return false;
      }
    }
    return true;
  }
}