/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.operation;

import org.apache.paimon.KeyValue;
import org.apache.paimon.TestFileStore;
import org.apache.paimon.TestKeyValueGenerator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link MergeFileSplitRead}. */
public class MergeFileSplitReadTest {

    @TempDir java.nio.file.Path tempDir;
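
    /**
     * Writes records with key (a, b, c), partition c and a single long delta as value, then reads
     * them back with the key projected to (b, a). The summed deltas per (b, a) must match,
     * regardless of how the merge tree compacted the records.
     */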
@Test
public void testKeyProjection() throws Exception {
// (a, b, c) -> (b, a), c is the partition, all integers are in range [0, 2]
ThreadLocalRandom random = ThreadLocalRandom.current();
int numRecords = random.nextInt(1000) + 1;
List<KeyValue> data = new ArrayList<>();
Map<Integer, Long> expected = new HashMap<>();
for (int i = 0; i < numRecords; i++) {
int a = random.nextInt(3);
int b = random.nextInt(3);
int c = random.nextInt(3);
long delta = random.nextLong(21) - 10;
            // count the number of occurrences of each (b, a)
expected.compute(b * 10 + a, (k, v) -> v == null ? delta : v + delta);
data.add(
new KeyValue()
.replace(
GenericRow.of(a, b, c),
i,
RowKind.INSERT,
GenericRow.of(delta)));
}
        // remove keys whose total is zero; such records may be merged and discarded by the merge tree
expected.entrySet().removeIf(e -> e.getValue() == 0);
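
        // build the partition (c), full key (a, b, c), projected key (b, a) and count value types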
RowType partitionType = RowType.of(new DataType[] {new IntType(false)}, new String[] {"c"});
InternalRowSerializer partitionSerializer = new InternalRowSerializer(partitionType);
List<String> keyNames = Arrays.asList("a", "b", "c");
RowType keyType =
RowType.of(
new DataType[] {new IntType(false), new IntType(false), new IntType(false)},
keyNames.toArray(new String[0]));
RowType projectedKeyType = RowType.of(new IntType(false), new IntType(false));
InternalRowSerializer projectedKeySerializer = new InternalRowSerializer(projectedKeyType);
RowType valueType =
RowType.of(new DataType[] {new BigIntType(false)}, new String[] {"count"});
InternalRowSerializer valueSerializer = new InternalRowSerializer(valueType);
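
        // create a store whose key fields are (a, b, c) and whose value is a single count column,
        // merged by summing the counts per key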
TestFileStore store =
createStore(
partitionType,
keyType,
valueType,
new KeyValueFieldsExtractor() {
private static final long serialVersionUID = 1L;
@Override
public List<DataField> keyFields(TableSchema schema) {
return schema.fields().stream()
.filter(f -> keyNames.contains(f.name()))
.collect(Collectors.toList());
}
@Override
public List<DataField> valueFields(TableSchema schema) {
return Collections.singletonList(
new DataField(
0,
"count",
new org.apache.paimon.types.BigIntType()));
}
},
TestValueCountMergeFunction.factory());
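
        // write, then read back with key projection [[1], [0]], i.e. key (a, b, c) -> (b, a)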
List<KeyValue> readData =
writeThenRead(
data,
new int[][] {new int[] {1}, new int[] {0}},
null,
projectedKeySerializer,
valueSerializer,
store,
kv ->
partitionSerializer
.toBinaryRow(GenericRow.of(kv.key().getInt(2)))
.copy());
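
        // re-aggregate the counts of the projected keys and compare them with the expected counts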
Map<Integer, Long> actual = new HashMap<>();
for (KeyValue kv : readData) {
assertThat(kv.key().getFieldCount()).isEqualTo(2);
int key = kv.key().getInt(0) * 10 + kv.key().getInt(1);
long delta = kv.value().getLong(0);
actual.compute(key, (k, v) -> v == null ? delta : v + delta);
}
actual.entrySet().removeIf(e -> e.getValue() == 0);
assertThat(actual).isEqualTo(expected);
}
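
    /**
     * Writes rows with schema (dt, hr, shopId, orderId, itemId, priceAmount, comment), then reads
     * them back with the value projected to (shopId, itemId, dt, hr) and checks each projected
     * value against the deduplicated expectation per key.
     */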
@Test
public void testValueProjection() throws Exception {
// (dt, hr, shopId, orderId, itemId, priceAmount, comment) -> (shopId, itemId, dt, hr)
TestKeyValueGenerator gen = new TestKeyValueGenerator();
int numRecords = ThreadLocalRandom.current().nextInt(1000) + 1;
List<KeyValue> data = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
data.add(gen.next());
}
TestFileStore store =
createStore(
TestKeyValueGenerator.DEFAULT_PART_TYPE,
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.DEFAULT_ROW_TYPE,
TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
DeduplicateMergeFunction.factory());
InternalRowSerializer projectedValueSerializer =
new InternalRowSerializer(
new IntType(false),
new BigIntType(),
new VarCharType(false, 8),
new IntType(false));
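
        // expected values: deduplicate per key, then project each value to (shopId, itemId, dt, hr)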
Map<BinaryRow, BinaryRow> expected = store.toKvMap(data);
expected.replaceAll(
(k, v) ->
projectedValueSerializer
.toBinaryRow(
GenericRow.of(
v.getInt(2),
v.isNullAt(4) ? null : v.getLong(4),
v.getString(0),
v.getInt(1)))
.copy());
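
        // read back with only the value projected; the key stays untouched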
List<KeyValue> readData =
writeThenRead(
data,
null,
new int[][] {new int[] {2}, new int[] {4}, new int[] {0}, new int[] {1}},
TestKeyValueGenerator.KEY_SERIALIZER,
projectedValueSerializer,
store,
gen::getPartition);
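
        // every read value must contain exactly the four projected fields and match the expectation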
for (KeyValue kv : readData) {
assertThat(kv.value().getFieldCount()).isEqualTo(4);
BinaryRow key = TestKeyValueGenerator.KEY_SERIALIZER.toBinaryRow(kv.key());
BinaryRow value = projectedValueSerializer.toBinaryRow(kv.value());
assertThat(expected).containsKey(key);
assertThat(value).isEqualTo(expected.get(key));
}
}
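
    /**
     * Commits the given records into bucket 0, plans the latest snapshot, and reads every partition
     * back through {@link MergeFileSplitRead} with the given key / value projections. Results are
     * copied with the projected serializers before being collected.
     */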
private List<KeyValue> writeThenRead(
List<KeyValue> data,
int[][] keyProjection,
int[][] valueProjection,
InternalRowSerializer projectedKeySerializer,
InternalRowSerializer projectedValueSerializer,
TestFileStore store,
Function<KeyValue, BinaryRow> partitionCalculator)
throws Exception {
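        // write everything into bucket 0 and group the files of the latest snapshot by partition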
store.commitData(data, partitionCalculator, kv -> 0);
FileStoreScan scan = store.newScan();
Long snapshotId = store.snapshotManager().latestSnapshotId();
Map<BinaryRow, List<ManifestEntry>> filesGroupedByPartition =
scan.withSnapshot(snapshotId).plan().files().stream()
.collect(Collectors.groupingBy(ManifestEntry::partition));
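
        // create the merge read and apply the requested projections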
MergeFileSplitRead read = store.newRead();
if (keyProjection != null) {
read.withKeyProjection(keyProjection);
}
if (valueProjection != null) {
read.withProjection(valueProjection);
}
List<KeyValue> result = new ArrayList<>();
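        // read each partition as one DataSplit containing all of its files in bucket 0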
for (Map.Entry<BinaryRow, List<ManifestEntry>> entry : filesGroupedByPartition.entrySet()) {
RecordReader<KeyValue> reader =
read.createReader(
DataSplit.builder()
.withSnapshot(snapshotId)
.withPartition(entry.getKey())
.withBucket(0)
.withDataFiles(
entry.getValue().stream()
.map(ManifestEntry::file)
.collect(Collectors.toList()))
.withBucketPath("not used")
.build());
RecordReaderIterator<KeyValue> actualIterator = new RecordReaderIterator<>(reader);
while (actualIterator.hasNext()) {
result.add(
actualIterator
.next()
.copy(projectedKeySerializer, projectedValueSerializer));
}
}
return result;
}
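
    /**
     * Creates a {@link TestFileStore} over a freshly created table schema in the temp directory. In
     * value count mode (detected by the merge function being a {@link TestValueCountMergeFunction})
     * the schema consists of the key fields and has no primary keys; otherwise it consists of the
     * value fields, with the key fields (minus the "key_" prefix) plus the partition fields as
     * primary keys.
     */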
private TestFileStore createStore(
RowType partitionType,
RowType keyType,
RowType valueType,
KeyValueFieldsExtractor extractor,
MergeFunctionFactory<KeyValue> mfFactory)
throws Exception {
Path path = new Path(tempDir.toUri());
SchemaManager schemaManager = new SchemaManager(FileIOFinder.find(path), path);
boolean valueCountMode = mfFactory.create() instanceof TestValueCountMergeFunction;
Schema schema =
new Schema(
(valueCountMode ? keyType : valueType).getFields(),
partitionType.getFieldNames(),
valueCountMode
? Collections.emptyList()
: Stream.concat(
keyType.getFieldNames().stream()
.map(field -> field.replace("key_", "")),
partitionType.getFieldNames().stream())
.collect(Collectors.toList()),
Collections.emptyMap(),
null);
TableSchema tableSchema = schemaManager.createTable(schema);
return new TestFileStore.Builder(
"avro",
tempDir.toString(),
1,
partitionType,
keyType,
valueType,
extractor,
mfFactory,
tableSchema)
.build();
}
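
    /**
     * Merge function that sums the single {@code count} column of all records sharing a key and
     * emits one INSERT record with the latest key and sequence number carrying the accumulated
     * total.
     */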
private static class TestValueCountMergeFunction implements MergeFunction<KeyValue> {
private KeyValue latestKv;
private long total;
private KeyValue reused;
protected TestValueCountMergeFunction() {}
@Override
public void reset() {
latestKv = null;
total = 0;
}
@Override
public void add(KeyValue kv) {
checkArgument(
kv.valueKind() == RowKind.INSERT,
"In value count mode, only insert records come. This is a bug. Please file an issue.");
latestKv = kv;
total += count(kv.value());
}
@Override
public KeyValue getResult() {
if (reused == null) {
reused = new KeyValue();
}
return reused.replace(
latestKv.key(),
latestKv.sequenceNumber(),
RowKind.INSERT,
GenericRow.of(total));
}
private long count(InternalRow value) {
checkArgument(!value.isNullAt(0), "Value count should not be null.");
return value.getLong(0);
}
public static MergeFunctionFactory<KeyValue> factory() {
return new Factory();
}
private static class Factory implements MergeFunctionFactory<KeyValue> {
private static final long serialVersionUID = 1L;
@Override
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
return new TestValueCountMergeFunction();
}
}
}
}