/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
import com.github.benmanes.caffeine.cache.Cache;
import com.google.common.testing.GcFinalization;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.ContentCache;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestManifestCaching {

// Schema passed to create tables
static final Schema SCHEMA =
new Schema(
required(3, "id", Types.IntegerType.get(), "unique ID"),
required(4, "data", Types.StringType.get()));

// Partition spec used to create tables
static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build();

@TempDir private Path temp;
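
// Planning with the manifest cache enabled should load each manifest once and serve
// later plans from the cache without additional loads or misses.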
@Test
public void testPlanWithCache() throws Exception {
Map<String, String> properties =
ImmutableMap.of(
CatalogProperties.FILE_IO_IMPL,
HadoopFileIO.class.getName(),
CatalogProperties.IO_MANIFEST_CACHE_ENABLED,
"true");
Table table = createTable(properties);
ContentCache cache = ManifestFiles.contentCache(table.io());
assertThat(cache.estimatedCacheSize()).isEqualTo(0);
int numFiles = 4;
List<DataFile> files16Mb = newFiles(numFiles, 16 * 1024 * 1024);
appendFiles(files16Mb, table);
// planTasks with SPLIT_SIZE set to half of the file size
TableScan scan1 =
table.newScan().option(TableProperties.SPLIT_SIZE, String.valueOf(8 * 1024 * 1024));
assertThat(scan1.planTasks()).hasSize(numFiles * 2);
assertThat(cache.estimatedCacheSize())
.as("All manifest files should be cached")
.isEqualTo(numFiles);
assertThat(cache.stats().loadCount())
.as("All manifest files should be recently loaded")
.isEqualTo(numFiles);
long missCount = cache.stats().missCount();
// planFiles and verify that the cache size stays the same
TableScan scan2 = table.newScan();
assertThat(scan2.planFiles()).hasSize(numFiles);
assertThat(cache.estimatedCacheSize()).isEqualTo(numFiles);
assertThat(cache.stats().missCount())
.as("All manifest file reads should hit cache")
.isEqualTo(missCount);
ManifestFiles.dropCache(table.io());
}
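
// With max total bytes and max content length both set to 1 byte, no manifest can be
// cached, so planning must bypass the cache entirely: no loads and no cache requests.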
@Test
public void testPlanWithSmallCache() throws Exception {
Map<String, String> properties =
ImmutableMap.of(
CatalogProperties.FILE_IO_IMPL, HadoopFileIO.class.getName(),
CatalogProperties.IO_MANIFEST_CACHE_ENABLED, "true",
CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES, "1",
CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH, "1");
Table table = createTable(properties);
int numFiles = 4;
List<DataFile> files16Mb = newFiles(numFiles, 16 * 1024 * 1024);
appendFiles(files16Mb, table);
// We should never hit the cache.
TableScan scan = table.newScan();
ContentCache cache = ManifestFiles.contentCache(scan.table().io());
assertThat(cache.maxContentLength()).isEqualTo(1);
assertThat(cache.maxTotalBytes()).isEqualTo(1);
assertThat(scan.planFiles()).hasSize(numFiles);
assertThat(cache.estimatedCacheSize()).isEqualTo(0);
assertThat(cache.stats().loadCount())
.as("File should not be loaded through cache")
.isEqualTo(0);
assertThat(cache.stats().requestCount()).as("Cache should not serve file").isEqualTo(0);
ManifestFiles.dropCache(scan.table().io());
}
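
// Each FileIO instance gets its own ContentCache, and repeated lookups for the same
// FileIO return the same cache instance.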
@Test
public void testUniqueCache() throws Exception {
Map<String, String> properties1 =
ImmutableMap.of(
CatalogProperties.FILE_IO_IMPL,
HadoopFileIO.class.getName(),
CatalogProperties.IO_MANIFEST_CACHE_ENABLED,
"true");
Table table1 = createTable(properties1);
Map<String, String> properties2 =
ImmutableMap.of(
CatalogProperties.FILE_IO_IMPL, HadoopFileIO.class.getName(),
CatalogProperties.IO_MANIFEST_CACHE_ENABLED, "true",
CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES, "1",
CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH, "1");
Table table2 = createTable(properties2);
ContentCache cache1 = ManifestFiles.contentCache(table1.io());
ContentCache cache2 = ManifestFiles.contentCache(table2.io());
ContentCache cache3 = ManifestFiles.contentCache(table2.io());
assertThat(cache2).isNotSameAs(cache1);
assertThat(cache3).isSameAs(cache2);
ManifestFiles.dropCache(table1.io());
ManifestFiles.dropCache(table2.io());
}
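
// Dropping the cache for a FileIO and requesting it again should yield a new
// ContentCache instance.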
@Test
public void testRecreateCache() throws Exception {
Map<String, String> properties =
ImmutableMap.of(
CatalogProperties.FILE_IO_IMPL,
HadoopFileIO.class.getName(),
CatalogProperties.IO_MANIFEST_CACHE_ENABLED,
"true");
Table table = createTable(properties);
ContentCache cache1 = ManifestFiles.contentCache(table.io());
ManifestFiles.dropCache(table.io());
ContentCache cache2 = ManifestFiles.contentCache(table.io());
assertThat(cache2).isNotSameAs(cache1);
ManifestFiles.dropCache(table.io());
}
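
// The manifest cache holds its FileIO keys weakly: once the FileIO instances created in
// the loop become unreachable, GC plus cleanUp() evicts their entries, leaving only the
// two FileIO objects that are still strongly referenced (firstIO and lastIO).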
@Test
public void testWeakFileIOReferenceCleanUp() {
Cache<FileIO, ContentCache> manifestCache =
ManifestFiles.newManifestCacheBuilder().executor(Runnable::run).build();
int maxIO = SystemConfigs.IO_MANIFEST_CACHE_MAX_FILEIO.defaultValue();
FileIO firstIO = null;
ContentCache firstCache = null;
for (int i = 0; i < maxIO - 1; i++) {
FileIO io = cacheEnabledHadoopFileIO();
ContentCache cache = contentCache(manifestCache, io);
if (i == 0) {
firstIO = io;
firstCache = cache;
}
}
// Insert the last FileIO and trigger GC + cleanup.
FileIO lastIO = cacheEnabledHadoopFileIO();
ContentCache lastCache = contentCache(manifestCache, lastIO);
GcFinalization.awaitDone(
() -> {
manifestCache.cleanUp();
return manifestCache.estimatedSize() == 2;
});
// Verify that manifestCache evicted all FileIO entries except firstIO and lastIO.
ContentCache cache1 = contentCache(manifestCache, firstIO);
ContentCache cacheN = contentCache(manifestCache, lastIO);
assertThat(cache1).isSameAs(firstCache);
assertThat(cacheN).isSameAs(lastCache);
assertThat(manifestCache.stats().loadCount()).isEqualTo(maxIO);
assertThat(manifestCache.stats().evictionCount()).isEqualTo(maxIO - 2);
}

/**
 * Returns the {@link ContentCache} for the given io from the manifestCache, creating and
 * inserting a new one if none is present.
 *
 * @return an existing or new {@link ContentCache} associated with the given io.
 */
private static ContentCache contentCache(Cache<FileIO, ContentCache> manifestCache, FileIO io) {
return manifestCache.get(
io,
fileIO ->
new ContentCache(
ManifestFiles.cacheDurationMs(fileIO),
ManifestFiles.cacheTotalBytes(fileIO),
ManifestFiles.cacheMaxContentLength(fileIO)));
}
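
// Builds a HadoopFileIO initialized with manifest caching enabled.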
private FileIO cacheEnabledHadoopFileIO() {
Map<String, String> properties =
ImmutableMap.of(
CatalogProperties.FILE_IO_IMPL,
HadoopFileIO.class.getName(),
CatalogProperties.IO_MANIFEST_CACHE_ENABLED,
"true");
HadoopFileIO io = new HadoopFileIO(new Configuration());
io.initialize(properties);
return io;
}
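
// Creates a partitioned test table through a HadoopCatalog configured with the given
// properties.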
private Table createTable(Map<String, String> properties) throws Exception {
TableIdentifier tableIdent = TableIdentifier.of("db", "ns1", "ns2", "tbl");
return hadoopCatalog(properties)
.buildTable(tableIdent, SCHEMA)
.withPartitionSpec(SPEC)
.create();
}
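
// Initializes a HadoopCatalog over a fresh temporary warehouse directory, merging the
// supplied catalog properties into its configuration.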
private HadoopCatalog hadoopCatalog(Map<String, String> catalogProperties) throws IOException {
HadoopCatalog hadoopCatalog = new HadoopCatalog();
hadoopCatalog.setConf(new Configuration());
hadoopCatalog.initialize(
"hadoop",
ImmutableMap.<String, String>builder()
.putAll(catalogProperties)
.put(
CatalogProperties.WAREHOUSE_LOCATION,
Files.createTempDirectory(temp, "junit").toFile().getAbsolutePath())
.buildOrThrow());
return hadoopCatalog;
}
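
// Commits each data file in its own append so that every file lands in a separate
// manifest; the cache-size assertions above rely on one manifest per file.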
private void appendFiles(Iterable<DataFile> files, Table table) {
for (DataFile file : files) {
AppendFiles appendFile = table.newAppend();
appendFile.appendFile(file);
appendFile.commit();
}
}
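
// Convenience overload: Parquet files with no explicit split offsets.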
private List<DataFile> newFiles(int numFiles, long sizeInBytes) {
return newFiles(numFiles, sizeInBytes, FileFormat.PARQUET, 1);
}
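
// Creates numFiles metadata-only DataFile entries of the given size and format; the
// underlying files are never written because the tests only exercise manifest planning.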
private List<DataFile> newFiles(
int numFiles, long sizeInBytes, FileFormat fileFormat, int numOffsets) {
List<DataFile> files = Lists.newArrayList();
for (int fileNum = 0; fileNum < numFiles; fileNum++) {
files.add(newFile(sizeInBytes, fileFormat, numOffsets));
}
return files;
}
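
// Builds an unpartitioned DataFile of the given size; when numOffsets > 1, evenly
// spaced split offsets are attached so that scans can split the file.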
private DataFile newFile(long sizeInBytes, FileFormat fileFormat, int numOffsets) {
String fileName = UUID.randomUUID().toString();
DataFiles.Builder builder =
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath(fileFormat.addExtension(fileName))
.withFileSizeInBytes(sizeInBytes)
.withRecordCount(2);
if (numOffsets > 1) {
long stepSize = sizeInBytes / numOffsets;
List<Long> offsets =
LongStream.range(0, numOffsets)
.map(i -> i * stepSize)
.boxed()
.collect(Collectors.toList());
builder.withSplitOffsets(offsets);
}
return builder.build();
}
}