blob: 846f20a07b7caf22f05a7782004026a06af977b6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MicroBatches {
private MicroBatches() {
}
public static class MicroBatch {
private final long snapshotId;
private final int startFileIndex;
private final int endFileIndex;
private final long sizeInBytes;
private final List<FileScanTask> tasks;
private final boolean lastIndexOfSnapshot;
private MicroBatch(long snapshotId, int startFileIndex, int endFileIndex, long sizeInBytes,
List<FileScanTask> tasks, boolean lastIndexOfSnapshot) {
this.snapshotId = snapshotId;
this.startFileIndex = startFileIndex;
this.endFileIndex = endFileIndex;
this.sizeInBytes = sizeInBytes;
this.tasks = tasks;
this.lastIndexOfSnapshot = lastIndexOfSnapshot;
}
public long snapshotId() {
return snapshotId;
}
public int startFileIndex() {
return startFileIndex;
}
public int endFileIndex() {
return endFileIndex;
}
public long sizeInBytes() {
return sizeInBytes;
}
public List<FileScanTask> tasks() {
return tasks;
}
public boolean lastIndexOfSnapshot() {
return lastIndexOfSnapshot;
}
}
public static MicroBatchBuilder from(Snapshot snapshot, FileIO io) {
return new MicroBatchBuilder(snapshot, io);
}
public static class MicroBatchBuilder {
private static final Logger LOG = LoggerFactory.getLogger(MicroBatchBuilder.class);
private final Snapshot snapshot;
private final FileIO io;
private boolean caseSensitive;
private Map<Integer, PartitionSpec> specsById;
private MicroBatchBuilder(Snapshot snapshot, FileIO io) {
this.snapshot = snapshot;
this.io = io;
this.caseSensitive = true;
}
public MicroBatchBuilder caseSensitive(boolean sensitive) {
this.caseSensitive = sensitive;
return this;
}
public MicroBatchBuilder specsById(Map<Integer, PartitionSpec> specs) {
this.specsById = specs;
return this;
}
public MicroBatch generate(int startFileIndex, long targetSizeInBytes, boolean isStarting) {
Preconditions.checkArgument(startFileIndex >= 0, "startFileIndex is unexpectedly smaller than 0");
Preconditions.checkArgument(targetSizeInBytes > 0, "targetSizeInBytes should be larger than 0");
List<ManifestFile> manifests = isStarting ? snapshot.dataManifests() :
snapshot.dataManifests().stream().filter(m -> m.snapshotId().equals(snapshot.snapshotId()))
.collect(Collectors.toList());
List<Pair<ManifestFile, Integer>> manifestIndexes = indexManifests(manifests);
List<Pair<ManifestFile, Integer>> skippedManifestIndexes = skipManifests(manifestIndexes, startFileIndex);
return generateMicroBatch(skippedManifestIndexes, startFileIndex, targetSizeInBytes, isStarting);
}
/**
* Method to index the data files for each manifest. For example, if manifest m1 has 3 data files, manifest
* m2 has 2 data files, manifest m3 has 1 data file, then the index will be (m1, 0), (m2, 3), (m3, 5).
*
* @param manifestFiles List of input manifests used to index.
* @return a list of manifest index with key as manifest file, value as file counts.
*/
private static List<Pair<ManifestFile, Integer>> indexManifests(List<ManifestFile> manifestFiles) {
int currentFileIndex = 0;
List<Pair<ManifestFile, Integer>> manifestIndexes = Lists.newArrayList();
for (ManifestFile manifest : manifestFiles) {
manifestIndexes.add(Pair.of(manifest, currentFileIndex));
currentFileIndex += manifest.addedFilesCount() + manifest.existingFilesCount();
}
return manifestIndexes;
}
/**
* Method to skip the manifest file in which the index is smaller than startFileIndex. For example, if the
* index list is : (m1, 0), (m2, 3), (m3, 5), and startFileIndex is 4, then the returned manifest index list is:
* (m2, 3), (m3, 5).
*
* @param indexedManifests List of input manifests.
* @param startFileIndex Index used to skip the processed manifests.
* @return a sub-list of manifest file index which only contains the manifest indexes larger than the
* startFileIndex.
*/
private static List<Pair<ManifestFile, Integer>> skipManifests(List<Pair<ManifestFile, Integer>> indexedManifests,
int startFileIndex) {
if (startFileIndex == 0) {
return indexedManifests;
}
int manifestIndex = 0;
for (Pair<ManifestFile, Integer> manifest : indexedManifests) {
if (manifest.second() > startFileIndex) {
break;
}
manifestIndex++;
}
return indexedManifests.subList(manifestIndex - 1, indexedManifests.size());
}
/**
* Method to generate MicroBatch of this snapshot based on the indexed manifests, controlled by targetSizeInBytes.
*
* @param indexedManifests A list of indexed manifests to generate MicroBatch
* @param startFileIndex A startFileIndex used to skip processed files.
* @param targetSizeInBytes Used to control the size of MicroBatch, the processed file bytes must be smaller than
* this size.
* @param isStarting Used to check where all the data file should be processed, or only added files.
* @return A MicroBatch.
*/
private MicroBatch generateMicroBatch(List<Pair<ManifestFile, Integer>> indexedManifests,
int startFileIndex, long targetSizeInBytes, boolean isStarting) {
if (indexedManifests.isEmpty()) {
return new MicroBatch(snapshot.snapshotId(), startFileIndex, startFileIndex + 1, 0L,
Collections.emptyList(), true);
}
long currentSizeInBytes = 0L;
int currentFileIndex = 0;
boolean isLastIndex = false;
List<FileScanTask> tasks = Lists.newArrayList();
for (int idx = 0; idx < indexedManifests.size(); idx++) {
currentFileIndex = indexedManifests.get(idx).second();
try (CloseableIterable<FileScanTask> taskIterable = open(indexedManifests.get(idx).first(), isStarting);
CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
while (taskIter.hasNext()) {
FileScanTask task = taskIter.next();
if (currentFileIndex >= startFileIndex) {
// Make sure there's at least one task in each MicroBatch to void job to be stuck, always add task
// firstly.
tasks.add(task);
currentSizeInBytes += task.length();
}
currentFileIndex++;
if (currentSizeInBytes >= targetSizeInBytes) {
break;
}
}
if (idx + 1 == indexedManifests.size() && !taskIter.hasNext()) {
// If this is the last file scan task in last manifest, set the flag to true.
isLastIndex = true;
}
} catch (IOException ioe) {
LOG.warn("Failed to close task iterable", ioe);
}
if (currentSizeInBytes >= targetSizeInBytes) {
if (tasks.size() > 1 && currentSizeInBytes > targetSizeInBytes) {
// If there's more than 1 task in this batch, and the size exceeds the limit, we should revert last
// task to make sure we don't exceed the size limit.
FileScanTask extraTask = tasks.remove(tasks.size() - 1);
currentSizeInBytes -= extraTask.length();
currentFileIndex--;
isLastIndex = false;
}
break;
}
}
return new MicroBatch(snapshot.snapshotId(), startFileIndex, currentFileIndex, currentSizeInBytes,
tasks, isLastIndex);
}
private CloseableIterable<FileScanTask> open(ManifestFile manifestFile, boolean isStarting) {
ManifestGroup manifestGroup = new ManifestGroup(io, ImmutableList.of(manifestFile))
.specsById(specsById)
.caseSensitive(caseSensitive);
if (isStarting) {
manifestGroup = manifestGroup
.filterManifestEntries(entry ->
entry.snapshotId() == snapshot.snapshotId() && entry.status() == ManifestEntry.Status.ADDED)
.ignoreDeleted();
}
return manifestGroup.planFiles();
}
}
}