blob: 54296448e0eb526c42c8ba9e4822c704136bd648 [file] [log] [blame]
..
.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements. See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership. The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License. You may obtain a copy of the License at
..
.. http://www.apache.org/licenses/LICENSE-2.0
..
.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.
..
.. warning:: The documentation is not up-to-date and has moved to `Apache Pinot Docs <https://docs.pinot.apache.org/>`_.
Optimizing Scatter and Gather
=============================
.. contents:: Table of Contents
When the use case has very high qps along with low latency requirements (usually site facing use cases),
we need to consider optimizing the scatter-and-gather.
Below table summarizes the two issues with the default behavior of Pinot.
.. csv-table::
:header: "Problem", "Impact", "Solution"
:widths: 15, 15, 15
"Querying all servers", "Bad tail latency, not scalable", "Control the number of servers to fan out"
"Querying all segments", "More CPU work on server", "Minimize the number of segment"
Querying All Servers
--------------------
By default, Pinot uses ``BalanceNumSegmentAssignmentStrategy`` for segment assignment. This scheme tries to distribute
the number of segments uniformly to all servers. When we perform scatter-and-gather a query request, broker will try
to uniformly distribute the workload among servers by assigning the balanced number of segments to each server. As
a result, each query will span out to all servers under this scheme. It works pretty well when qps is low and you
have small number of servers in the cluster. However, as we add more servers or have more qps, the probability of
hitting slow server (e.g. gc) increases steeply and Pinot will suffer from a long tail latency.
In order to address this issue, we have introduced a concept of ``Replica Group``, which allows us to control the
number of servers to fan out for each query.
Replica Group Segment Assignment and Query Routing
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
``Replica Group`` is a set of server that contains a 'complete' set of all segments of a table. Once we assign the
segment based on replica group, each query can be answered by fanning out to a replica group instead of all servers.
.. image:: img/replica-group.png
``Replica Group`` based segment assignment can be configured for a table by setting it in the table config. Note that
``ReplicaGroupSegmentAssignmentStrategy`` needs to be used along with ``PartitionAwareOffline`` for routing and this is
currently available for **offline table** only.
.. code-block:: none
{
"segmentsConfig": {
...
"replication": "3",
"segmentAssignmentStrategy": "ReplicaGroupSegmentAssignmentStrategy",
"replicaGroupStrategyConfig": {
"mirrorAssignmentAcrossReplicaGroups": true,
"numInstancesPerPartition": 4
}
}
...
"routing": {
"routingTableBuilderName": "PartitionAwareOffline",
"routingTableBuilderOptions": {}
},
...
}
As seen above, you can use ``replication`` and ``numInstancesPerPartition`` to control the number of servers to span. For
instance, let's say that you have 12 servers in the cluster. Above configuration will generate 3 replica groups (based on
``replication=3``) and each replica group will contain 4 servers (``numInstancesPerPartition=4``). In this example, each
query will span to a single replica group (4 servers).
As you seen above, replica group gives you the control on the number of servers to span for each query. When you try to
decide the proper number of ``replication`` and ``numInstancesPerPartition``, you should consider the trade-off between
throughput and latency. Given a fixed number of servers, increasing ``replication`` factor while decreasing
``numInstancesPerPartition`` will give you more throughput because each server requires to process less number of queries.
However, each server will need to process more number of segments per query, thus increasing overall latency. Similarly,
decreasing ``replication`` while increasing ``numInstancesPerPartition`` will make each server processing more number
of queries but each server needs to process less number of segments per query. So, this number has to be decided based
on the use case requirements.
Querying All Segments
---------------------
By default, Pinot broker will distribute all segments for query processing and segment pruning is happening in Server.
In other words, Server will look at the segment metadata such as min/max time value and discard the segment if it does
not contain any data that the query is asking for. Server side pruning works pretty well when the qps is low; however,
it becomes the bottleneck if qps is very high (hundreds to thousands queries per second) because unnecessary segments
still need to be scheduled for processing and consume cpu resources.
Currently, we have two different mechanisms to prune segments on the broker side to minimize the number of segment for
processing before scatter-and-gather.
Partitioning
^^^^^^^^^^^^
When the data is partitioned on a dimension, each segment will contain all the rows with the same partition value for
a partitioning dimension. In this case, a lot of segments can be pruned if a query requires to look at a single
partition to compute the result. Below diagram gives the example of data partitioned on member id while the query
includes an equality filter on member id.
.. image:: img/partitioning.png
``Partitoning`` can be enabled by setting the following configuration in the table config.
.. code-block:: none
{
"tableIndexConfig": {
"segmentPartitionConfig": {
"columnPartitionMap": {
"memberId": {
"functionName": "modulo",
"numPartitions": 4
}
}
}
}
...
"routing": {
"routingTableBuilderName": "PartitionAwareOffline",
"routingTableBuilderOptions": {}
},
}
Pinot currently supports ``modulo`` and ``murmur`` hash function. After setting the above config, data needs to be partitioned
using the same partition function and the number of partition to partition before running Pinot segment conversion and push job
for offline push. Realtime partitioning depends on the kafka for partitioning. When emitting an event to kafka, a user need to
feed partitioning key and partition function for Kafka producer API.
When applied correctly, partition information should be available in the segment metadata.
.. code-block:: none
$ column.memberId.partitionFunction = Murmur
column.memberId.partitionValues = [9 9]
Note that broker side pruning for partitioning only works with ``PartitionAwareOffline`` and ``PartitionAwareRealtime`` routing
table builder strategies. Also note that the current implementation for partitioning only works for **EQUALITY** filter
(e.g. memberId = xx).
Bloom Filter for Dictionary
^^^^^^^^^^^^^^^^^^^^^^^^^^^
Dictionary encoding provides the array of unique values. Pinot allows to create a bloom filter on this unique values for each
column. Bloom filter can quickly determine whether the value exist in the segment.
Bloom filter can be enabled by setting the following configuration in the table config.
.. code-block:: none
{
"tableIndexConfig": {
"bloomFilterColumns": [
"column_name",
...
],
...
}
}
Our implementation limits the size of bloom filter to be less than 1MB per segment along with max false positive of 5% to
avoid consuming too much memory. We recommend to put bloom filter for the column with ``less than 1 million cardinality``.
Note that the current implementation for bloom filter also works for **EQUALITY** filter only.