This operator(AbstractFTPInputOperator
) is designed to scan a directory from an FTP server for files, read and split file content into tuples such as lines or blocks of bytes, and finally emit them on the output port for further processing by downstream operators. The operator extends the AbstractFileInputOperator
. It overrides the getFSInstance() method and returns an instance of the FTPFileSystem (org.apache.hadoop.fs.ftp.FTPFileSystem
)
Because this is an input operator, there are no input ports.
Port | Description | Type | Mandatory |
---|---|---|---|
output | output port on which data is emitted | String | Yes |
Property | Description | Type | Mandatory | Default Value |
---|---|---|---|---|
host | the hostname of the FTP Server | String | Yes | N/A |
source | the directory path from where to scan and read files | String | Yes | N/A |
username | the username for authenticating against the FTP server. This is an optional property and can be skipped when anonymous FTP is enabled | String | Yes | N/A |
password | the password to be used in conjunction with the above username | String | Yes | N/A |
Configure parameter partitionCount
to define the desired number of initial partitions (4 in this example).
<property> <name>dt.operator.{OperatorName}.prop.partitionCount</name> <value>4</value> </property>
Alternatively, this can be changed in the application code by setting the operator property partitionCount
to the desired number of partitions:
FTPStringInputOperator reader = dag.addOperator("Reader", new FTPStringInputOperator()); reader.setPartitionCount(4);
Dynamic partitioning -- changing the number of partitions of one or more operators in a running application -- can be achieved in multiple ways:
apex
or the UI console to change the value of the partitionCount
property of the running operator. This change is detected in processStats()
(which is invoked periodically by the platform) where, if the current partition count (currentPartitions
) and the desired partition count (partitionCount
) differ, the repartitionRequired
flag in the response is set. This causes the platform to invoke definePartitions()
to create a new set of partitions with the desired count.processStats()
and within it, based on the statistics in the incoming parameter or any other factors, define a new desired value of partitionCount
and finally, if this value differs from the current partition count, set the repartitionRequired
flag in the response.An example application for the FTP input operator can be found at https://github.com/apache/apex-malhar/tree/master/examples/ftp