| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iceberg.flink.source; |
| |
| import java.io.File; |
| import java.util.List; |
| import java.util.stream.Collectors; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.iceberg.BaseCombinedScanTask; |
| import org.apache.iceberg.FileFormat; |
| import org.apache.iceberg.FileScanTask; |
| import org.apache.iceberg.PartitionSpec; |
| import org.apache.iceberg.Table; |
| import org.apache.iceberg.TableProperties; |
| import org.apache.iceberg.data.GenericAppenderHelper; |
| import org.apache.iceberg.data.RandomGenericData; |
| import org.apache.iceberg.data.Record; |
| import org.apache.iceberg.flink.TestFixtures; |
| import org.apache.iceberg.flink.source.split.IcebergSourceSplit; |
| import org.apache.iceberg.hadoop.HadoopCatalog; |
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; |
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; |
| import org.apache.iceberg.util.ThreadPools; |
| import org.junit.Assert; |
| import org.junit.rules.TemporaryFolder; |
| |
| public class SplitHelpers { |
| |
| private SplitHelpers() {} |
| |
| /** |
| * This create a list of IcebergSourceSplit from real files |
| * <li>Create a new Hadoop table under the {@code temporaryFolder} |
| * <li>write {@code fileCount} number of files to the new Iceberg table |
| * <li>Discover the splits from the table and partition the splits by the {@code filePerSplit} |
| * limit |
| * <li>Delete the Hadoop table |
| * |
| * <p>Since the table and data files are deleted before this method return, caller shouldn't |
| * attempt to read the data files. |
| * |
| * <p>By default, v1 Iceberg table is created. For v2 table use {@link |
| * SplitHelpers#createSplitsFromTransientHadoopTable(TemporaryFolder, int, int, String)} |
| * |
| * @param temporaryFolder Folder to place the data to |
| * @param fileCount The number of files to create and add to the table |
| * @param filesPerSplit The number of files used for a split |
| */ |
| public static List<IcebergSourceSplit> createSplitsFromTransientHadoopTable( |
| TemporaryFolder temporaryFolder, int fileCount, int filesPerSplit) throws Exception { |
| return createSplitsFromTransientHadoopTable(temporaryFolder, fileCount, filesPerSplit, "1"); |
| } |
| |
| /** |
| * This create a list of IcebergSourceSplit from real files |
| * <li>Create a new Hadoop table under the {@code temporaryFolder} |
| * <li>write {@code fileCount} number of files to the new Iceberg table |
| * <li>Discover the splits from the table and partition the splits by the {@code filePerSplit} |
| * limit |
| * <li>Delete the Hadoop table |
| * |
| * <p>Since the table and data files are deleted before this method return, caller shouldn't |
| * attempt to read the data files. |
| * |
| * @param temporaryFolder Folder to place the data to |
| * @param fileCount The number of files to create and add to the table |
| * @param filesPerSplit The number of files used for a split |
| * @param version The table version to create |
| */ |
| public static List<IcebergSourceSplit> createSplitsFromTransientHadoopTable( |
| TemporaryFolder temporaryFolder, int fileCount, int filesPerSplit, String version) |
| throws Exception { |
| final File warehouseFile = temporaryFolder.newFolder(); |
| Assert.assertTrue(warehouseFile.delete()); |
| final String warehouse = "file:" + warehouseFile; |
| Configuration hadoopConf = new Configuration(); |
| final HadoopCatalog catalog = new HadoopCatalog(hadoopConf, warehouse); |
| ImmutableMap<String, String> properties = |
| ImmutableMap.of(TableProperties.FORMAT_VERSION, version); |
| try { |
| final Table table = |
| catalog.createTable( |
| TestFixtures.TABLE_IDENTIFIER, |
| TestFixtures.SCHEMA, |
| PartitionSpec.unpartitioned(), |
| null, |
| properties); |
| final GenericAppenderHelper dataAppender = |
| new GenericAppenderHelper(table, FileFormat.PARQUET, temporaryFolder); |
| for (int i = 0; i < fileCount; ++i) { |
| List<Record> records = RandomGenericData.generate(TestFixtures.SCHEMA, 2, i); |
| dataAppender.appendToTable(records); |
| } |
| |
| final ScanContext scanContext = ScanContext.builder().build(); |
| final List<IcebergSourceSplit> splits = |
| FlinkSplitPlanner.planIcebergSourceSplits( |
| table, scanContext, ThreadPools.getWorkerPool()); |
| return splits.stream() |
| .flatMap( |
| split -> { |
| List<List<FileScanTask>> filesList = |
| Lists.partition(Lists.newArrayList(split.task().files()), filesPerSplit); |
| return filesList.stream() |
| .map(files -> new BaseCombinedScanTask(files)) |
| .map( |
| combinedScanTask -> |
| IcebergSourceSplit.fromCombinedScanTask(combinedScanTask)); |
| }) |
| .collect(Collectors.toList()); |
| } finally { |
| catalog.dropTable(TestFixtures.TABLE_IDENTIFIER); |
| catalog.close(); |
| } |
| } |
| } |