..
.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements. See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership. The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License. You may obtain a copy of the License at
..
.. http://www.apache.org/licenses/LICENSE-2.0
..
.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.
..
.. warning:: The documentation is not up-to-date and has moved to `Apache Pinot Docs <https://docs.pinot.apache.org/>`_.
.. _pluggable-streams:
Pluggable Streams
=================
.. note::
This section is a pre-read if you are planning to develop plug-ins for streams other than Kafka. Pinot supports Kafka out of the box.
Prior to commit `ba9f2d <https://github.com/apache/incubator-pinot/commit/ba9f2ddfc0faa42fadc2cc48df1d77fec6b174fb>`_, Pinot was only able to consume
from a `Kafka <https://kafka.apache.org/documentation/>`_ stream.
Pinot now enables its users to write plug-ins to consume from pub-sub streams
other than Kafka. (Please refer to `Issue #2583 <https://github.com/apache/incubator-pinot/issues/2583>`_)
Some of the streams for which plug-ins can be added are:
* `Amazon Kinesis <https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl.html>`_
* `Azure Event Hubs <https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-java-get-started-receive-eph>`_
* `LogDevice <https://code.fb.com/core-data/logdevice-a-distributed-data-store-for-logs/>`_
* `Pravega <http://pravega.io/docs/latest/javadoc/>`_
* `Pulsar <https://pulsar.apache.org/docs/en/client-libraries-java/>`_
You may encounter some limitations either in Pinot or in the stream system while developing plug-ins.
Please feel free to get in touch with us when you start writing a stream plug-in, and we can help you out.
We are open to receiving PRs in order to improve these abstractions if they do not work for a certain stream implementation.
Refer to `Consuming and Indexing rows in Realtime <https://cwiki.apache.org/confluence/display/PINOT/Consuming+and+Indexing+rows+in+Realtime>`_
for details on how Pinot consumes streaming data.
Requirements to support Stream Level (High Level) consumers
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The stream should provide the following guarantees (a sketch of this contract follows the list):
* Exactly-once delivery (unless restarting from a checkpoint) for each consumer of the stream.
* (Optionally) support a mechanism to split events (in some arbitrary fashion) so that each event in the stream is delivered to exactly one host out of a set of hosts.
* Provide ways to save a checkpoint for the data consumed so far. If the stream is partitioned, then this checkpoint is a vector of checkpoints for events consumed from individual partitions.
* The checkpoints should be recorded only when Pinot makes a call to do so.
* The consumer should be able to start consumption from one of:

  * latest available data
  * earliest available data
  * last saved checkpoint
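To make this contract concrete, here is a minimal, hypothetical sketch of it as a Java interface. None of these types exist in Pinot; the sketch simply restates the guarantees above in code form.

.. code-block:: java

    // Hypothetical illustration only: none of these types are part of the Pinot APIs.
    // It restates the stream level (high level) guarantees listed above in code form.
    import java.util.List;

    public interface HypotheticalHighLevelStream {

      // Where consumption may begin: latest data, earliest data, or the last saved checkpoint.
      enum StartPosition { EARLIEST, LATEST, LAST_CHECKPOINT }

      // Begin delivering events for the topic; each event must reach exactly one host
      // out of the set of hosts consuming under the same group.
      void startConsumption(String topic, String consumerGroup, StartPosition startPosition);

      // Return the next batch of raw events. Each event is delivered exactly once,
      // unless consumption restarts from a checkpoint.
      List<byte[]> nextBatch(long timeoutMillis);

      // Persist a checkpoint covering everything consumed so far (a vector of per-partition
      // checkpoints if the stream is partitioned). Called only when Pinot decides to do so.
      void checkpoint();
    }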
Requirements to support Partition Level (Low Level) consumers
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
While consuming rows at a partition level, the stream should support the following
properties:
* Stream should provide a mechanism to get the current number of partitions.
* Each event in a partition should have a unique offset that is not more than 64 bits long.
* Each partition should be identified by a number not exceeding 32 bits.
* Stream should provide the following mechanisms to get an offset for a given partition of the stream:

  * get the offset of the oldest event available (assuming events are aged out periodically) in the partition
  * get the offset of the most recent event published in the partition
  * (optionally) get the offset of an event that was published at a specified time

* Stream should provide a mechanism to consume a set of events from a partition starting from a specified offset.
* Pinot assumes that the offsets of incoming events are monotonically increasing; *i.e.*, if Pinot
  consumes an event at offset ``o1``, then the offset ``o2`` of the following event should be such that
  ``o2 > o1``.
In addition, we have an operational requirement that the number of partitions should not be
reduced over time.
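In the same hypothetical style as above, the sketch below restates the partition level requirements in code form; none of these types exist in Pinot.

.. code-block:: java

    // Hypothetical illustration only: none of these types are part of the Pinot APIs.
    import java.util.List;

    public interface HypotheticalPartitionedStream {

      // Current number of partitions; a partition is identified by an int (at most 32 bits)
      // and the partition count must never be reduced over time.
      int partitionCount();

      // Offset of the oldest event still retained in the partition (events may be aged out).
      long earliestOffset(int partition);

      // Offset of the most recent event published to the partition.
      long latestOffset(int partition);

      // Optional capability: offset of an event published at a specified time.
      long offsetForTime(int partition, long timestampMillis);

      // Fetch a batch of events from the partition starting at startOffset. Offsets are
      // unique per partition, fit in 64 bits, and are monotonically increasing: if an event
      // has offset o1, the next event's offset o2 must satisfy o2 > o1.
      List<byte[]> fetch(int partition, long startOffset, long timeoutMillis);
    }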
Stream plug-in implementation
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
In order to add a new type of stream (say, Foo), implement the following classes:
#. FooConsumerFactory extends `StreamConsumerFactory <https://github.com/apache/incubator-pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConsumerFactory.java>`_
#. FooPartitionLevelConsumer implements `PartitionLevelConsumer <https://github.com/apache/incubator-pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/PartitionLevelConsumer.java>`_
#. FooStreamLevelConsumer implements `StreamLevelConsumer <https://github.com/apache/incubator-pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java>`_
#. FooMetadataProvider implements `StreamMetadataProvider <https://github.com/apache/incubator-pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMetadataProvider.java>`_
#. FooMessageDecoder implements `StreamMessageDecoder <https://github.com/apache/incubator-pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageDecoder.java>`_
Depending on whether you want to support stream level (high level) or partition level (low level) consumption, your implementation needs to include a StreamLevelConsumer or a PartitionLevelConsumer respectively (or both). A minimal, hypothetical outline of how these classes fit together is shown below.
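The outline below is for a hypothetical stream called ``foo``. The ``extends``/``implements`` clauses are shown only as comments because the exact method signatures should be taken from the Pinot interfaces linked above; the package name is a placeholder that matches the configuration examples further below.

.. code-block:: java

    // Hypothetical outline of a "foo" stream plug-in; not a working implementation.
    package fully.qualified.pkg;  // placeholder package name

    // Entry point that Pinot instantiates via "stream.foo.consumer.factory.class.name";
    // it creates the consumer and metadata provider instances below. The decoder is
    // wired separately via "stream.foo.decoder.class.name".
    public class FooConsumerFactory /* extends StreamConsumerFactory */ {
    }

    // Consumes a single partition starting from a given offset (partition/low level consumption).
    class FooPartitionLevelConsumer /* implements PartitionLevelConsumer */ {
    }

    // Consumes the whole stream and checkpoints when Pinot asks it to (stream/high level consumption).
    class FooStreamLevelConsumer /* implements StreamLevelConsumer */ {
    }

    // Answers metadata questions such as the partition count and per-partition offsets.
    class FooMetadataProvider /* implements StreamMetadataProvider */ {
    }

    // Decodes the raw bytes of a stream message into a row that Pinot can index.
    class FooMessageDecoder /* implements StreamMessageDecoder */ {
    }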
The properties for the stream implementation are to be set in the table configuration, inside `streamConfigs <https://github.com/apache/incubator-pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfig.java>`_ section.
Use the ``streamType`` property to define the stream type. For example, for the implementation of stream ``foo``, set the property ``"streamType" : "foo"``.
The rest of the configuration properties for your stream should be set with the prefix ``stream.foo``. Be sure to use the same property suffixes as shown in the examples below for:
* topic
* consumer type
* stream consumer factory
* offset
* decoder class name
* decoder properties
* connection timeout
* fetch timeout
All values should be strings. For example:
.. code-block:: none
"streamType" : "foo",
"stream.foo.topic.name" : "SomeTopic",
"stream.foo.consumer.type": "LowLevel",
"stream.foo.consumer.factory.class.name": "fully.qualified.pkg.ConsumerFactoryClassName",
"stream.foo.consumer.prop.auto.offset.reset": "largest",
"stream.foo.decoder.class.name" : "fully.qualified.pkg.DecoderClassName",
"stream.foo.decoder.prop.a.decoder.property" : "decoderPropValue",
"stream.foo.connection.timeout.millis" : "10000", // default 30_000
"stream.foo.fetch.timeout.millis" : "10000" // default 5_000
You can have additional properties that are specific to your stream. For example:
.. code-block:: none
"stream.foo.some.buffer.size" : "24g"
In addition to these properties, you can define thresholds for the consuming segments:
* rows threshold
* time threshold
The properties for the thresholds are as follows:
.. code-block:: none
"realtime.segment.flush.threshold.size" : "100000"
"realtime.segment.flush.threshold.time" : "6h"
An example of this implementation can be found in the `KafkaConsumerFactory <https://github.com/apache/incubator-pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConsumerFactory.java>`_, which is the implementation for the Kafka stream.
Kafka 2.x Plugin
^^^^^^^^^^^^^^^^
Pinot provides stream plugin support for Kafka 2.x.
Although the version used in this implementation is Kafka 2.0.0, it is possible to compile it with a higher Kafka library version, e.g. 2.1.1.
How to build and release Pinot package with Kafka 2.x connector
---------------------------------------------------------------
.. code-block:: none
mvn clean package -DskipTests -Pbin-dist -Dkafka.version=2.0
How to use Kafka 2.x connector
------------------------------
- **Use Kafka Stream(High) Level Consumer**
Below is a sample ``streamConfigs`` used to create a realtime table with a Kafka stream (high) level consumer.
The Kafka 2.x HLC consumer uses ``org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory`` in the config ``stream.kafka.consumer.factory.class.name``.
.. code-block:: none
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "highLevel",
"stream.kafka.topic.name": "meetupRSVPEvents",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.zk.broker.url": "localhost:2191/kafka",
"stream.kafka.hlc.bootstrap.server": "localhost:19092"
}
- **Use Kafka Partition(Low) Level Consumer**
Below is a sample table config used to create a realtime table with a Kafka partition (low) level consumer:
.. code-block:: none
{
"tableName": "meetupRsvp",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "mtime",
"timeType": "MILLISECONDS",
"segmentPushType": "APPEND",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"schemaName": "meetupRsvp",
"replication": "1",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.topic.name": "meetupRSVPEvents",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.zk.broker.url": "localhost:2191/kafka",
"stream.kafka.broker.list": "localhost:19092"
}
},
"metadata": {
"customConfigs": {}
}
}
Please note:
1. Config ``replicasPerPartition`` under ``segmentsConfig`` is required to specify table replication.
#. Config ``stream.kafka.consumer.type`` should be specified as ``LowLevel`` to use partition level consumer. (The use of ``simple`` instead of ``LowLevel`` is deprecated)
#. Configs ``stream.kafka.zk.broker.url`` and ``stream.kafka.broker.list`` are required under ``tableIndexConfig.streamConfigs`` to provide Kafka-related information.
Here is another example, which reads Avro messages using the schema id from the Confluent schema registry.
Unlike the KafkaAvroMessageDecoder, the KafkaConfluentSchemaRegistryAvroMessageDecoder fetches the schema from the
Confluent schema registry:
.. code-block:: none
{
"tableName": "meetupRsvp",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "mtime",
"timeType": "MILLISECONDS",
"segmentPushType": "APPEND",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"schemaName": "meetupRsvp",
"replication": "1",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.topic.name": "meetupRSVPEvents",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.zk.broker.url": "localhost:2191/kafka",
"stream.kafka.broker.list": "localhost:19092"
}
},
"metadata": {
"customConfigs": {}
}
}
Here is another example which uses SSL-based authentication to talk to Kafka
and the schema registry. Notice that there are two sets of SSL options: the ones starting with
``ssl.`` are for the Kafka consumer, and the ones starting with ``stream.kafka.decoder.prop.schema.registry.``
are for the ``SchemaRegistryClient`` used by ``KafkaConfluentSchemaRegistryAvroMessageDecoder``.
.. code-block:: none
{
"tableName": "meetupRsvp",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "mtime",
"timeType": "MILLISECONDS",
"segmentPushType": "APPEND",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"schemaName": "meetupRsvp",
"replication": "1",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.topic.name": "meetupRSVPEvents",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.zk.broker.url": "localhost:2191/kafka",
"stream.kafka.broker.list": "localhost:19092",
"schema.registry.url": "",
"security.protocol": "",
"ssl.truststore.location": "",
"ssl.keystore.location": "",
"ssl.truststore.password": "",
"ssl.keystore.password": "",
"ssl.key.password": "",
"stream.kafka.decoder.prop.schema.registry.rest.url": "",
"stream.kafka.decoder.prop.schema.registry.ssl.truststore.location": "",
"stream.kafka.decoder.prop.schema.registry.ssl.keystore.location": "",
"stream.kafka.decoder.prop.schema.registry.ssl.truststore.password": "",
"stream.kafka.decoder.prop.schema.registry.ssl.keystore.password": "",
"stream.kafka.decoder.prop.schema.registry.ssl.keystore.type": "",
"stream.kafka.decoder.prop.schema.registry.ssl.truststore.type": "",
"stream.kafka.decoder.prop.schema.registry.ssl.key.password": "",
"stream.kafka.decoder.prop.schema.registry.ssl.protocol": "",
}
},
"metadata": {
"customConfigs": {}
}
}
Upgrade from Kafka 0.9 connector to Kafka 2.x connector
-------------------------------------------------------
* Update table config for both high level and low level consumer:
Update config: ``stream.kafka.consumer.factory.class.name`` from ``org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory`` to ``org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory``.
* If using Stream(High) level consumer:
Please also add config ``stream.kafka.hlc.bootstrap.server`` into ``tableIndexConfig.streamConfigs``.
This config should be the URI of the Kafka broker list, e.g. ``localhost:9092``, as shown in the example below.
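For example, after the upgrade the ``streamConfigs`` of a high level consumer table would contain entries like the following (the host and port are placeholders):

.. code-block:: none

    "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
    "stream.kafka.hlc.bootstrap.server": "localhost:9092"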
How to use this plugin with a higher Kafka version
-------------------------------------------------------
This connector also works with Kafka library versions higher than ``2.0.0``.
For example, changing ``kafka.lib.version`` from ``2.0.0`` to ``2.1.1`` in ``pinot-connector-kafka-2.0/pom.xml`` makes this connector work with Kafka ``2.1.1``.
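For example, assuming the Kafka version is defined as a regular Maven property in that pom, the change would look like this:

.. code-block:: none

    <kafka.lib.version>2.1.1</kafka.lib.version>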