| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iceberg.flink; |
| |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import org.apache.flink.table.api.DataTypes; |
| import org.apache.flink.table.api.TableSchema; |
| import org.apache.flink.table.data.GenericRowData; |
| import org.apache.flink.table.data.RowData; |
| import org.apache.flink.table.data.StringData; |
| import org.apache.flink.table.types.logical.RowType; |
| import org.apache.flink.types.RowKind; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.iceberg.DataFile; |
| import org.apache.iceberg.DataFiles; |
| import org.apache.iceberg.DeleteFile; |
| import org.apache.iceberg.FileFormat; |
| import org.apache.iceberg.PartitionSpec; |
| import org.apache.iceberg.Schema; |
| import org.apache.iceberg.Table; |
| import org.apache.iceberg.data.GenericRecord; |
| import org.apache.iceberg.data.IcebergGenerics; |
| import org.apache.iceberg.data.Record; |
| import org.apache.iceberg.deletes.EqualityDeleteWriter; |
| import org.apache.iceberg.deletes.PositionDeleteWriter; |
| import org.apache.iceberg.encryption.EncryptedOutputFile; |
| import org.apache.iceberg.flink.sink.FlinkAppenderFactory; |
| import org.apache.iceberg.hadoop.HadoopInputFile; |
| import org.apache.iceberg.hadoop.HadoopTables; |
| import org.apache.iceberg.io.CloseableIterable; |
| import org.apache.iceberg.io.FileAppender; |
| import org.apache.iceberg.io.FileAppenderFactory; |
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; |
| import org.apache.iceberg.relocated.com.google.common.collect.HashMultiset; |
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; |
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; |
| import org.apache.iceberg.types.Types; |
| import org.apache.iceberg.util.Pair; |
| import org.apache.iceberg.util.StructLikeSet; |
| import org.junit.Assert; |
| |
| import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath; |
| |
| public class SimpleDataUtil { |
| |
| private SimpleDataUtil() { |
| } |
| |
| public static final Schema SCHEMA = new Schema( |
| Types.NestedField.optional(1, "id", Types.IntegerType.get()), |
| Types.NestedField.optional(2, "data", Types.StringType.get()) |
| ); |
| |
| public static final TableSchema FLINK_SCHEMA = TableSchema.builder() |
| .field("id", DataTypes.INT()) |
| .field("data", DataTypes.STRING()) |
| .build(); |
| |
| public static final RowType ROW_TYPE = (RowType) FLINK_SCHEMA.toRowDataType().getLogicalType(); |
| |
| public static final Record RECORD = GenericRecord.create(SCHEMA); |
| |
| public static Table createTable(String path, Map<String, String> properties, boolean partitioned) { |
| PartitionSpec spec; |
| if (partitioned) { |
| spec = PartitionSpec.builderFor(SCHEMA).identity("data").build(); |
| } else { |
| spec = PartitionSpec.unpartitioned(); |
| } |
| return new HadoopTables().create(SCHEMA, spec, properties, path); |
| } |
| |
| public static Record createRecord(Integer id, String data) { |
| Record record = RECORD.copy(); |
| record.setField("id", id); |
| record.setField("data", data); |
| return record; |
| } |
| |
| public static RowData createRowData(Integer id, String data) { |
| return GenericRowData.of(id, StringData.fromString(data)); |
| } |
| |
| public static RowData createInsert(Integer id, String data) { |
| return GenericRowData.ofKind(RowKind.INSERT, id, StringData.fromString(data)); |
| } |
| |
| public static RowData createDelete(Integer id, String data) { |
| return GenericRowData.ofKind(RowKind.DELETE, id, StringData.fromString(data)); |
| } |
| |
| public static RowData createUpdateBefore(Integer id, String data) { |
| return GenericRowData.ofKind(RowKind.UPDATE_BEFORE, id, StringData.fromString(data)); |
| } |
| |
| public static RowData createUpdateAfter(Integer id, String data) { |
| return GenericRowData.ofKind(RowKind.UPDATE_AFTER, id, StringData.fromString(data)); |
| } |
| |
| public static DataFile writeFile(Schema schema, PartitionSpec spec, Configuration conf, |
| String location, String filename, List<RowData> rows) |
| throws IOException { |
| Path path = new Path(location, filename); |
| FileFormat fileFormat = FileFormat.fromFileName(filename); |
| Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename); |
| |
| RowType flinkSchema = FlinkSchemaUtil.convert(schema); |
| FileAppenderFactory<RowData> appenderFactory = |
| new FlinkAppenderFactory(schema, flinkSchema, ImmutableMap.of(), spec); |
| |
| FileAppender<RowData> appender = appenderFactory.newAppender(fromPath(path, conf), fileFormat); |
| try (FileAppender<RowData> closeableAppender = appender) { |
| closeableAppender.addAll(rows); |
| } |
| |
| return DataFiles.builder(spec) |
| .withInputFile(HadoopInputFile.fromPath(path, conf)) |
| .withMetrics(appender.metrics()) |
| .build(); |
| } |
| |
| public static DeleteFile writeEqDeleteFile(Table table, FileFormat format, String tablePath, String filename, |
| FileAppenderFactory<RowData> appenderFactory, |
| List<RowData> deletes) throws IOException { |
| EncryptedOutputFile outputFile = |
| table.encryption().encrypt(fromPath(new Path(tablePath, filename), new Configuration())); |
| |
| EqualityDeleteWriter<RowData> eqWriter = appenderFactory.newEqDeleteWriter(outputFile, format, null); |
| try (EqualityDeleteWriter<RowData> writer = eqWriter) { |
| writer.deleteAll(deletes); |
| } |
| return eqWriter.toDeleteFile(); |
| } |
| |
| public static DeleteFile writePosDeleteFile(Table table, FileFormat format, String tablePath, |
| String filename, |
| FileAppenderFactory<RowData> appenderFactory, |
| List<Pair<CharSequence, Long>> positions) throws IOException { |
| EncryptedOutputFile outputFile = |
| table.encryption().encrypt(fromPath(new Path(tablePath, filename), new Configuration())); |
| |
| PositionDeleteWriter<RowData> posWriter = appenderFactory.newPosDeleteWriter(outputFile, format, null); |
| try (PositionDeleteWriter<RowData> writer = posWriter) { |
| for (Pair<CharSequence, Long> p : positions) { |
| writer.delete(p.first(), p.second()); |
| } |
| } |
| return posWriter.toDeleteFile(); |
| } |
| |
| private static List<Record> convertToRecords(List<RowData> rows) { |
| List<Record> records = Lists.newArrayList(); |
| for (RowData row : rows) { |
| Integer id = row.isNullAt(0) ? null : row.getInt(0); |
| String data = row.isNullAt(1) ? null : row.getString(1).toString(); |
| records.add(createRecord(id, data)); |
| } |
| return records; |
| } |
| |
| public static void assertTableRows(String tablePath, List<RowData> expected) throws IOException { |
| assertTableRecords(tablePath, convertToRecords(expected)); |
| } |
| |
| public static void assertTableRows(Table table, List<RowData> expected) throws IOException { |
| assertTableRecords(table, convertToRecords(expected)); |
| } |
| |
| public static void assertTableRecords(Table table, List<Record> expected) throws IOException { |
| table.refresh(); |
| try (CloseableIterable<Record> iterable = IcebergGenerics.read(table).build()) { |
| Assert.assertEquals("Should produce the expected record", |
| HashMultiset.create(expected), HashMultiset.create(iterable)); |
| } |
| } |
| |
| public static void assertTableRecords(String tablePath, List<Record> expected) throws IOException { |
| Preconditions.checkArgument(expected != null, "expected records shouldn't be null"); |
| assertTableRecords(new HadoopTables().load(tablePath), expected); |
| } |
| |
| public static StructLikeSet expectedRowSet(Table table, Record... records) { |
| StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); |
| Collections.addAll(set, records); |
| return set; |
| } |
| |
| public static StructLikeSet actualRowSet(Table table, String... columns) throws IOException { |
| table.refresh(); |
| StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); |
| try (CloseableIterable<Record> reader = IcebergGenerics.read(table).select(columns).build()) { |
| reader.forEach(set::add); |
| } |
| return set; |
| } |
| } |