blob: aa2487808d9efe561eb5710c73ac5d8da7e8edad [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.impala.compat.MetastoreShim;
import org.hamcrest.Matchers;
import org.junit.Assume;
import org.junit.Test;
/**
 * Tests for {@link AcidUtils}: parsing of base write ids out of directory names and
 * filtering of a table's file listing against a valid transaction list and a valid
 * write-id list.
 *
 * <p>Write-id list strings used below follow the format
 * {@code <tableName>:<highWatermark>:<minOpenWriteId>:<openWriteIds>:<abortedWriteIds>}.
 */
public class AcidUtilsTest {
  /** Fake base path to root all FileStatuses under. */
  private static final Path BASE_PATH = new Path("file:///foo/bar/");

  public AcidUtilsTest() {
    // Evaluated for every test instance; with JUnit's default runner a failed
    // assumption causes the test to be skipped rather than failed.
    Assume.assumeTrue("Tests require Hive 3 to parse and use WriteIdList",
        MetastoreShim.getMajorVersion() == 3);
  }

  /**
   * Builds one fake {@link FileStatus} per relative path, each rooted under
   * {@link #BASE_PATH}. A path ending in "/" becomes a directory entry; any other
   * path becomes a plain file entry.
   *
   * @param testStrings paths relative to {@link #BASE_PATH}
   * @return the fake statuses, in the same order as {@code testStrings}
   */
  private static List<FileStatus> createMockStats(String... testStrings) {
    return Arrays.stream(testStrings)
        .map(s -> new FileStatus(
            /*length=*/0, /*isdir=*/s.endsWith("/"), /*block_replication=*/1,
            /*blocksize=*/0, /*mtime=*/1, /*path=*/new Path(BASE_PATH, s)))
        .collect(Collectors.toList());
  }

  /**
   * Same as {@link #assertFiltering(String[], String, String, String[])} with an empty
   * valid transaction list string.
   */
  private void assertFiltering(String[] relPaths, String validWriteIdListStr,
      String[] expectedRelPaths) {
    assertFiltering(relPaths, "", validWriteIdListStr, expectedRelPaths);
  }

  /**
   * Asserts that {@link AcidUtils#filterFilesForAcidState} keeps exactly
   * {@code expectedRelPaths} (in any order) out of {@code relPaths}.
   *
   * @param relPaths listing of the table directory, relative to {@link #BASE_PATH}
   * @param validTxnListStr serialized {@link ValidReadTxnList}
   * @param validWriteIdListStr serialized write-id list, parsed via {@link MetastoreShim}
   * @param expectedRelPaths files expected to survive filtering
   */
  private void assertFiltering(String[] relPaths, String validTxnListStr,
      String validWriteIdListStr, String[] expectedRelPaths) {
    ValidWriteIdList writeIds = MetastoreShim.getValidWriteIdListFromString(
        validWriteIdListStr);
    List<FileStatus> stats = createMockStats(relPaths);
    List<FileStatus> expectedStats = createMockStats(expectedRelPaths);
    assertThat(AcidUtils.filterFilesForAcidState(stats, BASE_PATH,
        new ValidReadTxnList(validTxnListStr), writeIds, null),
        Matchers.containsInAnyOrder(expectedStats.toArray()));
  }

  @Test
  public void testParsingBaseWriteIds() {
    assertEquals(AcidUtils.SENTINEL_BASE_WRITE_ID,
        AcidUtils.getBaseWriteId("base_01.txt"));
    assertEquals(AcidUtils.SENTINEL_BASE_WRITE_ID,
        AcidUtils.getBaseWriteId("base_01_02"));
    assertEquals(AcidUtils.SENTINEL_BASE_WRITE_ID,
        AcidUtils.getBaseWriteId("abc/base_6"));
    assertEquals(2, AcidUtils.getBaseWriteId("base_00002"));
    assertEquals(2, AcidUtils.getBaseWriteId("base_00002/foo"));
    assertEquals(2, AcidUtils.getBaseWriteId("base_00002_v12345"));
    assertEquals(2, AcidUtils.getBaseWriteId("base_00002_v12345/foo"));
  }

  @Test
  public void testBasePredicate() {
    assertFiltering(new String[]{
        "base_01.txt",
        "post_upgrade.txt",
        "base_0000005/",
        "base_0000005/abc.txt",
        "base_0000005/0000/",
        "base_0000005/0000/abc.txt",
        "_tmp.base_0000005/000000_0.manifest",
        "abc/",
        "abc/base_0000006/", // Not at root, so shouldn't be handled.
        "base_00000100/def.txt"}, // Above the high watermark of 10.
        // <tableName>:<highWatermark>:<minOpenWriteId>:<openWriteIds>:<abortedWriteIds>
        "default.test:10:1234:1,2,3",
        new String[]{
            "base_0000005/abc.txt",
            "base_0000005/0000/abc.txt"});
  }

  @Test
  public void testOpenTransactions() {
    assertFiltering(new String[]{
        "base_01.txt",
        "post_upgrade.txt",
        "base_0000005/",
        "base_0000005/abc.txt",
        "base_0000005/0000/",
        "base_0000005/0000/abc.txt",
        "_tmp.base_0000005/000000_0.manifest",
        "delta_0000006_0000006_0000/",
        "delta_0000006_0000006_0000/000000_0",
        "delta_0000007_0000007_0000/",
        "delta_0000007_0000007_0000/000000_0",
        "delta_0000008_0000008_0000/",
        "delta_0000008_0000008_0000/000000_0",
        "delta_0000009_0000009_0000/",
        "delta_0000009_0000009_0000/000000_0",
        "delta_0000009_0000009_0000/0000/def.txt"},
        "default.test:10:6:6,7,8:", // 6,7,8 are open write ids
        new String[]{
            "base_0000005/abc.txt",
            "base_0000005/0000/abc.txt",
            "delta_0000009_0000009_0000/000000_0",
            "delta_0000009_0000009_0000/0000/def.txt"});
  }

  @Test
  public void testInProgressCompaction() {
    assertFiltering(new String[]{
        "base_01.txt",
        "post_upgrade.txt",
        "base_0000005_v900/",
        "base_0000005_v900/abc.txt",
        "base_0000005_v900/0000/",
        "base_0000005_v900/0000/abc.txt",
        "base_0000008_v1000/", // Written by txn 1000, which is still open.
        "base_0000008_v1000/abc.txt",
        "delta_0000006_0000006_0000/",
        "delta_0000006_0000006_0000/000000_0",
        "delta_0000007_0000007_0000/",
        "delta_0000007_0000007_0000/000000_0",
        "delta_0000008_0000008_0000/",
        "delta_0000008_0000008_0000/000000_0",
        "delta_0000009_0000009_0000/",
        "delta_0000009_0000009_0000/000000_0",
        "delta_0000009_0000009_0000/0000/def.txt"},
        "1100:1000:1000:", // txn 1000 is open
        "default.test:10:10::", // write ids are committed
        new String[]{
            "base_0000005_v900/abc.txt",
            "base_0000005_v900/0000/abc.txt",
            "delta_0000006_0000006_0000/000000_0",
            "delta_0000007_0000007_0000/000000_0",
            "delta_0000008_0000008_0000/000000_0",
            "delta_0000009_0000009_0000/000000_0",
            "delta_0000009_0000009_0000/0000/def.txt"});
  }

  @Test
  public void testAbortedCompaction() {
    assertFiltering(new String[]{
        "base_01.txt",
        "post_upgrade.txt",
        "base_0000005_v900/",
        "base_0000005_v900/abc.txt",
        "base_0000005_v900/0000/",
        "base_0000005_v900/0000/abc.txt",
        "base_0000007_v950/",
        "base_0000007_v950/abc.txt",
        "base_0000008_v1000/", // Written by txn 1000, which is aborted.
        "base_0000008_v1000/abc.txt",
        "delta_0000006_0000006_0000/",
        "delta_0000006_0000006_0000/000000_0",
        "delta_0000007_0000007_0000/",
        "delta_0000007_0000007_0000/000000_0",
        "delta_0000008_0000008_0000/",
        "delta_0000008_0000008_0000/000000_0",
        "delta_0000009_0000009_0000/",
        "delta_0000009_0000009_0000/000000_0",
        "delta_0000009_0000009_0000/0000/def.txt"},
        "1100:1100::1000", // txn 1000 is aborted
        "default.test:10:10::", // write ids are committed
        new String[]{
            "base_0000007_v950/abc.txt",
            "delta_0000008_0000008_0000/000000_0",
            "delta_0000009_0000009_0000/000000_0",
            "delta_0000009_0000009_0000/0000/def.txt"});
  }

  @Test
  public void testAbortedTransactions() {
    assertFiltering(new String[]{
        "base_01.txt",
        "post_upgrade.txt",
        "base_0000005/",
        "base_0000005/abc.txt",
        "base_0000005/0000/",
        "base_0000005/0000/abc.txt",
        "_tmp.base_0000005/000000_0.manifest",
        "delta_0000006_0000006_0000/",
        "delta_0000006_0000006_0000/000000_0",
        "delta_0000007_0000007_0000/",
        "delta_0000007_0000007_0000/000000_0",
        "delta_0000008_0000008_0000/",
        "delta_0000008_0000008_0000/000000_0",
        "delta_0000009_0000009_0000/",
        "delta_0000009_0000009_0000/000000_0",
        "delta_0000009_0000009_0000/0000/def.txt"},
        "default.test:10:1337::7,8,9", // 7,8,9 are aborted write ids
        new String[]{
            "base_0000005/abc.txt",
            "base_0000005/0000/abc.txt",
            "delta_0000006_0000006_0000/000000_0"});
  }

  @Test
  public void testPostCompactionBase() {
    assertFiltering(new String[]{
        "base_0000003_v0003217/",
        "base_0000003_v0003217/000000_0",
        "delta_0000001_0000001_0000/",
        "delta_0000001_0000001_0000/000000_0",
        "delta_0000002_0000002_0000/",
        "delta_0000002_0000002_0000/000000_0",
        "delta_0000003_0000003_0000/",
        "delta_0000003_0000003_0000/000000_0"},
        // minOpenWriteId is Long.MAX_VALUE and the open/aborted lists are empty.
        "test_acid_compaction_676f9a30.tt:3:9223372036854775807::",
        new String[]{
            "base_0000003_v0003217/000000_0"});
  }

  @Test
  public void testDeltaPredicate() {
    String[] paths = new String[]{
        "delta_000005_0000005/",
        "delta_000005_0000005/abc.txt",
        "delta_000005_0000005_0000/",
        "delta_000005_0000005_0000/abc.txt",
        "delta_000006_0000020/",
        "delta_000006_0000020/def.txt",
        "delta_000005.txt"};
    // Only committed up to transaction 10, so skip the 6-20 delta.
    assertFiltering(paths,
        "default.test:10:1234:1,2,3",
        new String[]{
            "delta_000005_0000005/abc.txt",
            "delta_000005_0000005_0000/abc.txt",
            "delta_000005.txt"});
  }

  @Test
  public void testAcidStateFilter() {
    assertFiltering(new String[]{
        "base_0000009/",
        "base_0000009/abc.txt",
        "delta_000005_000005_0000/",
        "delta_000005_000005_0000/lmn.txt",
        "base_0000010/",
        "base_0000010/00000_0",
        "base_0000010/00001_0",
        "delta_0000012_0000012_0000/",
        "delta_0000012_0000012_0000/0000_0",
        "delta_0000012_0000012_0000/0000_1"},
        "", // writeIdList that accepts all transactions as valid
        new String[]{
            "base_0000010/00000_0",
            "base_0000010/00001_0",
            "delta_0000012_0000012_0000/0000_0",
            "delta_0000012_0000012_0000/0000_1"});
  }

  @Test
  public void testAcidStateNoBase() {
    assertFiltering(new String[]{
        "base_01.txt",
        "post_upgrade.txt",
        "delta_000005_000005_0000/",
        "delta_000005_000005_0000/lmn.txt",
        "base_000010/",
        "delta_0000012_0000012_0000/",
        "delta_0000012_0000012_0000/0000_0",
        "delta_0000012_0000012_0000/0000_1"},
        "", // writeIdList that accepts all transactions as valid
        new String[]{
            // Post upgrade files are ignored if there is a valid base.
            "delta_0000012_0000012_0000/0000_0",
            "delta_0000012_0000012_0000/0000_1"});
    // Same set of files, but no base directory.
    assertFiltering(new String[]{
        "base_01.txt",
        "post_upgrade.txt",
        "delta_000005_000005_0000/",
        "delta_000005_000005_0000/lmn.txt",
        "delta_0000012_0000012_0000/",
        "delta_0000012_0000012_0000/0000_0",
        "delta_0000012_0000012_0000/0000_1"},
        "", // writeIdList that accepts all transactions as valid
        new String[]{
            "base_01.txt", // Post upgrade files are considered valid if there is no base.
            "post_upgrade.txt",
            "delta_000005_000005_0000/lmn.txt",
            "delta_0000012_0000012_0000/0000_0",
            "delta_0000012_0000012_0000/0000_1"});
  }
}