/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink;

import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.StructLikeWrapper;
import org.awaitility.Awaitility;

public class SimpleDataUtil {
private SimpleDataUtil() {}

public static final Schema SCHEMA =
new Schema(
Types.NestedField.optional(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get()));
public static final TableSchema FLINK_SCHEMA =
TableSchema.builder().field("id", DataTypes.INT()).field("data", DataTypes.STRING()).build();
public static final RowType ROW_TYPE = (RowType) FLINK_SCHEMA.toRowDataType().getLogicalType();
public static final Record RECORD = GenericRecord.create(SCHEMA);
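
/**
 * Creates a {@link HadoopTables} table with {@link #SCHEMA} at the given path. When {@code
 * partitioned} is true, the table is partitioned by an identity transform on the {@code data}
 * column; otherwise it is unpartitioned.
 *
 * <p>Illustrative usage (the path and properties are placeholders):
 *
 * <pre>{@code
 * Table table = SimpleDataUtil.createTable("file:///tmp/warehouse/t", ImmutableMap.of(), true);
 * }</pre>
 */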
public static Table createTable(
String path, Map<String, String> properties, boolean partitioned) {
PartitionSpec spec;
if (partitioned) {
spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
} else {
spec = PartitionSpec.unpartitioned();
}
return new HadoopTables().create(SCHEMA, spec, properties, path);
}
public static Record createRecord(Integer id, String data) {
Record record = RECORD.copy();
record.setField("id", id);
record.setField("data", data);
return record;
}
public static RowData createRowData(Integer id, String data) {
return GenericRowData.of(id, StringData.fromString(data));
}
public static RowData createInsert(Integer id, String data) {
return GenericRowData.ofKind(RowKind.INSERT, id, StringData.fromString(data));
}
public static RowData createDelete(Integer id, String data) {
return GenericRowData.ofKind(RowKind.DELETE, id, StringData.fromString(data));
}
public static RowData createUpdateBefore(Integer id, String data) {
return GenericRowData.ofKind(RowKind.UPDATE_BEFORE, id, StringData.fromString(data));
}
public static RowData createUpdateAfter(Integer id, String data) {
return GenericRowData.ofKind(RowKind.UPDATE_AFTER, id, StringData.fromString(data));
}
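
/**
 * Writes the list of {@link RowData} to a data file at the given location without partition data.
 */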
public static DataFile writeFile(
Table table,
Schema schema,
PartitionSpec spec,
Configuration conf,
String location,
String filename,
List<RowData> rows)
throws IOException {
return writeFile(table, schema, spec, conf, location, filename, rows, null);
}

/**
 * Writes the list of {@link RowData} to a data file at the given location with the given
 * partition data.
 */
public static DataFile writeFile(
Table table,
Schema schema,
PartitionSpec spec,
Configuration conf,
String location,
String filename,
List<RowData> rows,
StructLike partition)
throws IOException {
Path path = new Path(location, filename);
FileFormat fileFormat = FileFormat.fromFileName(filename);
Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename);
RowType flinkSchema = FlinkSchemaUtil.convert(schema);
FileAppenderFactory<RowData> appenderFactory =
new FlinkAppenderFactory(
table, schema, flinkSchema, ImmutableMap.of(), spec, null, null, null);
FileAppender<RowData> appender = appenderFactory.newAppender(fromPath(path, conf), fileFormat);
try (FileAppender<RowData> closeableAppender = appender) {
closeableAppender.addAll(rows);
}
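// The appender's metrics are only available once the appender has been closed above.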
DataFiles.Builder builder =
DataFiles.builder(spec)
.withInputFile(HadoopInputFile.fromPath(path, conf))
.withMetrics(appender.metrics());
if (partition != null) {
builder = builder.withPartition(partition);
}
return builder.build();
}
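
/**
 * Writes the given rows as an unpartitioned equality-delete file under the table location and
 * returns the resulting {@link DeleteFile}.
 */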
public static DeleteFile writeEqDeleteFile(
Table table,
FileFormat format,
String filename,
FileAppenderFactory<RowData> appenderFactory,
List<RowData> deletes)
throws IOException {
EncryptedOutputFile outputFile =
table
.encryption()
.encrypt(fromPath(new Path(table.location(), filename), new Configuration()));
EqualityDeleteWriter<RowData> eqWriter =
appenderFactory.newEqDeleteWriter(outputFile, format, null);
try (EqualityDeleteWriter<RowData> writer = eqWriter) {
writer.write(deletes);
}
return eqWriter.toDeleteFile();
}
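
/**
 * Writes the given (data file path, row position) pairs as a position-delete file under the table
 * location and returns the resulting {@link DeleteFile}.
 */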
public static DeleteFile writePosDeleteFile(
Table table,
FileFormat format,
String filename,
FileAppenderFactory<RowData> appenderFactory,
List<Pair<CharSequence, Long>> positions)
throws IOException {
EncryptedOutputFile outputFile =
table
.encryption()
.encrypt(fromPath(new Path(table.location(), filename), new Configuration()));
PositionDeleteWriter<RowData> posWriter =
appenderFactory.newPosDeleteWriter(outputFile, format, null);
PositionDelete<RowData> posDelete = PositionDelete.create();
try (PositionDeleteWriter<RowData> writer = posWriter) {
for (Pair<CharSequence, Long> p : positions) {
writer.write(posDelete.set(p.first(), p.second(), null));
}
}
return posWriter.toDeleteFile();
}
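
/** Converts Flink {@link RowData} rows into generic {@link Record}s with the same field values. */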
private static List<Record> convertToRecords(List<RowData> rows) {
List<Record> records = Lists.newArrayList();
for (RowData row : rows) {
Integer id = row.isNullAt(0) ? null : row.getInt(0);
String data = row.isNullAt(1) ? null : row.getString(1).toString();
records.add(createRecord(id, data));
}
return records;
}
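
/**
 * Asserts that the latest snapshot of the table (or of the given branch) contains exactly the
 * expected rows, compared as generic records.
 *
 * <p>Illustrative usage (the row values are placeholders):
 *
 * <pre>{@code
 * SimpleDataUtil.assertTableRows(table, ImmutableList.of(SimpleDataUtil.createRowData(1, "a")));
 * }</pre>
 */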
public static void assertTableRows(String tablePath, List<RowData> expected, String branch)
throws IOException {
assertTableRecords(tablePath, convertToRecords(expected), branch);
}
public static void assertTableRows(Table table, List<RowData> expected) throws IOException {
assertTableRecords(table, convertToRecords(expected), SnapshotRef.MAIN_BRANCH);
}
public static void assertTableRows(Table table, List<RowData> expected, String branch)
throws IOException {
assertTableRecords(table, convertToRecords(expected), branch);
}

/** Returns all records currently in the table. */
public static List<Record> tableRecords(Table table) throws IOException {
table.refresh();
List<Record> records = Lists.newArrayList();
try (CloseableIterable<Record> iterable = IcebergGenerics.read(table).build()) {
for (Record record : iterable) {
records.add(record);
}
}
return records;
}
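
/**
 * Returns whether the two lists contain the same set of records, compared as structs and ignoring
 * order.
 */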
public static boolean equalsRecords(List<Record> expected, List<Record> actual, Schema schema) {
if (expected.size() != actual.size()) {
return false;
}
Types.StructType type = schema.asStruct();
StructLikeSet expectedSet = StructLikeSet.create(type);
expectedSet.addAll(expected);
StructLikeSet actualSet = StructLikeSet.create(type);
actualSet.addAll(actual);
return expectedSet.equals(actualSet);
}
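
/**
 * Asserts that the two lists have the same size and contain the same records, compared as structs
 * and ignoring order.
 */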
public static void assertRecordsEqual(List<Record> expected, List<Record> actual, Schema schema) {
assertThat(actual).hasSameSizeAs(expected);
Types.StructType type = schema.asStruct();
StructLikeSet expectedSet = StructLikeSet.create(type);
expectedSet.addAll(expected);
StructLikeSet actualSet = StructLikeSet.create(type);
actualSet.addAll(actual);
assertThat(actualSet).containsExactlyInAnyOrderElementsOf(expectedSet);
}

/**
 * Asserts that the table contains the expected list of records, waiting up to the given
 * {@code timeout} for them to appear.
 */
public static void assertTableRecords(Table table, List<Record> expected, Duration timeout) {
Awaitility.await("expected list of records should be produced")
.atMost(timeout)
.untilAsserted(() -> assertRecordsEqual(expected, tableRecords(table), table.schema()));
}
public static void assertTableRecords(Table table, List<Record> expected) throws IOException {
assertTableRecords(table, expected, SnapshotRef.MAIN_BRANCH);
}
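
/**
 * Asserts that the latest snapshot of the given branch contains exactly the expected records; if
 * the branch has no snapshot yet, the expected list must be empty.
 */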
public static void assertTableRecords(Table table, List<Record> expected, String branch)
throws IOException {
table.refresh();
Snapshot snapshot = latestSnapshot(table, branch);
if (snapshot == null) {
assertThat(expected).isEmpty();
return;
}
Types.StructType type = table.schema().asStruct();
StructLikeSet expectedSet = StructLikeSet.create(type);
expectedSet.addAll(expected);
try (CloseableIterable<Record> iterable =
IcebergGenerics.read(table).useSnapshot(snapshot.snapshotId()).build()) {
StructLikeSet actualSet = StructLikeSet.create(type);
for (Record record : iterable) {
actualSet.add(record);
}
assertThat(actualSet).containsExactlyInAnyOrderElementsOf(expectedSet);
}
}

/** Returns the latest snapshot of the given branch in the table. */
public static Snapshot latestSnapshot(Table table, String branch) {
// For the main branch, currentSnapshot() is used to validate that the API behavior has not
// changed, since that was the API used for validation prior to the addition of branches.
if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
return table.currentSnapshot();
}
return table.snapshot(branch);
}
public static void assertTableRecords(String tablePath, List<Record> expected)
throws IOException {
Preconditions.checkArgument(expected != null, "expected records shouldn't be null");
assertTableRecords(new HadoopTables().load(tablePath), expected, SnapshotRef.MAIN_BRANCH);
}
public static void assertTableRecords(String tablePath, List<Record> expected, String branch)
throws IOException {
Preconditions.checkArgument(expected != null, "expected records shouldn't be null");
assertTableRecords(new HadoopTables().load(tablePath), expected, branch);
}
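
/**
 * Builds a {@link StructLikeSet} of the given records using the table schema, wrapping each record
 * with {@link InternalRecordWrapper} for value-based comparison against scan results.
 */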
public static StructLikeSet expectedRowSet(Table table, Record... records) {
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
InternalRecordWrapper wrapper = new InternalRecordWrapper(table.schema().asStruct());
for (Record record : records) {
set.add(wrapper.copyFor(record));
}
return set;
}
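
/** Reads the selected columns of the table's current snapshot into a {@link StructLikeSet}. */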
public static StructLikeSet actualRowSet(Table table, String... columns) throws IOException {
return actualRowSet(table, null, columns);
}
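
/**
 * Reads the selected columns of the given snapshot (or the current snapshot when {@code
 * snapshotId} is null) into a {@link StructLikeSet}.
 */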
public static StructLikeSet actualRowSet(Table table, Long snapshotId, String... columns)
throws IOException {
table.refresh();
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
InternalRecordWrapper wrapper = new InternalRecordWrapper(table.schema().asStruct());
try (CloseableIterable<Record> reader =
IcebergGenerics.read(table)
.useSnapshot(snapshotId == null ? table.currentSnapshot().snapshotId() : snapshotId)
.select(columns)
.build()) {
reader.forEach(record -> set.add(wrapper.copyFor(record)));
}
return set;
}
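
/** Returns the table's data files whose partition matches the given partition values. */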
public static List<DataFile> partitionDataFiles(Table table, Map<String, Object> partitionValues)
throws IOException {
table.refresh();
Types.StructType partitionType = table.spec().partitionType();
Record partitionRecord = GenericRecord.create(partitionType).copy(partitionValues);
StructLikeWrapper expectedWrapper =
StructLikeWrapper.forType(partitionType).set(partitionRecord);
List<DataFile> dataFiles = Lists.newArrayList();
try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
for (FileScanTask scanTask : fileScanTasks) {
StructLikeWrapper wrapper =
StructLikeWrapper.forType(partitionType).set(scanTask.file().partition());
if (expectedWrapper.equals(wrapper)) {
dataFiles.add(scanTask.file());
}
}
}
return dataFiles;
}
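
/**
 * Maps each snapshot id to the data files that were appended in that snapshot, walking the
 * snapshot lineage from the current snapshot back to the oldest ancestor.
 */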
public static Map<Long, List<DataFile>> snapshotToDataFiles(Table table) throws IOException {
table.refresh();
Map<Long, List<DataFile>> result = Maps.newHashMap();
Snapshot current = table.currentSnapshot();
while (current != null) {
TableScan tableScan = table.newScan();
if (current.parentId() != null) {
// Collect the data files that were added only in the current snapshot.
tableScan = tableScan.appendsBetween(current.parentId(), current.snapshotId());
} else {
// Collect the data files that were added in the oldest snapshot.
tableScan = tableScan.useSnapshot(current.snapshotId());
}
try (CloseableIterable<FileScanTask> scanTasks = tableScan.planFiles()) {
result.put(
current.snapshotId(),
ImmutableList.copyOf(Iterables.transform(scanTasks, FileScanTask::file)));
}
// Continue to traverse the parent snapshot if it exists.
if (current.parentId() == null) {
break;
}
// Iterate to the parent snapshot.
current = table.snapshot(current.parentId());
}
return result;
}
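
/**
 * Filters the given data files down to those whose partition matches the given partition values
 * under the given {@link PartitionSpec}.
 */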
public static List<DataFile> matchingPartitions(
List<DataFile> dataFiles, PartitionSpec partitionSpec, Map<String, Object> partitionValues) {
Types.StructType partitionType = partitionSpec.partitionType();
Record partitionRecord = GenericRecord.create(partitionType).copy(partitionValues);
StructLikeWrapper expected = StructLikeWrapper.forType(partitionType).set(partitionRecord);
return dataFiles.stream()
.filter(
df -> {
StructLikeWrapper wrapper =
StructLikeWrapper.forType(partitionType).set(df.partition());
return wrapper.equals(expected);
})
.collect(Collectors.toList());
}
}