blob: 82445043eca94e8bc50ea34972e8921d8092fd7a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system.hdfs.partitioner;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.system.hdfs.reader.MultiFileHdfsReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
import static org.apache.samza.system.hdfs.partitioner.FileSystemAdapter.FileMetadata;
/**
* The partitioner that takes a directory as an input and does
* 1. Filtering, based on a white list and a black list
* 2. Grouping, based on grouping pattern
*
* And then generate the partition metadata and partition descriptors
*
* This class holds the assumption that the directory remains immutable.
* If the directory does change:
* ignore new files showing up in the directory based on an old version of partition descriptor;
* throw {@link org.apache.samza.SamzaException} if at least one old file doesn't exist anymore
*/
public class DirectoryPartitioner {
  private static final Logger LOG = LoggerFactory.getLogger(DirectoryPartitioner.class);

  // Regex (in String.split form) matching the literal "[id]" placeholder inside the group pattern
  private static final String GROUP_IDENTIFIER = "\\[id]";

  private final String whiteListRegex;
  private final String blackListRegex;
  private final String groupPattern;
  private final FileSystemAdapter fileSystemAdapter;

  // stream name => partition => partition descriptor (ordered list of file paths in that partition)
  private final Map<String, Map<Partition, List<String>>> partitionDescriptorMap = new HashMap<>();

  /**
   * Constructs a partitioner for a directory-backed stream.
   *
   * @param whiteList regex a file path must match to be included
   * @param blackList regex that excludes a file path even when it is white-listed
   * @param groupPattern pattern containing the "[id]" placeholder used to group files into
   *                     partitions; a blank pattern disables grouping (one partition per file)
   * @param fileSystemAdapter adapter used to list the files backing a stream
   */
  public DirectoryPartitioner(String whiteList, String blackList, String groupPattern,
      FileSystemAdapter fileSystemAdapter) {
    this.whiteListRegex = whiteList;
    this.blackListRegex = blackList;
    this.groupPattern = groupPattern;
    this.fileSystemAdapter = fileSystemAdapter;
    // Parameterized logging avoids the String.format cost when INFO is disabled
    LOG.info("Creating DirectoryPartitioner with whiteList={}, blackList={}, groupPattern={}",
        whiteList, blackList, groupPattern);
  }

  /*
   * Based on the stream name, get the list of all files and filter out unneeded ones given
   * the white list and black list. The result is sorted by path for a consistent order.
   */
  private List<FileMetadata> getFilteredFiles(String streamName) {
    List<FileMetadata> allFiles = fileSystemAdapter.getAllFiles(streamName);
    LOG.info("List of all files for {}: {}", streamName, allFiles);
    List<FileMetadata> filteredFiles = new ArrayList<>();
    allFiles.stream()
        .filter(file -> file.getPath().matches(whiteListRegex) && !file.getPath().matches(blackListRegex))
        .forEach(filteredFiles::add);
    // sort the files to have a consistent order
    filteredFiles.sort(Comparator.comparing(FileMetadata::getPath));
    LOG.info("List of filtered files for {}: {}", streamName, filteredFiles);
    return filteredFiles;
  }

  /*
   * Algorithm to extract the group identifier from the path based on
   * the group pattern.
   * 1. Split the group pattern into two parts (prefix, suffix)
   * 2. Match both the prefix and the suffix against the input
   * 3. Strip the prefix and suffix; what remains is the group identifier
   *
   * For example,
   * input = run_2016-08-01-part-3.avro
   * group pattern = ".*part-[id]\\.avro"
   *
   * 1. Split: prefix pattern = ".*part-" suffix pattern = "\\.avro"
   * 2. Match: prefix string = "run_2016-08-01-part-" suffix string = ".avro"
   * 3. Extract: output = "3"
   *
   * If we can't extract a group identifier, return the original input.
   */
  private String extractGroupIdentifier(String input) {
    // BUG FIX: the original checked StringUtils.isBlank(GROUP_IDENTIFIER) — a compile-time
    // constant that is never blank — instead of the configured groupPattern, making the
    // "grouping disabled" short-circuit dead code.
    if (StringUtils.isBlank(groupPattern)) {
      return input;
    }
    String[] patterns = groupPattern.split(GROUP_IDENTIFIER);
    if (patterns.length != 2) {
      // Pattern does not contain exactly one interior "[id]" placeholder; no grouping possible.
      return input;
    }
    Matcher prefixMatcher = Pattern.compile(patterns[0]).matcher(input);
    if (!prefixMatcher.find()) {
      return input;
    }
    int idStart = prefixMatcher.end();
    // Start the suffix search right after the prefix match so the id is what lies between them.
    Matcher suffixMatcher = Pattern.compile(patterns[1]).matcher(input);
    if (!suffixMatcher.find(idStart)) {
      return input;
    }
    return input.substring(idStart, suffixMatcher.start());
  }

