This operator scans a JDBC database table in parallel. It was added to address two common problems with input operators:
As discussed in Development Best Practices, operator callbacks such as beginWindow(), endWindow(), emitTuples(), etc. (which are invoked by the main operator thread) are required to return quickly, well within the default streaming window duration of 500ms. This requirement can be an issue when retrieving data from slow external systems such as databases or object stores: if the call takes too long, the platform will deem the operator blocked and restart it. Restarting will often run into the same issue, causing an unbroken sequence of restarts.
When a large volume of data is available from a single store that allows reading from arbitrary locations (such as a file or a database table), reading the data sequentially can limit throughput. Having multiple readers read from non-overlapping sections of the store allows any downstream parallelism in the DAG to be exploited better to enhance throughput. For files, this approach is used by the file splitter and block reader operators in the Malhar library.
The JDBC Poller Input operator addresses the first issue with an asynchronous worker thread that retrieves the data and adds it to an in-memory queue; the main operator thread dequeues tuples very quickly if data is available or simply returns if not. The second issue is addressed in a way that parallels the approach to files: multiple partitions read records from non-overlapping areas of the table. Additional details of how this is done are described below.
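The hand-off between the worker thread and the main operator thread can be illustrated with a small stand-alone sketch. It is not the Malhar implementation: the class QueueDrainSketch and its methods are hypothetical names, and plain strings stand in for database rows. It only shows the pattern the paragraph describes, a bounded queue filled by a background thread and drained by a non-blocking emitTuples()-style call that is capped by a batch size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Stand-alone illustration of the worker/queue pattern; all names are hypothetical. */
class QueueDrainSketch {
    // Bounded buffer between the worker thread and the operator thread.
    private final BlockingQueue<String> queue;
    // Upper bound on the number of tuples handed out per emitTuples() call.
    private final int batchSize;
    // Stands in for an output port: records what has been "emitted".
    final List<String> emitted = new ArrayList<>();

    QueueDrainSketch(int queueCapacity, int batchSize) {
        this.queue = new ArrayBlockingQueue<>(queueCapacity);
        this.batchSize = batchSize;
    }

    /** Worker side: put() blocks while the queue is full, so memory use stays bounded. */
    Thread startWorker(List<String> rows) {
        Thread worker = new Thread(() -> {
            try {
                for (String row : rows) {
                    queue.put(row);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    /** Operator side: poll() never blocks, so this returns at once when no data is queued. */
    int emitTuples() {
        int count = 0;
        String row;
        while (count < batchSize && (row = queue.poll()) != null) {
            emitted.add(row);
            count++;
        }
        return count;
    }
}
```

Because the slow read happens only on the worker thread, the call made by the operator thread does a bounded amount of work regardless of how long the database takes to respond.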
The operator assumes that the table has an ordered column on which range queries can be formed. That is, the table has a column (or combination of columns) with a unique constraint, and every newly inserted record has a value in that column greater than the current maximum, because only appended records are polled.
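Why the ordering assumption matters can be seen in a small in-memory sketch. The class AppendOnlyPollSketch is hypothetical and uses a TreeMap as a stand-in for the table; it is an illustration of polling by a remembered maximum key, not the operator's actual query logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** In-memory stand-in for a table keyed by an ordered, unique column (hypothetical). */
class AppendOnlyPollSketch {
    private final TreeMap<Long, String> table = new TreeMap<>();
    // Largest key value already read; the next poll starts strictly after it.
    private long lastKey = Long.MIN_VALUE;

    void insert(long key, String row) {
        table.put(key, row);
    }

    /** Similar in spirit to: SELECT ... WHERE key > lastKey ORDER BY key. */
    List<String> poll() {
        List<String> fresh = new ArrayList<>(table.tailMap(lastKey, false).values());
        if (!fresh.isEmpty()) {
            lastKey = table.lastKey();
        }
        return fresh;
    }
}
```

In this sketch, a row inserted with a key below the remembered maximum is never returned by a later poll, which is why the key values of new records must keep increasing.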
The tuple type in the abstract class is a generic parameter. Concrete subclasses need to choose an appropriate class (such as String or another concrete Java class with a no-argument constructor, so that it can be serialized using Kryo). They must also implement two abstract methods: getTuple(ResultSet) to convert database rows to objects of the concrete class, and emitTuple(T) to emit the tuple.
In principle, no ports need be defined in the rare case that the operator simply writes tuples to some external sink or merely maintains aggregated statistics. But in most common scenarios, the tuples need to be sent to one or more downstream operators for additional processing such as parsing, enrichment or aggregation; in such cases, appropriate output ports are defined and the emitTuple(T) implementation dispatches tuples to the desired output ports.
A couple of concrete implementations are provided in Malhar:
JdbcPOJOPollInputOperator: It uses the Java Object type for the generic parameter. This operator defines a single output port and processes each database table record one by one as a tuple object. Set the output port attribute TUPLE_CLASS to the name of your POJO class to define the object type. Each record fetched from the database table is converted, using the getTuple method mentioned above, into an object of the configured class. Details are discussed below.
JdbcPollInputOperator: It uses String for the generic parameter. This operator defines a single output port and processes each database table record one by one as a String tuple. Details are discussed below.
Only static partitioning is supported for the JDBC Poller Input operator. Configure the partitionCount parameter to define the desired number of initial partitions (4 in this example). Note: An additional partition will be created to poll newly added records, so the total number of partitions will always be 1 + partitionCount.

    <property>
      <name>apex.operator.{OperatorName}.prop.partitionCount</name>
      <value>4</value>
    </property>

where {OperatorName} is the name of the JDBC Poller operator.
This will create 5 operator instances in all. Four of these fetch the data that is currently in the table; we call these static non-polling partitions. They become idle once they have fetched their portion of the data. One additional partition reads any newly added data; we call it the polling partition, as it “polls” for newly added data. There is only one polling partition.
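The division of work among partitions can be sketched as follows. The text does not state how the operator derives each partition's range, so the row-offset scheme below is an assumption made for illustration, and PartitionRangeSketch, Range and split are hypothetical names. The sketch splits the rows present at startup into partitionCount non-overlapping ranges and adds one open-ended range for the polling partition.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative range assignment; not the operator's actual partitioning code. */
class PartitionRangeSketch {
    /** Half-open row range [start, end); end == -1 marks the open-ended polling range. */
    static final class Range {
        final long start;
        final long end;

        Range(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean isPolling() {
            return end < 0;
        }
    }

    /** Returns partitionCount static ranges followed by one polling range. */
    static List<Range> split(long rowCount, int partitionCount) {
        if (partitionCount < 1) {
            throw new IllegalArgumentException("partitionCount must be at least 1");
        }
        List<Range> ranges = new ArrayList<>();
        long base = rowCount / partitionCount;
        long extra = rowCount % partitionCount; // the first 'extra' ranges get one more row
        long start = 0;
        for (int i = 0; i < partitionCount; i++) {
            long size = base + (i < extra ? 1 : 0);
            ranges.add(new Range(start, start + size));
            start += size;
        }
        // The polling partition picks up everything appended after the initial rows.
        ranges.add(new Range(start, -1));
        return ranges;
    }
}
```

For example, split(10, 4) yields the static ranges [0, 3), [3, 6), [6, 8) and [8, 10), plus a polling range that starts at row 10, which gives 1 + partitionCount ranges in total.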
Dynamic partitioning is not supported.
JDBC Poller is idempotent, fault-tolerant and statically partitionable.
This is the abstract implementation that serves as the base class for polling messages from a JDBC store. It can be extended to modify functionality or add new capabilities. This class doesn’t have any ports, so concrete subclasses will need to provide them if necessary.
### Properties of AbstractJdbcPollInputOperator

Several properties are available to configure the behavior of this operator; they are summarized in the table below.

Property | Description | Type | Mandatory | Default Value |
---|---|---|---|---|
store | JDBC Store for connection | JdbcStore | Yes | N/A |
tableName | table name to be scanned | String | Yes | N/A |
columnsExpression | Comma separated list of columns to select from the given table. | String | Yes | N/A |
key | Primary key column name | String | Yes | N/A |
partitionCount | Static partitions count | int | No | 1 |
whereCondition | Where condition for JDBC query | String | No | N/A |
fetchSize | Hint limiting the number of rows to fetch in a single call | int | No | 20000 |
pollInterval | Interval in milliseconds to poll the database table | int | No | 10000 |
queueCapacity | Capacity of the queue which holds DB data before emitting | int | No | 4096 |
batchSize | Maximum number of tuples to emit in a single call to the emitTuples() callback (see explanation above). | int | No | 2000 |
Property | Description | Type | Mandatory | Default Value |
---|---|---|---|---|
databaseDriver | JDBC Driver class for connection to JDBC Store. This driver should be there in class path | String | Yes | N/A |
databaseUrl | Database url of the form jdbc:subprotocol:subname | String | Yes | N/A |
connectionProps | Comma separated connection properties e.g. user:xyz,password:ijk | String | Yes | N/A |
Of these, only the store properties, tableName, columnsExpression and key are mandatory. Those properties can be set like this:

    <property>
      <name>apex.operator.{OperatorName}.prop.tableName</name>
      <value>mytable</value>
    </property>
    <property>
      <name>apex.operator.{OperatorName}.prop.columnsExpression</name>
      <value>column1,column2,column4</value>
    </property>
    <property>
      <name>apex.operator.{OperatorName}.prop.key</name>
      <value>keycolumn</value>
    </property>
    <property>
      <name>apex.operator.{OperatorName}.prop.store.databaseDriver</name>
      <value>com.mysql.jdbc.Driver</value>
    </property>
    <property>
      <name>apex.operator.{OperatorName}.prop.store.databaseUrl</name>
      <value>jdbc:mysql://localhost:3306/mydb</value>
    </property>
    <property>
      <name>apex.operator.{OperatorName}.prop.store.connectionProps</name>
      <value>user:myuser,password:mypassword</value>
    </property>

- To filter the table records, set whereCondition, which will be added to the generated SQL query.
- To increase read parallelism, set partitionCount to a higher number.
- Set fetchSize as a hint to the database driver to restrict the number of rows fetched in one call. The remaining rows will be fetched in subsequent calls. Note that some database drivers may not honor this hint. Refer to the database driver documentation for the recommended value.
- The platform invokes the emitTuples() callback multiple times in each streaming window; within a single such call, if a large number of tuples are emitted, there is some risk that they may overwhelm the downstream operators, especially if those operators perform compute-intensive operations. For such cases, output can be throttled by reducing the value of the batchSize property. Conversely, if the downstream operators can handle the load, increase the value to enhance throughput.
- If new records appear frequently and need to be processed soon after they appear, use a lower value of pollInterval; if they appear rarely or if some delay in processing new records is acceptable, increase it.
- The capacity of the in-memory queue that holds fetched records can be changed with queueCapacity. You can use a larger queue when your reader thread is very fast and you want to read more data into memory to keep it ready for emission.

Note: Please set the right store object instance on the JDBC Poller Input operator in your application code. It is recommended to use JdbcStore for this operator.
void emitTuple(T tuple): Abstract method that emits a tuple extracted from the JDBC store.

T getTuple(ResultSet result): Abstract method to extract the tuple from the JDBC ResultSet object and convert it to the required type (T).
This implementation converts JDBC store records to POJOs and emits them on the output port.
This operator defines the following additional properties beyond those defined in the parent class.
Property | Description | Type | Mandatory | Default Value |
---|---|---|---|---|
fieldInfos | Maps columns to POJO field names. | List | Yes | N/A |
Attribute | Description | Type | Mandatory |
---|---|---|---|
outputPort.TUPLE_CLASS | TUPLE_CLASS attribute on the output port, which tells the operator the class of the POJO to be emitted | Class or FQCN (Fully Qualified Class Name) | Yes |
Port | Description | Type | Mandatory |
---|---|---|---|
outputPort | Tuples that are read from JDBC store are emitted on this port | Object (POJO) | No |
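
A class suitable for TUPLE_CLASS might look like the sketch below. The class name CustomerRecord and its fields id and name are hypothetical, chosen for an imagined table with two columns; the entries in fieldInfos would map each selected column to one of these fields. The no-argument constructor matches the serialization requirement mentioned earlier.

```java
/**
 * Hypothetical POJO for a table with an integer id column and a text name column.
 * In an application it would normally be declared public in its own source file.
 */
class CustomerRecord {
    private int id;
    private String name;

    /** No-argument constructor, needed so that instances can be created during deserialization. */
    CustomerRecord() {
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
```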
This implementation converts JDBC store records to comma-separated (CSV) records. This operator is normally used when you just want to copy the data from a database to somewhere else and don't need to do much processing.
This operator defines no additional properties beyond those defined in the parent class.
Port | Description | Type | Mandatory |
---|---|---|---|
outputPort | Tuples that are read from JDBC store are emitted on this port | String | No |
Out-of-order insertion and deletion of records are not supported.