blob: 47cbfcd356790dd09ba75382ec1e2776468a7db8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system.hdfs.partitioner;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.junit.Assert;
import org.junit.Test;
import static org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
import static org.apache.samza.system.hdfs.partitioner.FileSystemAdapter.FileMetadata;
public class TestDirectoryPartitioner {
class TestFileSystemAdapter implements FileSystemAdapter {
private List<FileMetadata> expectedList;
public TestFileSystemAdapter(List<FileMetadata> expectedList) {
this.expectedList = expectedList;
}
public List<FileMetadata> getAllFiles(String streamName) {
return expectedList;
}
}
private void verifyPartitionDescriptor(String[] inputFiles, int[][] expectedPartitioning, int expectedNumPartition,
Map<Partition, List<String>> actualPartitioning) {
Assert.assertEquals(expectedNumPartition, actualPartitioning.size());
Set<String> actualPartitioningPath = new HashSet<>();
actualPartitioning.values().forEach(list -> actualPartitioningPath.add(String.join(",", list)));
for (int i = 0; i < expectedNumPartition; i++) {
int[] indexes = expectedPartitioning[i];
List<String> files = new ArrayList<>();
for (int j : indexes) {
files.add(inputFiles[j]);
}
files.sort(Comparator.<String>naturalOrder());
String expectedCombinedPath = String.join(",", files);
Assert.assertTrue(actualPartitioningPath.contains(expectedCombinedPath));
}
}
@Test
public void testBasicWhiteListFiltering() {
List<FileMetadata> testList = new ArrayList<>();
int numInput = 9;
String[] inputFiles = {
"part-001.avro",
"part-002.avro",
"part-003.avro",
"delta-01.avro",
"part-005.avro",
"delta-03.avro",
"part-004.avro",
"delta-02.avro",
"part-006.avro"};
long[] fileLength = {150582, 138132, 214005, 205738, 158273, 982345, 313245, 234212, 413232};
for (int i = 0; i < numInput; i++) {
testList.add(new FileMetadata(inputFiles[i], fileLength[i]));
}
String whiteList = "part-.*\\.avro";
String blackList = "";
String groupPattern = "";
int expectedNumPartition = 6;
int[][] expectedPartitioning = {{0}, {1}, {2}, {4}, {6}, {8}};
DirectoryPartitioner directoryPartitioner =
new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
Map<Partition, SystemStreamPartitionMetadata> metadataMap = directoryPartitioner.getPartitionMetadataMap("hdfs", null);
Assert.assertEquals(expectedNumPartition, metadataMap.size());
Map<Partition, List<String>> descriptorMap = directoryPartitioner.getPartitionDescriptor("hdfs");
verifyPartitionDescriptor(inputFiles, expectedPartitioning, expectedNumPartition, descriptorMap);
}
@Test
public void testBasicBlackListFiltering() {
List<FileMetadata> testList = new ArrayList<>();
int numInput = 9;
String[] inputFiles = {
"part-001.avro",
"part-002.avro",
"part-003.avro",
"delta-01.avro",
"part-005.avro",
"delta-03.avro",
"part-004.avro",
"delta-02.avro",
"part-006.avro"};
long[] fileLength = {150582, 138132, 214005, 205738, 158273, 982345, 313245, 234212, 413232};
for (int i = 0; i < numInput; i++) {
testList.add(new FileMetadata(inputFiles[i], fileLength[i]));
}
String whiteList = ".*";
String blackList = "delta-.*\\.avro";
String groupPattern = "";
int expectedNumPartition = 6;
int[][] expectedPartitioning = {{0}, {1}, {2}, {4}, {6}, {8}};
DirectoryPartitioner directoryPartitioner =
new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
Map<Partition, SystemStreamPartitionMetadata> metadataMap = directoryPartitioner.getPartitionMetadataMap("hdfs", null);
Assert.assertEquals(expectedNumPartition, metadataMap.size());
Map<Partition, List<String>> descriporMap = directoryPartitioner.getPartitionDescriptor("hdfs");
verifyPartitionDescriptor(inputFiles, expectedPartitioning, expectedNumPartition, descriporMap);
}
@Test
public void testWhiteListBlackListFiltering() {
List<FileMetadata> testList = new ArrayList<>();
int numInput = 9;
String[] inputFiles = {
"part-001.avro",
"part-002.avro",
"part-003.avro",
"delta-01.avro",
"part-005.avro",
"delta-03.avro",
"part-004.avro",
"delta-02.avro",
"part-006.avro"};
long[] fileLength = {150582, 138132, 214005, 205738, 158273, 982345, 313245, 234212, 413232};
for (int i = 0; i < numInput; i++) {
testList.add(new FileMetadata(inputFiles[i], fileLength[i]));
}
String whiteList = "part-.*\\.avro";
String blackList = "part-002.avro";
String groupPattern = "";
int expectedNumPartition = 5;
int[][] expectedPartitioning = {{0}, {2}, {4}, {6}, {8}};
DirectoryPartitioner directoryPartitioner =
new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
Map<Partition, SystemStreamPartitionMetadata> metadataMap = directoryPartitioner.getPartitionMetadataMap("hdfs", null);
Assert.assertEquals(expectedNumPartition, metadataMap.size());
Map<Partition, List<String>> descriporMap = directoryPartitioner.getPartitionDescriptor("hdfs");
verifyPartitionDescriptor(inputFiles, expectedPartitioning, expectedNumPartition, descriporMap);
}
@Test
public void testBasicGrouping() {
List<FileMetadata> testList = new ArrayList<>();
int numInput = 9;
String[] inputFiles = {
"00_10-run_2016-08-15-13-04-part.0.150582.avro",
"00_10-run_2016-08-15-13-04-part.1.138132.avro",
"00_10-run_2016-08-15-13-04-part.2.214005.avro",
"00_10-run_2016-08-15-13-05-part.0.205738.avro",
"00_10-run_2016-08-15-13-05-part.1.158273.avro",
"00_10-run_2016-08-15-13-05-part.2.982345.avro",
"00_10-run_2016-08-15-13-06-part.0.313245.avro",
"00_10-run_2016-08-15-13-06-part.1.234212.avro",
"00_10-run_2016-08-15-13-06-part.2.413232.avro"};
long[] fileLength = {150582, 138132, 214005, 205738, 158273, 982345, 313245, 234212, 413232};
for (int i = 0; i < numInput; i++) {
testList.add(new FileMetadata(inputFiles[i], fileLength[i]));
}
String whiteList = ".*\\.avro";
String blackList = "";
String groupPattern = ".*part\\.[id]\\..*\\.avro"; // 00_10-run_2016-08-15-13-04-part.[id].138132.avro
int expectedNumPartition = 3;
int[][] expectedPartitioning = {
{0, 3, 6}, // files from index 0, 3, 6 should be grouped into one partition
{1, 4, 7}, // similar as above
{2, 5, 8}};
DirectoryPartitioner directoryPartitioner =
new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
Map<Partition, SystemStreamPartitionMetadata> metadataMap = directoryPartitioner.getPartitionMetadataMap("hdfs", null);
Assert.assertEquals(expectedNumPartition, metadataMap.size());
Map<Partition, List<String>> descriporMap = directoryPartitioner.getPartitionDescriptor("hdfs");
verifyPartitionDescriptor(inputFiles, expectedPartitioning, expectedNumPartition, descriporMap);
}
@Test
public void testValidDirectoryUpdating() {
// the update is valid when there are only new files being added to the directory
// no changes on the old files
List<FileMetadata> testList = new ArrayList<>();
int numInput = 6;
String[] inputFiles = {
"part-001.avro",
"part-002.avro",
"part-003.avro",
"part-005.avro",
"part-004.avro",
"part-006.avro"};
long[] fileLength = {150582, 138132, 214005, 205738, 158273, 982345};
for (int i = 0; i < numInput; i++) {
testList.add(new FileMetadata(inputFiles[i], fileLength[i]));
}
String whiteList = ".*";
String blackList = "";
String groupPattern = "";
int expectedNumPartition = 6;
int[][] expectedPartitioning = {{0}, {1}, {2}, {3}, {4}, {5}};
DirectoryPartitioner directoryPartitioner =
new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
Map<Partition, SystemStreamPartitionMetadata> metadataMap = directoryPartitioner.getPartitionMetadataMap("hdfs", null);
Assert.assertEquals(expectedNumPartition, metadataMap.size());
Map<Partition, List<String>> descriporMap = directoryPartitioner.getPartitionDescriptor("hdfs");
verifyPartitionDescriptor(inputFiles, expectedPartitioning, expectedNumPartition, descriporMap);
numInput = 7;
String[] updatedInputFiles = {
"part-001.avro",
"part-002.avro",
"part-003.avro",
"part-005.avro",
"part-004.avro",
"part-007.avro", // add a new file to the directory
"part-006.avro"};
long[] updatedFileLength = {150582, 138132, 214005, 205738, 158273, 2513454, 982345};
testList.clear();
for (int i = 0; i < numInput; i++) {
testList.add(new FileMetadata(updatedInputFiles[i], updatedFileLength[i]));
}
directoryPartitioner =
new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
metadataMap = directoryPartitioner.getPartitionMetadataMap("hdfs", descriporMap);
Assert.assertEquals(expectedNumPartition, metadataMap.size()); // still expect only 6 partitions instead of 7
Map<Partition, List<String>> updatedDescriptorMap = directoryPartitioner.getPartitionDescriptor("hdfs");
verifyPartitionDescriptor(inputFiles, expectedPartitioning, expectedNumPartition, updatedDescriptorMap);
}
@Test
public void testInvalidDirectoryUpdating() {
// the update is invalid when at least one old file is removed
List<FileMetadata> testList = new ArrayList<>();
int numInput = 6;
String[] inputFiles = {
"part-001.avro",
"part-002.avro",
"part-003.avro",
"part-005.avro",
"part-004.avro",
"part-006.avro"};
long[] fileLength = {150582, 138132, 214005, 205738, 158273, 982345};
for (int i = 0; i < numInput; i++) {
testList.add(new FileMetadata(inputFiles[i], fileLength[i]));
}
String whiteList = ".*";
String blackList = "";
String groupPattern = "";
int expectedNumPartition = 6;
int[][] expectedPartitioning = {{0}, {1}, {2}, {3}, {4}, {5}};
DirectoryPartitioner directoryPartitioner =
new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
Map<Partition, SystemStreamPartitionMetadata> metadataMap = directoryPartitioner.getPartitionMetadataMap("hdfs", null);
Assert.assertEquals(expectedNumPartition, metadataMap.size());
Map<Partition, List<String>> descriporMap = directoryPartitioner.getPartitionDescriptor("hdfs");
verifyPartitionDescriptor(inputFiles, expectedPartitioning, expectedNumPartition, descriporMap);
String[] updatedInputFiles = {
"part-001.avro",
"part-002.avro",
"part-003.avro",
"part-005.avro",
"part-007.avro", // remove part-004 and replace it with 007
"part-006.avro"};
long[] updatedFileLength = {150582, 138132, 214005, 205738, 158273, 982345};
testList.clear();
for (int i = 0; i < numInput; i++) {
testList.add(new FileMetadata(updatedInputFiles[i], updatedFileLength[i]));
}
directoryPartitioner =
new DirectoryPartitioner(whiteList, blackList, groupPattern, new TestFileSystemAdapter(testList));
try {
directoryPartitioner.getPartitionMetadataMap("hdfs", descriporMap);
Assert.fail("Expect exception thrown from getting metadata. Should not reach this point.");
} catch (SamzaException e) {
// expect exception to be thrown
}
}
}