| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.system.hdfs; |
| |
| import java.io.IOException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.commons.io.IOUtils; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.samza.Partition; |
| import org.apache.samza.SamzaException; |
| import org.apache.samza.config.Config; |
| import org.apache.samza.system.IncomingMessageEnvelope; |
| import org.apache.samza.system.SystemAdmin; |
| import org.apache.samza.system.SystemStreamMetadata; |
| import org.apache.samza.system.SystemStreamPartition; |
| import org.apache.samza.system.hdfs.partitioner.DirectoryPartitioner; |
| import org.apache.samza.system.hdfs.partitioner.HdfsFileSystemAdapter; |
| import org.apache.samza.system.hdfs.reader.HdfsReaderFactory; |
| import org.apache.samza.system.hdfs.reader.MultiFileHdfsReader; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
| /** |
| * The HDFS system admin for {@link org.apache.samza.system.hdfs.HdfsSystemConsumer} and |
| * {@link org.apache.samza.system.hdfs.HdfsSystemProducer} |
| * |
| * A high level overview of the HDFS producer/consumer architecture: |
| * ┌──────────────────────────────────────────────────────────────────────────────┐ |
| * │ │ |
| * ┌─────────────────┤ HDFS │ |
| * │ Obtain │ │ |
| * │ Partition └──────┬──────────────────────▲──────┬─────────────────────────────────▲───────┘ |
| * │ Descriptors │ │ │ │ |
| * │ │ │ │ │ |
| * │ ┌─────────────▼───────┐ │ │ Filtering/ │ |
| * │ │ │ │ └───┐ Grouping └─────┐ |
| * │ │ HDFSAvroFileReader │ │ │ │ |
| * │ │ │ Persist │ │ │ |
| * │ └─────────┬───────────┘ Partition │ │ │ |
| * │ │ Descriptors │ ┌──────▼──────────────┐ ┌──────────┴──────────┐ |
| * │ │ │ │ │ │ │ |
| * │ ┌─────────┴───────────┐ │ │Directory Partitioner│ │ HDFSAvroWriter │ |
| * │ │ IFileReader │ │ │ │ │ │ |
| * │ │ │ │ └──────┬──────────────┘ └──────────┬──────────┘ |
| * │ └─────────┬───────────┘ │ │ │ |
| * │ │ │ │ │ |
| * │ │ │ │ │ |
| * │ ┌─────────┴───────────┐ ┌─┴──────────┴────────┐ ┌──────────┴──────────┐ |
| * │ │ │ │ │ │ │ |
| * │ │ HDFSSystemConsumer │ │ HDFSSystemAdmin │ │ HDFSSystemProducer │ |
| * └──────────▶ │ │ │ │ │ |
| * └─────────┬───────────┘ └───────────┬─────────┘ └──────────┬──────────┘ |
| * │ │ │ |
| * └────────────────────────────────────┼────────────────────────────────────┘ |
| * │ |
| * ┌───────────────────────────────────────┴──────────────────────────────────────┐ |
| * │ │ |
| * │ HDFSSystemFactory │ |
| * │ │ |
| * └──────────────────────────────────────────────────────────────────────────────┘ |
| */ |
| public class HdfsSystemAdmin implements SystemAdmin { |
| private static final Logger LOG = LoggerFactory.getLogger(HdfsSystemAdmin.class); |
| |
| private final HdfsConfig hdfsConfig; |
| private final DirectoryPartitioner directoryPartitioner; |
| private final String stagingDirectory; // directory that contains the partition description |
| private final HdfsReaderFactory.ReaderType readerType; |
| |
| public HdfsSystemAdmin(String systemName, Config config) { |
| hdfsConfig = new HdfsConfig(config); |
| directoryPartitioner = new DirectoryPartitioner(hdfsConfig.getPartitionerWhiteList(systemName), |
| hdfsConfig.getPartitionerBlackList(systemName), hdfsConfig.getPartitionerGroupPattern(systemName), |
| new HdfsFileSystemAdapter()); |
| stagingDirectory = hdfsConfig.getStagingDirectory(systemName); |
| readerType = HdfsReaderFactory.getType(hdfsConfig.getFileReaderType(systemName)); |
| } |
| |
| @Override |
| public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) { |
| /* |
| * To actually get the "after" offset we have to seek to that specific offset in the file and |
| * read that record to figure out the location of next record. This is much more expensive operation |
| * compared to the case in KafkaSystemAdmin. |
| * So simply return the same offsets. This will always incur re-processing but such semantics are legit |
| * in Samza. |
| */ |
| return offsets; |
| } |
| |
| static Map<Partition, List<String>> obtainPartitionDescriptorMap(String stagingDirectory, String streamName) { |
| if (StringUtils.isBlank(stagingDirectory)) { |
| LOG.info("Empty or null staging directory: {}", stagingDirectory); |
| return Collections.emptyMap(); |
| } |
| if (StringUtils.isBlank(streamName)) { |
| throw new SamzaException(String.format("stream name (%s) is null or empty!", streamName)); |
| } |
| Path path = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName); |
| try (FileSystem fs = path.getFileSystem(new Configuration())) { |
| if (!fs.exists(path)) { |
| return Collections.emptyMap(); |
| } |
| try (FSDataInputStream fis = fs.open(path)) { |
| String json = IOUtils.toString(fis, StandardCharsets.UTF_8); |
| return PartitionDescriptorUtil.getDescriptorMapFromJson(json); |
| } |
| } catch (IOException e) { |
| throw new SamzaException("Failed to read partition description from: " + path); |
| } |
| } |
| |
| /* |
| * Persist the partition descriptor only when it doesn't exist already on HDFS. |
| */ |
| private void persistPartitionDescriptor(String streamName, |
| Map<Partition, List<String>> partitionDescriptorMap) { |
| if (StringUtils.isBlank(stagingDirectory) || StringUtils.isBlank(streamName)) { |
| LOG.warn("Staging directory ({}) or stream name ({}) is empty", stagingDirectory, streamName); |
| return; |
| } |
| Path targetPath = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName); |
| try (FileSystem fs = targetPath.getFileSystem(new Configuration())) { |
| // Partition descriptor is supposed to be immutable. So don't override it if it exists. |
| if (fs.exists(targetPath)) { |
| LOG.warn(targetPath.toString() + " exists. Skip persisting partition descriptor."); |
| } else { |
| LOG.info("About to persist partition descriptors to path: " + targetPath.toString()); |
| try (FSDataOutputStream fos = fs.create(targetPath)) { |
| fos.write( |
| PartitionDescriptorUtil.getJsonFromDescriptorMap(partitionDescriptorMap).getBytes(StandardCharsets.UTF_8)); |
| } |
| } |
| } catch (IOException e) { |
| throw new SamzaException("Failed to validate/persist partition description on hdfs.", e); |
| } |
| } |
| |
| private boolean partitionDescriptorExists(String streamName) { |
| if (StringUtils.isBlank(stagingDirectory) || StringUtils.isBlank(streamName)) { |
| LOG.warn("Staging directory ({}) or stream name ({}) is empty", stagingDirectory, streamName); |
| return false; |
| } |
| Path targetPath = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName); |
| try (FileSystem fs = targetPath.getFileSystem(new Configuration())) { |
| return fs.exists(targetPath); |
| } catch (IOException e) { |
| throw new SamzaException("Failed to obtain information about path: " + targetPath); |
| } |
| } |
| |
| /** |
| * |
| * Fetch metadata from hdfs system for a set of streams. This has the potential side effect |
| * to persist partition description to the staging directory on hdfs if staging directory |
| * is not empty. See getStagingDirectory on {@link HdfsConfig} |
| * |
| * @param streamNames |
| * The streams to to fetch metadata for. |
| * @return A map from stream name to SystemStreamMetadata for each stream |
| * requested in the parameter set. |
| */ |
| @Override |
| public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) { |
| Map<String, SystemStreamMetadata> systemStreamMetadataMap = new HashMap<>(); |
| streamNames.forEach(streamName -> { |
| systemStreamMetadataMap.put(streamName, new SystemStreamMetadata(streamName, directoryPartitioner |
| .getPartitionMetadataMap(streamName, obtainPartitionDescriptorMap(stagingDirectory, streamName)))); |
| if (!partitionDescriptorExists(streamName)) { |
| persistPartitionDescriptor(streamName, directoryPartitioner.getPartitionDescriptor(streamName)); |
| } |
| }); |
| return systemStreamMetadataMap; |
| } |
| |
| /** |
| * Compare two multi-file style offset. A multi-file style offset consist of both |
| * the file index as well as the offset within that file. And the format of it is: |
| * "fileIndex:offsetWithinFile" |
| * For example, "2:0", "3:127" |
| * Format of the offset within file is defined by the implementation of |
| * {@link org.apache.samza.system.hdfs.reader.SingleFileHdfsReader} itself. |
| * |
| * @param offset1 First offset for comparison. |
| * @param offset2 Second offset for comparison. |
| * @return -1, if offset1 @lt offset2 |
| * 0, if offset1 == offset2 |
| * 1, if offset1 @gt offset2 |
| * null, if not comparable |
| */ |
| @Override |
| public Integer offsetComparator(String offset1, String offset2) { |
| if (StringUtils.isBlank(offset1) || StringUtils.isBlank(offset2)) { |
| return null; |
| } |
| /* |
| * Properly handle END_OF_STREAM offset here. If both are END_OF_STREAM, |
| * then they are equal. Otherwise END_OF_STREAM is always greater than any |
| * other offsets. |
| */ |
| if (offset1.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)) { |
| return offset2.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET) ? 0 : 1; |
| } |
| if (offset2.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)) { |
| return -1; |
| } |
| int fileIndex1 = MultiFileHdfsReader.getCurFileIndex(offset1); |
| int fileIndex2 = MultiFileHdfsReader.getCurFileIndex(offset2); |
| if (fileIndex1 == fileIndex2) { |
| String offsetWithinFile1 = MultiFileHdfsReader.getCurSingleFileOffset(offset1); |
| String offsetWithinFile2 = MultiFileHdfsReader.getCurSingleFileOffset(offset2); |
| return HdfsReaderFactory.offsetComparator(readerType, offsetWithinFile1, offsetWithinFile2); |
| } |
| return Integer.compare(fileIndex1, fileIndex2); |
| } |
| } |