---
title: "Query Performance"
weight: 3
type: docs
aliases:
- /append-table/query-performance.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Query Performance

## Aggregate push down

Append Table supports aggregate push down:

```sql
SELECT COUNT(*) FROM my_table WHERE dt = '20230101';
```

This query can be answered from statistics in metadata during compilation (query planning), so it returns very quickly.

For Spark SQL, tables with the default `metadata.stats-mode` can also accelerate queries like the following:

```sql
SELECT MIN(a), MAX(b) FROM my_table WHERE dt = '20230101';

SELECT * FROM my_table ORDER BY a LIMIT 1;
```

MIN/MAX and top-N queries like these can also be accelerated during compilation and return very quickly.
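
Statistics collection is controlled by the `metadata.stats-mode` table option. As a minimal sketch (the table name is a placeholder, and `'full'` is one possible value), you could raise the stats mode on an existing table with Spark SQL:

```sql
-- Record complete min/max statistics for every field of newly written
-- files; files written before this change keep their old statistics.
ALTER TABLE my_table SET TBLPROPERTIES ('metadata.stats-mode' = 'full');
```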

## Data Skipping By Order

By default, Paimon records the minimum and maximum values of each field in the manifest file.

At query time, the `WHERE` condition is checked against these statistics in the manifest to skip files that cannot
contain matching rows. When this filtering is effective, a query that would otherwise take minutes can finish in
milliseconds.

The natural data distribution is not always ideal for such filtering, so you can sort the data by the fields that
appear in `WHERE` conditions (a sketch follows below). Take a look at
[Flink COMPACT Action]({{< ref "maintenance/dedicated-compaction#sort-compact" >}}),
[Flink COMPACT Procedure]({{< ref "flink/procedures" >}}) or [Spark COMPACT Procedure]({{< ref "spark/procedures" >}}).
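
As a non-authoritative sketch, a Flink sort compaction might look like the following (the table and column names are placeholders; see the linked pages for exact signatures and options):

```sql
-- Rewrite the table's files so data is clustered by column `a`;
-- min/max statistics then become selective for filters on `a`.
CALL sys.compact(
  `table` => 'default.my_table',
  order_strategy => 'zorder',
  order_by => 'a'
);
```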

## Data Skipping By File Index

You can also use file indexes, which filter files on the reading side through per-column indexes.

To enable one, define an option such as `file-index.bitmap.columns`. A data file index is an external index file:
Paimon creates a corresponding index file for each data file. If the index file is small, it is stored directly in
the manifest; otherwise it is stored in the directory of the data file. Each data file corresponds to one index
file, which has its own file format and can contain different types of indexes over multiple columns (see the
sketch after the list below).

Different file indexes are efficient in different scenarios. For example, a bloom filter may speed up queries in
point-lookup scenarios, while a bitmap may consume more space but yield greater accuracy.

* [BloomFilter]({{< ref "concepts/spec/fileindex#index-bloomfilter" >}}): `file-index.bloom-filter.columns`.
* [Bitmap]({{< ref "concepts/spec/fileindex#index-bitmap" >}}): `file-index.bitmap.columns`.
* [Range Bitmap]({{< ref "concepts/spec/fileindex#index-range-bitmap" >}}): `file-index.range-bitmap.columns`.
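
A minimal sketch of declaring file indexes at table creation time, assuming Flink SQL and placeholder table and column names:

```sql
-- Bloom filter on `order_id` for point lookups; bitmap on `dt` for
-- equality filters on a low-cardinality column.
CREATE TABLE my_table (
  order_id BIGINT,
  dt STRING
) WITH (
  'file-index.bloom-filter.columns' = 'order_id',
  'file-index.bitmap.columns' = 'dt'
);
```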

If you want to add a file index to an existing table without rewriting its data files, you can use the
`rewrite_file_index` procedure. Before invoking the procedure, configure the target table appropriately: use an
`ALTER TABLE` clause to set `file-index.<filter-type>.columns` on the table.

For how to invoke it, see [Flink procedures]({{< ref "flink/procedures#procedures" >}}). A sketch of the two steps follows.
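
The following is a non-authoritative sketch in Flink SQL (the table, column, and database names are placeholders):

```sql
-- 1. Declare which columns should be indexed on the existing table.
ALTER TABLE my_table SET ('file-index.bloom-filter.columns' = 'order_id');

-- 2. Build index files for data files written before the option was set,
--    without rewriting the data files themselves.
CALL sys.rewrite_file_index('default.my_table');
```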

## Bucketed Join

Bucketed tables can be used to avoid a shuffle in batch queries. For example, you can use the following Spark
SQL to read a Paimon table:

```sql
SET spark.sql.sources.v2.bucketing.enabled = true;

CREATE TABLE FACT_TABLE (order_id INT, f1 STRING) TBLPROPERTIES ('bucket'='10', 'bucket-key' = 'order_id');

CREATE TABLE DIM_TABLE (order_id INT, f2 STRING) TBLPROPERTIES ('bucket'='10', 'primary-key' = 'order_id');

SELECT * FROM FACT_TABLE JOIN DIM_TABLE ON FACT_TABLE.order_id = DIM_TABLE.order_id;
```

The `spark.sql.sources.v2.bucketing.enabled` config enables bucketing for V2 data sources. When it is turned on,
Spark recognizes the specific distribution reported by a V2 data source through `SupportsReportPartitioning` and
tries to avoid a shuffle when possible.

The costly join shuffle is avoided when the two tables have the same bucketing strategy and the same number of buckets.
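
To verify that the shuffle was actually removed, you can inspect the physical plan; a minimal sketch using the tables above:

```sql
-- With bucketed join in effect, the plan should contain no Exchange
-- (shuffle) operator between the two table scans.
EXPLAIN FORMATTED
SELECT * FROM FACT_TABLE JOIN DIM_TABLE
ON FACT_TABLE.order_id = DIM_TABLE.order_id;
```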