/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.system.hdfs.descriptors;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;

import org.apache.samza.system.descriptors.SystemDescriptor;
import org.apache.samza.system.hdfs.HdfsConfig;
import org.apache.samza.system.hdfs.HdfsSystemFactory;

/**
 * A {@link HdfsSystemDescriptor} can be used for specifying Samza and HDFS-specific properties of an HDFS
 * input/output system. It can also be used for obtaining {@link HdfsInputDescriptor}s and
 * {@link HdfsOutputDescriptor}s, which can be used for specifying Samza and system-specific properties of
 * HDFS input/output streams.
 * <p>
 * System properties provided in configuration override corresponding properties specified using a descriptor.
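 * <p>
 * A minimal usage sketch (system, stream, and path names here are illustrative, not defaults):
 * <pre>{@code
 * HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs")
 *     .withReaderType("avro")
 *     .withOutputBaseDir("/user/app/output");
 * HdfsInputDescriptor hid = hsd.getInputDescriptor("hdfs-input-stream");
 * HdfsOutputDescriptor hod = hsd.getOutputDescriptor("hdfs-output-stream");
 * }</pre>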
*/
public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor> {
  private static final String FACTORY_CLASS_NAME = HdfsSystemFactory.class.getName();

  private Optional<String> datePathFormat = Optional.empty();
private Optional<String> outputBaseDir = Optional.empty();
private Optional<Long> writeBatchSizeBytes = Optional.empty();
private Optional<Long> writeBatchSizeRecords = Optional.empty();
private Optional<String> writeCompressionType = Optional.empty();
private Optional<String> writerClass = Optional.empty();
private Optional<Long> consumerBufferCapacity = Optional.empty();
private Optional<Long> consumerMaxRetries = Optional.empty();
private Optional<String> consumerWhiteList = Optional.empty();
private Optional<String> consumerBlackList = Optional.empty();
private Optional<String> consumerGroupPattern = Optional.empty();
private Optional<String> consumerReader = Optional.empty();
private Optional<String> consumerStagingDirectory = Optional.empty();

  /**
   * Constructs an {@link HdfsSystemDescriptor} for the HDFS system with the given name.
   * @param systemName name of this system
   */
  public HdfsSystemDescriptor(String systemName) {
    super(systemName, FACTORY_CLASS_NAME, null, null);
  }

  /**
   * Gets an {@link HdfsInputDescriptor} for an input stream on this system.
   * <p>
   * Messages in the stream have no key; the value type is determined by the configured reader type.
   *
   * @param streamId id of the input stream
   * @return an {@link HdfsInputDescriptor} for the HDFS input stream
   */
public HdfsInputDescriptor getInputDescriptor(String streamId) {
return new HdfsInputDescriptor(streamId, this);
}

  /**
   * Gets an {@link HdfsOutputDescriptor} for an output stream on this system.
   * <p>
   * Messages in the stream have no key; the value type is determined by the configured writer class.
   *
   * @param streamId id of the output stream
   * @return an {@link HdfsOutputDescriptor} for the HDFS output stream
   */
public HdfsOutputDescriptor getOutputDescriptor(String streamId) {
return new HdfsOutputDescriptor(streamId, this);
}

  /**
   * In an HdfsWriter implementation that performs time-based output bucketing, the user may configure
   * a date format (suitable for inclusion in a file path) using {@link java.text.SimpleDateFormat}
   * syntax, which the Bucketer implementation uses to generate HDFS paths and filenames. The more
   * granular this date format, the more often a bucketing HdfsWriter will begin a new date-path bucket
   * when creating the next output file.
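   * For example, an illustrative pattern such as {@code "yyyy_MM_dd-HH"} starts a new bucket at most
   * once per hour, while {@code "yyyy_MM_dd"} does so at most once per day.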
* @param datePathFormat date path format
* @return this system descriptor
*/
public HdfsSystemDescriptor withDatePathFormat(String datePathFormat) {
this.datePathFormat = Optional.ofNullable(StringUtils.stripToNull(datePathFormat));
return this;
}

  /**
* The base output directory into which all HDFS output for this job will be written.
* @param outputBaseDir output base directory
* @return this system descriptor
*/
public HdfsSystemDescriptor withOutputBaseDir(String outputBaseDir) {
this.outputBaseDir = Optional.ofNullable(StringUtils.stripToNull(outputBaseDir));
return this;
}

  /**
   * Split output files from all writer tasks based on the number of bytes written, to optimize
   * MapReduce utilization for Hadoop jobs that will process the data later.
   * @param writeBatchSizeBytes write batch size in bytes.
   * @return this system descriptor
   */
public HdfsSystemDescriptor withWriteBatchSizeBytes(long writeBatchSizeBytes) {
this.writeBatchSizeBytes = Optional.of(writeBatchSizeBytes);
return this;
}

  /**
   * Split output files from all writer tasks based on the number of records written, to optimize
   * MapReduce utilization for Hadoop jobs that will process the data later.
   * @param writeBatchSizeRecords write batch size in records.
   * @return this system descriptor
   */
public HdfsSystemDescriptor withWriteBatchSizeRecords(long writeBatchSizeRecords) {
this.writeBatchSizeRecords = Optional.of(writeBatchSizeRecords);
return this;
}

  /**
   * Simple, human-readable label for various compression options. HdfsWriter implementations
   * can choose how to handle these individually, or throw an exception. Examples: {@code "none"},
   * {@code "gzip"}, ...
   * @param writeCompressionType compression type for writer.
   * @return this system descriptor
   */
public HdfsSystemDescriptor withWriteCompressionType(String writeCompressionType) {
this.writeCompressionType = Optional.ofNullable(StringUtils.stripToNull(writeCompressionType));
return this;
}

  /**
* The fully-qualified class name of the HdfsWriter subclass that will write for this system.
* @param writerClassName writer class name.
* @return this system descriptor
*/
public HdfsSystemDescriptor withWriterClassName(String writerClassName) {
this.writerClass = Optional.ofNullable(StringUtils.stripToNull(writerClassName));
return this;
}

  /**
   * The capacity of the HDFS consumer buffer, i.e. the blocking queue used for storing messages.
   * @param bufferCapacity the buffer capacity for the HDFS consumer.
   * @return this system descriptor
   */
public HdfsSystemDescriptor withConsumerBufferCapacity(long bufferCapacity) {
this.consumerBufferCapacity = Optional.of(bufferCapacity);
return this;
}

  /**
   * Maximum number of retries per partition for the HDFS consumer readers.
   * @param maxRetries maximum number of retries for the HDFS consumer.
   * @return this system descriptor
   */
public HdfsSystemDescriptor withConsumerNumMaxRetries(long maxRetries) {
this.consumerMaxRetries = Optional.of(maxRetries);
return this;
}

  /**
   * Whitelist used by the directory partitioner to filter out unwanted files in an HDFS directory.
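   * For example, an illustrative pattern like {@code ".*\\.avro"} would keep only files whose names
   * end in ".avro".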
   * @param whiteList whitelist for HDFS consumer inputs.
* @return this system descriptor
*/
public HdfsSystemDescriptor withConsumerWhiteList(String whiteList) {
this.consumerWhiteList = Optional.ofNullable(StringUtils.stripToNull(whiteList));
return this;
}

  /**
   * Blacklist used by the directory partitioner to filter out unwanted files in an HDFS directory.
   * @param blackList blacklist for HDFS consumer inputs.
   * @return this system descriptor
   */
public HdfsSystemDescriptor withConsumerBlackList(String blackList) {
this.consumerBlackList = Optional.ofNullable(StringUtils.stripToNull(blackList));
return this;
}

  /**
   * Group pattern used by the directory partitioner for advanced partitioning.
   * @param groupPattern group pattern for HDFS consumer inputs.
   * @return this system descriptor
   */
public HdfsSystemDescriptor withConsumerGroupPattern(String groupPattern) {
this.consumerGroupPattern = Optional.ofNullable(StringUtils.stripToNull(groupPattern));
return this;
}

  /**
   * The type of the file reader for the consumer ({@code avro}, {@code plain}, etc.).
   * @param readerType reader type for HDFS consumer inputs.
   * @return this system descriptor
   */
public HdfsSystemDescriptor withReaderType(String readerType) {
this.consumerReader = Optional.ofNullable(StringUtils.stripToNull(readerType));
return this;
}

  /**
   * Staging directory for storing the partition description. If not set, the staging directory
   * set by the YARN job will be used.
   * @param stagingDirectory staging directory for HDFS consumer inputs.
   * @return this system descriptor
   */
public HdfsSystemDescriptor withStagingDirectory(String stagingDirectory) {
this.consumerStagingDirectory = Optional.ofNullable(StringUtils.stripToNull(stagingDirectory));
return this;
}
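
  /**
   * Builds the configuration for this system. Each property set on this descriptor is written under a
   * key scoped to this system's name, produced from the corresponding {@link HdfsConfig} format string.
   * A minimal sketch (the writer class name below is purely illustrative):
   * <pre>{@code
   * Map<String, String> config = new HdfsSystemDescriptor("hdfs")
   *     .withWriterClassName("com.example.MyHdfsWriter")
   *     .toConfig();
   * // config now holds the base system config from super.toConfig(), plus one entry
   * // mapping the system-scoped writer-class key for "hdfs" to "com.example.MyHdfsWriter".
   * }</pre>
   */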
@Override
public Map<String, String> toConfig() {
Map<String, String> config = new HashMap<>(super.toConfig());
String systemName = getSystemName();
datePathFormat.ifPresent(val -> config.put(String.format(HdfsConfig.DATE_PATH_FORMAT_STRING(), systemName), val));
outputBaseDir.ifPresent(val -> config.put(String.format(HdfsConfig.BASE_OUTPUT_DIR(), systemName), val));
writeBatchSizeBytes.ifPresent(
val -> config.put(String.format(HdfsConfig.WRITE_BATCH_SIZE_BYTES(), systemName), String.valueOf(val)));
writeBatchSizeRecords.ifPresent(
val -> config.put(String.format(HdfsConfig.WRITE_BATCH_SIZE_RECORDS(), systemName), String.valueOf(val)));
writeCompressionType.ifPresent(val -> config.put(String.format(HdfsConfig.COMPRESSION_TYPE(), systemName), val));
writerClass.ifPresent(val -> config.put(String.format(HdfsConfig.HDFS_WRITER_CLASS_NAME(), systemName), val));
consumerBufferCapacity.ifPresent(
val -> config.put(String.format(HdfsConfig.CONSUMER_BUFFER_CAPACITY(), systemName), String.valueOf(val)));
consumerMaxRetries.ifPresent(
val -> config.put(String.format(HdfsConfig.CONSUMER_NUM_MAX_RETRIES(), systemName), String.valueOf(val)));
consumerWhiteList.ifPresent(
val -> config.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST(), systemName), val));
consumerBlackList.ifPresent(
val -> config.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_BLACKLIST(), systemName), val));
consumerGroupPattern.ifPresent(
val -> config.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_GROUP_PATTERN(), systemName), val));
consumerReader.ifPresent(val -> config.put(String.format(HdfsConfig.FILE_READER_TYPE(), systemName), val));
consumerStagingDirectory.ifPresent(
val -> config.put(String.format(HdfsConfig.STAGING_DIRECTORY(), systemName), val));
return config;
}
}