  /*
   * Group files based on the group identifier extracted from the file path.
   * Groups are ordered by identifier to guarantee deterministic partition numbering.
   */
  private List<List<FileMetadata>> generatePartitionGroups(List<FileMetadata> filteredFiles) {
    Map<String, List<FileMetadata>> groups = new HashMap<>();
    for (FileMetadata fileMetadata : filteredFiles) {
      String groupId = extractGroupIdentifier(fileMetadata.getPath());
      groups.computeIfAbsent(groupId, id -> new ArrayList<>()).add(fileMetadata);
    }
    // sort the keys to guarantee consistent ordering
    List<String> sortedKeys = new ArrayList<>(groups.keySet());
    sortedKeys.sort(Comparator.naturalOrder());
    List<List<FileMetadata>> ret = new ArrayList<>(sortedKeys.size());
    sortedKeys.forEach(key -> ret.add(groups.get(key)));
    return ret;
  }

  /*
   * This class holds the assumption that the directory remains immutable.
   * If the directory does change:
   * ignore new files showing up in the directory based on an old version of partition descriptor;
   * throw {@link org.apache.samza.SamzaException} if at least one old file doesn't exist anymore
   */
  private List<FileMetadata> validateAndGetOriginalFilteredFiles(List<FileMetadata> newFileList,
      Map<Partition, List<String>> existingPartitionDescriptor) {
    assert newFileList != null;
    assert existingPartitionDescriptor != null;
    Set<String> oldFileSet = new HashSet<>();
    existingPartitionDescriptor.values().forEach(oldFileSet::addAll);
    Set<String> newFileSet = new HashSet<>();
    newFileList.forEach(file -> newFileSet.add(file.getPath()));
    if (!newFileSet.containsAll(oldFileSet)) {
      // BUG FIX: Set.removeAll returns a boolean, so the original message printed
      // "diff = true". Compute the actual set of missing files for a useful error message.
      Set<String> missingFiles = new HashSet<>(oldFileSet);
      missingFiles.removeAll(newFileSet);
      throw new SamzaException(
          "The list of new files is not a super set of the old files. diff = " + missingFiles);
    }
    // Ignore files that appeared after the descriptor was created.
    newFileList.removeIf(file -> !oldFileSet.contains(file.getPath()));
    return newFileList;
  }

  /**
   * Get partition metadata for a stream
   * @param streamName name of the stream; should contain the information about the path of the
   *                   root directory
   * @param existingPartitionDescriptorMap map of the existing partition descriptor; null or empty
   *                                       on a fresh start
   * @return map of SSP metadata
   */
  public Map<Partition, SystemStreamPartitionMetadata> getPartitionMetadataMap(String streamName,
      @Nullable Map<Partition, List<String>> existingPartitionDescriptorMap) {
    LOG.info("Trying to obtain metadata for {}", streamName);
    LOG.info("Existing partition descriptor: {}",
        MapUtils.isEmpty(existingPartitionDescriptorMap) ? "empty" : existingPartitionDescriptorMap);
    Map<Partition, SystemStreamPartitionMetadata> partitionMetadataMap = new HashMap<>();
    partitionDescriptorMap.putIfAbsent(streamName, new HashMap<>());
    List<FileMetadata> filteredFiles = getFilteredFiles(streamName);
    if (!MapUtils.isEmpty(existingPartitionDescriptorMap)) {
      // Re-validate against the old descriptor so partition numbering stays stable across restarts.
      filteredFiles = validateAndGetOriginalFilteredFiles(filteredFiles, existingPartitionDescriptorMap);
    }
    List<List<FileMetadata>> groupedPartitions = generatePartitionGroups(filteredFiles);
    int partitionId = 0;
    for (List<FileMetadata> fileGroup : groupedPartitions) {
      Partition partition = new Partition(partitionId);
      List<String> pathList = new ArrayList<>();
      List<String> lengthList = new ArrayList<>();
      fileGroup.forEach(fileMetadata -> {
        pathList.add(fileMetadata.getPath());
        lengthList.add(String.valueOf(fileMetadata.getLen()));
      });
      // Oldest offset: beginning of the first file; newest offset: end of the last file.
      String oldestOffset = MultiFileHdfsReader.generateOffset(0, "0");
      String newestOffset =
          MultiFileHdfsReader.generateOffset(lengthList.size() - 1, lengthList.get(lengthList.size() - 1));
      SystemStreamPartitionMetadata metadata =
          new SystemStreamPartitionMetadata(oldestOffset, newestOffset, null);
      partitionMetadataMap.put(partition, metadata);
      partitionDescriptorMap.get(streamName).put(partition, pathList);
      partitionId++;
    }
    LOG.info("Obtained metadata map as: {}", partitionMetadataMap);
    LOG.info("Computed partition description as: {}", partitionDescriptorMap);
    return partitionMetadataMap;
  }

  /**
   * Get partition descriptors for a stream
   * @param streamName name of the stream; should contain the information about the path of the
   *                   root directory
   * @return map of the partition descriptor
   */
  public Map<Partition, List<String>> getPartitionDescriptor(String streamName) {
    return partitionDescriptorMap.get(streamName);
  }
}