// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
= Apache Kudu Schema Design
:author: Kudu Team
:imagesdir: ./images
:icons: font
:toc: left
:toclevels: 3
:doctype: book
:backend: html5
:sectlinks:
:experimental:
[[schema_design]]
== Overview
Kudu tables have a structured data model similar to tables in a traditional
RDBMS. Schema design is critical for achieving the best performance and
operational stability from Kudu. Every workload is unique, and there is no
single schema design that is best for every table. This document outlines
effective schema design philosophies for Kudu, paying particular attention to
where they differ from approaches used for traditional RDBMS schemas.
At a high level, there are three concerns when creating Kudu tables:
<<column-design,column design>>, <<primary-keys,primary key design>>, and
<<partitioning,partitioning design>>. Of these, only partitioning will be a new
concept for those familiar with traditional non-distributed relational
databases. The final sections discuss <<alter-schema,altering the schema>> of an
existing table, and <<known-limitations,known limitations>> with regard to
schema design.
== The Perfect Schema
The perfect schema would accomplish the following:
- Data would be distributed in such a way that reads and writes are spread
evenly across tablet servers. This is impacted by partitioning.
- Tablets would grow at an even, predictable rate and load across tablets would
remain steady over time. This is most impacted by partitioning.
- Scans would read the minimum amount of data necessary to fulfill a query. This
is impacted mostly by primary key design, but partitioning also plays a role
via partition pruning.
The perfect schema depends on the characteristics of your data, what you need to do
with it, and the topology of your cluster. Schema design is the single most important
thing within your control to maximize the performance of your Kudu cluster.
[[column-design]]
== Column Design
A Kudu table consists of one or more columns, each with a defined type. Columns
that are not part of the primary key may be nullable. Supported
column types include:
* boolean
* 8-bit signed integer
* 16-bit signed integer
* 32-bit signed integer
* 64-bit signed integer
* date (32-bit days since the Unix epoch)
* unixtime_micros (64-bit microseconds since the Unix epoch)
* single-precision (32-bit) IEEE-754 floating-point number
* double-precision (64-bit) IEEE-754 floating-point number
* decimal (see <<decimal>> for details)
* varchar (see <<varchar>> for details)
* UTF-8 encoded string (up to 64KB uncompressed)
* binary (up to 64KB uncompressed)
Kudu takes advantage of strongly-typed columns and a columnar on-disk storage
format to provide efficient encoding and serialization. To make the most of
these features, columns should be specified as the appropriate type, rather than
simulating a 'schemaless' table using string or binary columns for data which
may otherwise be structured. In addition to encoding, Kudu allows compression to
be specified on a per-column basis.
[[no_version_column]]
[IMPORTANT]
.No Version or Timestamp Column
Kudu does not provide a version or timestamp column to track changes to a row.
If version or timestamp information is needed, the schema should include an
explicit version or timestamp column.
[[decimal]]
=== Decimal Type
The `decimal` type is a numeric data type with fixed scale and precision suitable for
financial and other arithmetic calculations where the imprecise representation and
rounding behavior of `float` and `double` make those types impractical. The `decimal`
type is also useful for integers larger than int64 and cases with fractional values
in a primary key.
The `decimal` type is a parameterized type that takes precision and scale type
attributes.
*Precision* represents the total number of digits that can be represented by the
column, regardless of the location of the decimal point. This value must be between
1 and 38 and has no default. For example, a precision of 4 is required to represent
integer values up to 9999, or to represent values up to 99.99 with two fractional
digits. You can also represent corresponding negative values, without any
change in the precision. For example, the range -9999 to 9999 still only requires
a precision of 4.
*Scale* represents the number of fractional digits. This value must be between 0
and the precision. A scale of 0 produces integral values, with no fractional part.
If precision and scale are equal, all of the digits come after the decimal point.
For example, a decimal with precision and scale equal to 3 can represent values
between -0.999 and 0.999.
*Performance considerations:*
Kudu stores each value in as few bytes as possible, depending on the precision
specified for the decimal column. For that reason, choosing the highest possible
precision for convenience is not advised. Doing so can negatively impact
performance, memory use, and storage.
Before encoding and compression:
* Decimal values with precision of 9 or less are stored in 4 bytes.
* Decimal values with precision of 10 through 18 are stored in 8 bytes.
* Decimal values with precision greater than 18 are stored in 16 bytes.
NOTE: The precision and scale of `decimal` columns cannot be changed by altering
the table.
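
The precision and scale rules above can be checked with a short Python sketch.
The function names are hypothetical and are not part of any Kudu client API;
the size thresholds are the ones listed in this section.

[source,python]
----
from decimal import Decimal


def decimal_storage_bytes(precision: int) -> int:
    """Storage size of one value before encoding and compression."""
    if not 1 <= precision <= 38:
        raise ValueError("precision must be between 1 and 38")
    if precision <= 9:
        return 4
    if precision <= 18:
        return 8
    return 16


def decimal_max_value(precision: int, scale: int) -> Decimal:
    """Largest value representable with the given precision and scale."""
    if not 1 <= precision <= 38:
        raise ValueError("precision must be between 1 and 38")
    if not 0 <= scale <= precision:
        raise ValueError("scale must be between 0 and the precision")
    # Build the value from its digits so that no rounding is applied:
    # `precision` nines with the decimal point shifted left by `scale`.
    return Decimal((0, (9,) * precision, -scale))


print(decimal_storage_bytes(9), decimal_storage_bytes(10), decimal_storage_bytes(19))
print(decimal_max_value(4, 2), decimal_max_value(3, 3))
----

The negative bound is the same value with the sign flipped, so it needs no
extra precision.
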
[[varchar]]
=== Varchar Type
The `varchar` type is a UTF-8 encoded string (up to 64KB uncompressed) with a
fixed maximum character length. This type is especially useful when migrating
from or integrating with legacy systems that support the `varchar` type.
If a maximum character length is not required, the `string` type should be
used instead.
The `varchar` type is a parameterized type that takes a length attribute.
*Length* represents the maximum number of UTF-8 characters allowed. Values
with more characters than the limit will be truncated. This value must
be between 1 and 65535 and has no default. Note that some other systems
may represent the length limit in bytes instead of characters. That means
that Kudu may be able to represent longer values in the case of multi-byte
UTF-8 characters.
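
The difference between a character limit and a byte limit can be illustrated
in Python. This is a sketch only: `truncate_varchar` is a hypothetical helper,
and it assumes that one "character" corresponds to one Unicode code point.

[source,python]
----
def truncate_varchar(value: str, max_chars: int) -> str:
    """Truncate to at most `max_chars` characters (not bytes)."""
    if not 1 <= max_chars <= 65535:
        raise ValueError("length must be between 1 and 65535")
    return value[:max_chars]


# "na\u00efve caf\u00e9" contains two characters that each take 2 bytes in UTF-8.
stored = truncate_varchar("na\u00efve caf\u00e9", 5)
print(len(stored))                  # character count of the stored value
print(len(stored.encode("utf-8")))  # byte count of the stored value
----

A length of 5 keeps five characters, which occupy six bytes here. A system
with a 5-byte limit could not hold the same value.
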
[[encoding]]
=== Column Encoding
Each column in a Kudu table can be created with an encoding, based on the type
of the column.
.Encoding Types
[options="header"]
|===
| Column Type | Encoding | Default
| int8, int16, int32, int64 | plain, bitshuffle, run length | bitshuffle
| date, unixtime_micros | plain, bitshuffle, run length | bitshuffle
| float, double, decimal | plain, bitshuffle | bitshuffle
| bool | plain, run length | run length
| string, varchar, binary | plain, prefix, dictionary | dictionary
|===
[[plain]]
Plain Encoding:: Data is stored in its natural format. For example, `int32`
values are stored as fixed-size 32-bit little-endian integers.
[[bitshuffle]]
Bitshuffle Encoding:: A block of values is rearranged to store the most
significant bit of every value, followed by the second most significant bit of
every value, and so on. Finally, the result is LZ4 compressed. Bitshuffle
encoding is a good choice for columns that have many repeated values, or values
that change by small amounts when sorted by primary key. The
https://github.com/kiyo-masui/bitshuffle[bitshuffle] project has a good overview
of performance and use cases.
[[run-length]]
Run Length Encoding:: _Runs_ (consecutive repeated values) are compressed in a
column by storing only the value and the count. Run length encoding is effective
for columns with many consecutive repeated values when sorted by primary key.
[[dictionary]]
Dictionary Encoding:: A dictionary of unique values is built, and each column
value is encoded as its corresponding index in the dictionary. Dictionary
encoding is effective for columns with low cardinality. If the column values of
a given row set are unable to be compressed because the number of unique values
is too high, Kudu will transparently fall back to plain encoding for that row
set. This is evaluated during flush.
[[prefix]]
Prefix Encoding:: Common prefixes are compressed in consecutive column values.
Prefix encoding can be effective for values that share common prefixes, or the
first column of the primary key, since rows are sorted by primary key within
tablets.
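
The ideas behind run length, dictionary, and prefix encoding can be sketched
in a few lines of Python. These are simplified illustrations of the general
techniques, not Kudu's on-disk formats, and the function names are
hypothetical.

[source,python]
----
from itertools import groupby
from os.path import commonprefix


def run_length_encode(values):
    """Collapse each run of consecutive repeats into a (value, count) pair."""
    return [(value, len(list(run))) for value, run in groupby(values)]


def dictionary_encode(values):
    """Replace each value with its index in a dictionary of unique values."""
    dictionary = []
    positions = {}
    indexes = []
    for value in values:
        if value not in positions:
            positions[value] = len(dictionary)
            dictionary.append(value)
        indexes.append(positions[value])
    return dictionary, indexes


def prefix_encode(values):
    """Store each string as (length of prefix shared with the previous value, suffix)."""
    encoded = []
    previous = ""
    for value in values:
        shared = len(commonprefix([previous, value]))
        encoded.append((shared, value[shared:]))
        previous = value
    return encoded


print(run_length_encode([True, True, True, False, False, True]))
print(dictionary_encode(["us", "eu", "us", "us", "eu"]))
print(prefix_encode(["host-a01", "host-a02", "host-b01"]))
----

In each case the benefit depends on the data. Run length encoding needs long
runs, dictionary encoding needs few distinct values, and prefix encoding needs
adjacent values that share leading bytes.
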
[[compression]]
=== Column Compression
Kudu allows per-column compression using the `LZ4`, `Snappy`, or `zlib`
compression codecs. By default, Bitshuffle-encoded columns are inherently
compressed with LZ4, and all other columns are stored
uncompressed. Consider using compression if reducing storage space is more
important than raw scan performance.
Every data set will compress differently, but in general LZ4 is the most
performant codec, while `zlib` will compress to the smallest data sizes.
Bitshuffle-encoded columns are automatically compressed using LZ4, so it is not
recommended to apply additional compression on top of this encoding.
[[primary-keys]]
== Primary Key Design
Every Kudu table must declare a primary key comprised of one or more columns.
Like an RDBMS primary key, the Kudu primary key enforces a uniqueness constraint.
Attempting to insert a row with the same primary key values as an existing row
will result in a duplicate key error.
Primary key columns must be non-nullable, and may not be a boolean, float
or double type.
Once set during table creation, the set of columns in the primary key may not
be altered.
Unlike an RDBMS, Kudu does not provide an auto-incrementing column feature,
so the application must always provide the full primary key during insert.
Row delete and update operations must also specify the full primary key of the
row to be changed. Kudu does not natively support range deletes or updates.
The primary key values of a row may not be updated after the row is inserted.
However, the row may be deleted and re-inserted with the updated value.
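
The primary key rules above can be modelled with a toy in-memory table. This
is a sketch of the semantics only: `ToyTable`, `DuplicateKeyError`, and `rekey`
are hypothetical names, not part of any Kudu client API.

[source,python]
----
class DuplicateKeyError(Exception):
    """Raised when an inserted row's primary key already exists."""


class ToyTable:
    """Hypothetical in-memory table keyed by its primary key columns."""

    def __init__(self, key_columns):
        self.key_columns = tuple(key_columns)
        self.rows = {}

    def _key(self, row):
        # Every primary key column must be supplied; a missing one raises KeyError.
        return tuple(row[column] for column in self.key_columns)

    def insert(self, row):
        key = self._key(row)
        if key in self.rows:
            raise DuplicateKeyError(key)
        self.rows[key] = dict(row)

    def delete(self, key):
        # Deletes address a row by its full primary key.
        del self.rows[tuple(key)]

    def rekey(self, old_key, new_row):
        # Key values cannot be updated in place: delete the old row,
        # then insert the row again under its new key.
        self.delete(old_key)
        self.insert(new_row)


table = ToyTable(["host", "metric", "time"])
table.insert({"host": "a", "metric": "cpu", "time": 1, "value": 0.5})
table.rekey(("a", "cpu", 1), {"host": "a", "metric": "cpu", "time": 2, "value": 0.5})
print(sorted(table.rows))
----
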
[[indexing]]
=== Primary Key Index
As with many traditional relational databases, Kudu’s primary key is stored in a
clustered index. All rows within a tablet are sorted by primary key.
When scanning Kudu rows, use equality or range predicates on primary key
columns to efficiently find the rows.
NOTE: Primary key indexing optimizations apply to scans on individual tablets.
See the <<partition-pruning>> section for details on how scans can use
predicates to skip entire tablets.
[[Backfilling]]
=== Considerations for Backfill Inserts
This section discusses a primary key design consideration for timeseries use
cases where the primary key is a timestamp, or the first column of the primary
key is a timestamp.
Each time a row is inserted into a Kudu table, Kudu looks up the primary key in
the primary key index storage to check whether that primary key is already
present in the table. If the primary key exists in the table, a "duplicate key"
error is returned. In the typical case where data is being inserted at
the current time as it arrives from the data source, only a small range of
primary keys is "hot", so each of these "check for presence" operations is
very fast: it hits the cached primary key storage in memory and doesn't require
going to disk.
In the case when you load historical data, which is called "backfilling", from
an offline data source, each row that is inserted is likely to hit a cold area
of the primary key index that is not resident in memory, causing one or
more HDD seeks. For example, in a normal ingestion case where Kudu sustains
a few million inserts per second, the "backfill" use case might sustain only
a few thousand inserts per second.
To alleviate the performance issue during backfilling, consider the following
options:
* Make the primary keys more compressible.
+
For example, with the first column of a primary key being a random ID of 32 bytes,
caching one billion primary keys would require at least 32 GB of RAM to stay in
cache. If caching backfill primary keys from several days ago, you need to have
several times 32 GB of memory. By changing the primary key to be more compressible,
you increase the likelihood that the primary keys can fit in cache, and thus
reduce the amount of random disk I/O.
+
* Use SSDs for storage, as random seeks are orders of magnitude faster on SSDs than on spinning disks.
* Change the primary key structure such that the backfill writes hit a continuous range of primary keys.
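
The memory estimate in the first option above is simple arithmetic, reproduced
here as a Python sketch. `key_cache_gb` is a hypothetical helper, and the
8-byte key in the second call is an illustrative assumption rather than a
measured compression result.

[source,python]
----
def key_cache_gb(num_keys: int, bytes_per_key: int) -> float:
    """Memory needed to keep `num_keys` primary keys cached, in GB (10**9 bytes)."""
    return num_keys * bytes_per_key / 10**9


# One billion random 32-byte keys, as in the example above.
print(key_cache_gb(1_000_000_000, 32))

# The same number of keys if each one took 8 bytes (assumed figure).
print(key_cache_gb(1_000_000_000, 8))
----
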
[[partitioning]]
== Partitioning
In order to provide scalability, Kudu tables are partitioned into units called
tablets, and distributed across many tablet servers. A row always belongs to a
single tablet. The method of assigning rows to tablets is determined by the
partitioning of the table, which is set during table creation.
Choosing a partitioning strategy requires understanding the data model and the
expected workload of a table. For write-heavy workloads, it is important to
design the partitioning such that writes are spread across tablets in order to
avoid overloading a single tablet. For workloads involving many short scans,
where the overhead of contacting remote servers dominates, performance can be
improved if all of the data for the scan is located in the same tablet.
Understanding these fundamental trade-offs is central to designing an effective
partition schema.
[[no_default_partitioning]]
[IMPORTANT]
.No Default Partitioning
Kudu does not provide a default partitioning strategy when creating tables. It
is recommended that new tables which are expected to have heavy read and write
workloads have at least as many tablets as tablet servers.
Kudu provides two types of partitioning: <<range-partitioning,range
partitioning>> and <<hash-partitioning,hash partitioning>>. Tables may also have
<<multilevel-partitioning,multilevel partitioning>>, which combines range and hash
partitioning, or multiple instances of hash partitioning.
[[range-partitioning]]
=== Range Partitioning
Range partitioning distributes rows using a totally-ordered range partition key.
Each partition is assigned a contiguous segment of the range partition keyspace.
The key must be comprised of a subset of the primary key columns. If the range
partition columns match the primary key columns, then the range partition key of
a row will equal its primary key. In range partitioned tables without hash
partitioning, each range partition will correspond to exactly one tablet.
The initial set of range partitions is specified during table creation as a set
of partition bounds and split rows. For each bound, a range partition will be
created in the table. Each split will divide a range partition in two. If no
partition bounds are specified, then the table will default to a single
partition covering the entire key space (unbounded below and above). Range
partitions must always be non-overlapping, and split rows must fall within a
range partition.
NOTE: See the <<range-partitioning-example>> for further discussion of range
partitioning.
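
The relationship between bounds, splits, and the resulting partitions can be
sketched in Python. This illustration makes several assumptions: it uses a
single integer range column, treats each partition as lower-inclusive and
upper-exclusive, and uses `None` for an unbounded end. The function names are
hypothetical.

[source,python]
----
def build_range_partitions(lower, upper, splits):
    """Divide the bound [lower, upper) at each split point."""
    for split in splits:
        below = lower is not None and split <= lower
        above = upper is not None and split >= upper
        if below or above:
            raise ValueError(f"split {split} falls outside the partition bound")
    edges = [lower] + sorted(splits) + [upper]
    return list(zip(edges[:-1], edges[1:]))


def find_partition(partitions, key):
    """Return the index of the partition covering `key`, or None if there is none."""
    for index, (low, high) in enumerate(partitions):
        if (low is None or key >= low) and (high is None or key < high):
            return index
    return None


unbounded = build_range_partitions(None, None, [2015, 2016])
bounded = build_range_partitions(2014, 2017, [2015, 2016])
print(unbounded)
print(bounded)
print(find_partition(unbounded, 2020), find_partition(bounded, 2020))
----

With the default unbounded partition, every key lands somewhere. With explicit
bounds, a key outside all partitions has no tablet to go to.
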
[[range-partition-management]]
==== Range Partition Management
Kudu allows range partitions to be dynamically added to and removed from a table at
runtime, without affecting the availability of other partitions. Removing a
partition will delete the tablets belonging to the partition, as well as the
data contained in them. Subsequent inserts into the dropped partition will fail.
New partitions can be added, but they must not overlap with any existing range
partitions. Kudu allows dropping and adding any number of range partitions in a
single transactional alter table operation.
Dynamically adding and dropping range partitions is particularly useful for time
series use cases. As time goes on, range partitions can be added to cover
upcoming time ranges. For example, a table storing an event log could add a
month-wide partition just before the start of each month in order to hold the
upcoming events. Old range partitions can be dropped in order to efficiently
remove historical data, as necessary.
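
The add and drop behavior described above can be modelled with a small Python
class. This is a sketch of the rules only; `RangePartitionedTable` and its
methods are hypothetical and do not correspond to a Kudu client API.

[source,python]
----
from datetime import date


class RangePartitionedTable:
    """Toy model: each range partition [lower, upper) owns its own rows."""

    def __init__(self):
        self.partitions = {}  # (lower, upper) -> list of rows

    def add_range_partition(self, lower, upper):
        if lower >= upper:
            raise ValueError("lower bound must be below upper bound")
        for low, high in self.partitions:
            if lower < high and low < upper:
                raise ValueError("new partition overlaps an existing one")
        self.partitions[(lower, upper)] = []

    def drop_range_partition(self, lower, upper):
        # Dropping a partition discards the rows stored in it.
        del self.partitions[(lower, upper)]

    def insert(self, key, row):
        for (low, high), rows in self.partitions.items():
            if low <= key < high:
                rows.append(row)
                return
        raise KeyError(f"no range partition covers {key}")


events = RangePartitionedTable()
events.add_range_partition(date(2016, 1, 1), date(2016, 2, 1))
events.add_range_partition(date(2016, 2, 1), date(2016, 3, 1))  # added ahead of time
events.insert(date(2016, 1, 15), "login")
events.drop_range_partition(date(2016, 1, 1), date(2016, 2, 1))  # discard old data
print(list(events.partitions))
----

After the January partition is dropped, an insert with a January key raises
an error in this model, mirroring the failure described above.
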
[[hash-partitioning]]
=== Hash Partitioning
Hash partitioning distributes rows by hash value into one of many buckets. In
single-level hash partitioned tables, each bucket will correspond to exactly
one tablet. The number of buckets is set during table creation. Typically the
primary key columns are used as the columns to hash, but as with range
partitioning, any subset of the primary key columns can be used.
Hash partitioning is an effective strategy when ordered access to the table is
not needed. Hash partitioning is effective for spreading writes randomly among
tablets, which helps mitigate hot-spotting and uneven tablet sizes.
NOTE: See the <<hash-partitioning-example>> for further discussion of hash
partitioning.
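
Bucket assignment can be sketched in Python as a hash of the chosen columns
taken modulo the bucket count. The CRC32 hash and the way the column values are
joined are assumptions made for illustration; they are not the hash function or
key encoding that Kudu uses.

[source,python]
----
import zlib
from collections import Counter


def hash_bucket(column_values, num_buckets):
    """Map the hashed column values of a row to one of `num_buckets` buckets."""
    # Join the values with a separator so ("ab", "c") and ("a", "bc") differ.
    encoded = "\x00".join(column_values).encode("utf-8")
    # Stand-in hash (CRC32); an assumption, not Kudu's hash function.
    return zlib.crc32(encoded) % num_buckets


# Spread 1000 hypothetical (host, metric) pairs over 4 buckets.
counts = Counter(hash_bucket((f"host-{i}", "cpu"), 4) for i in range(1000))
print(sorted(counts.items()))
----

The same column values always map to the same bucket, while distinct values
scatter across buckets regardless of their sort order.
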
[[multilevel-partitioning]]
=== Multilevel Partitioning
Kudu allows a table to combine multiple levels of partitioning on a single
table. Zero or more hash partition levels can be combined with an optional range
partition level. The only additional constraint on multilevel partitioning,
beyond the constraints of the individual partition types, is that multiple levels
of hash partitions must not hash the same columns.
When used correctly, multilevel partitioning can retain the benefits of the
individual partitioning types, while reducing the downsides of each. The total
number of tablets in a multilevel partitioned table is the product of the
number of partitions in each level.
NOTE: See the <<hash-range-partitioning-example>> and the
<<hash-hash-partitioning-example>> for further discussion of multilevel
partitioning.
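
The tablet count rule above is a plain product, shown here as a short Python
sketch. `tablet_count` is a hypothetical helper.

[source,python]
----
from math import prod


def tablet_count(partitions_per_level):
    """Total tablets: the product of the partition counts of every level."""
    return prod(partitions_per_level)


# 4 hash buckets combined with 3 range partitions.
print(tablet_count([4, 3]))

# Adding one more range partition adds one tablet per hash bucket.
print(tablet_count([4, 4]) - tablet_count([4, 3]))
----

Because the counts multiply, adding levels or buckets grows the number of
tablets quickly.
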
[[partition-pruning]]
=== Partition Pruning
Kudu scans will automatically skip scanning entire partitions when it can be
determined that the partition can be entirely filtered by the scan predicates.
To prune hash partitions, the scan must include equality predicates on every
hashed column. To prune range partitions, the scan must include equality or
range predicates on the range partitioned columns. Scans on multilevel
partitioned tables can take advantage of partition pruning on any of the levels
independently.
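
The pruning rules above can be expressed as two small Python functions. This
is a sketch under assumptions: range partitions are lower-inclusive and
upper-exclusive with `None` for an unbounded end, and CRC32 stands in for the
real hash function. All names are hypothetical.

[source,python]
----
import zlib


def prune_range_partitions(partitions, scan_lower=None, scan_upper=None):
    """Keep only the partitions that overlap the scan range [scan_lower, scan_upper)."""
    kept = []
    for low, high in partitions:
        ends_before = scan_lower is not None and high is not None and high <= scan_lower
        starts_after = scan_upper is not None and low is not None and low >= scan_upper
        if not (ends_before or starts_after):
            kept.append((low, high))
    return kept


def prune_hash_buckets(hashed_columns, equality_predicates, num_buckets):
    """Return the buckets a scan must visit at one hash level."""
    if all(column in equality_predicates for column in hashed_columns):
        values = "\x00".join(equality_predicates[c] for c in hashed_columns)
        # Stand-in hash (CRC32); an assumption, not Kudu's hash function.
        return [zlib.crc32(values.encode("utf-8")) % num_buckets]
    # Without an equality predicate on every hashed column, nothing is pruned.
    return list(range(num_buckets))


years = [(2014, 2015), (2015, 2016), (2016, 2017)]
print(prune_range_partitions(years, scan_lower=2015, scan_upper=2016))
print(prune_hash_buckets(("host", "metric"), {"host": "a"}, 4))
print(len(prune_hash_buckets(("host", "metric"), {"host": "a", "metric": "cpu"}, 4)))
----
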
[[partitioning-examples]]
=== Partitioning Examples
To illustrate the factors and trade-offs associated with designing a partitioning
strategy for a table, we will walk through some different partitioning
scenarios. Consider the following table schema for storing machine metrics data
(using SQL syntax and date-formatted timestamps for clarity):
[source,sql]
----
CREATE TABLE metrics (
host STRING NOT NULL,
metric STRING NOT NULL,
time INT64 NOT NULL,
value DOUBLE NOT NULL,
PRIMARY KEY (host, metric, time)
);
----
[[range-partitioning-example]]
==== Range Partitioning Example
A natural way to partition the `metrics` table is to range partition on the
`time` column. Let's assume that we want to have a partition per year, and the
table will hold data for 2014, 2015, and 2016. There are at least two ways that
the table could be partitioned: with unbounded range partitions, or with bounded
range partitions.
image::range-partitioning-example.png[Range Partitioning by `time`]
The image above shows the two ways the `metrics` table can be range partitioned
on the `time` column. In the first example (in blue), the default range
partition bounds are used, with splits at `2015-01-01` and `2016-01-01`. This
results in three tablets: the first containing values before 2015, the second
containing values in the year 2015, and the third containing values from 2016 onward.
The second example (in green) uses a range partition bound of `[(2014-01-01),
(2017-01-01)]`, and splits at `2015-01-01` and `2016-01-01`. The second example
could have equivalently been expressed through range partition bounds of
`[(2014-01-01), (2015-01-01)]`, `[(2015-01-01), (2016-01-01)]`, and
`[(2016-01-01), (2017-01-01)]`, with no splits. The first example has unbounded
lower and upper range partitions, while the second example includes bounds.
Each of the range partition examples above allows time-bounded scans to prune
partitions falling outside of the scan's time bound. This can greatly improve
performance when there are many partitions. When writing, both examples suffer
from potential hot-spotting issues. Because metrics tend to always be written
at the current time, most writes will go into a single range partition.
The second example is more flexible than the first, because it allows range
partitions for future years to be added to the table. In the first example, all
writes for times after `2016-01-01` will fall into the last partition, so the
partition may eventually become too large for a single tablet server to handle.
[[hash-partitioning-example]]
==== Hash Partitioning Example
Another way of partitioning the `metrics` table is to hash partition on the
`host` and `metric` columns.
image::hash-partitioning-example.png[Hash Partitioning by `host` and `metric`]
In the example above, the `metrics` table is hash partitioned on the `host` and
`metric` columns into four buckets. Unlike the range partitioning example
earlier, this partitioning strategy will spread writes over all tablets in the
table evenly, which helps overall write throughput. Scans over a specific host
and metric can take advantage of partition pruning by specifying equality
predicates, reducing the number of scanned tablets to one. One issue to be
careful of with a pure hash partitioning strategy is that tablets could grow
indefinitely as more and more data is inserted into the table. Eventually
tablets will become too big for an individual tablet server to hold.
NOTE: Although these examples number the tablets, in reality tablets are only
given UUID identifiers. There is no natural ordering among the tablets in a hash
partitioned table.
[[hash-range-partitioning-example]]
==== Hash and Range Partitioning Example
The previous examples showed how the `metrics` table could be range partitioned
on the `time` column, or hash partitioned on the `host` and `metric` columns.
These strategies have associated strengths and weaknesses:
.Partitioning Strategies
|===
| Strategy | Writes | Reads | Tablet Growth
| `range(time)`
| ✗ - all writes go to latest partition
| ✓ - time-bounded scans can be pruned
| ✓ - new tablets can be added for future time periods
| `hash(host, metric)`
| ✓ - writes are spread evenly among tablets
| ✓ - scans on specific hosts and metrics can be pruned
| ✗ - tablets could grow too large
|===
Hash partitioning is good at maximizing write throughput, while range
partitioning avoids issues of unbounded tablet growth. Both strategies can take
advantage of partition pruning to optimize scans in different scenarios. Using
multilevel partitioning, it is possible to combine the two strategies in order
to gain the benefits of both, while minimizing the drawbacks of each.
image::hash-range-partitioning-example.png[Hash and Range Partitioning]
In the example above, range partitioning on the `time` column is combined with
hash partitioning on the `host` and `metric` columns. This strategy can be
thought of as having two dimensions of partitioning: one for the hash level and
one for the range level. Writes into this table at the current time will be
parallelized up to the number of hash buckets, in this case 4. Reads can take
advantage of time bound *and* specific host and metric predicates to prune
partitions. New range partitions can be added, which results in creating 4
additional tablets (as if a new column were added to the diagram).
[[hash-hash-partitioning-example]]
==== Hash and Hash Partitioning Example
Kudu can support any number of hash partitioning levels in the same table, as
long as the levels have no hashed columns in common.
image::hash-hash-partitioning-example.png[Hash and Hash Partitioning]
In the example above, the table is hash partitioned on `host` into 4 buckets,
and hash partitioned on `metric` into 3 buckets, resulting in 12 tablets.
Although writes will tend to be spread among all tablets when using this
strategy, it is slightly more prone to hot-spotting than a single hash level
over both columns together, since all values for an individual host or
metric will always belong to a single bucket at that level, and therefore to a
fixed subset of the tablets. Scans can take advantage of
equality predicates on the `host` and `metric` columns separately to prune
partitions.
Multiple levels of hash partitioning can also be combined with range
partitioning, which logically adds another dimension of partitioning.
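
Per-level pruning in the 4 by 3 layout above can be sketched in Python. The
CRC32 hash is a stand-in and an assumption, not the hash function Kudu uses,
and the `(host bucket, metric bucket)` pairs are only a convenient way to name
tablets in this illustration.

[source,python]
----
import zlib
from itertools import product

HOST_BUCKETS = 4
METRIC_BUCKETS = 3


def bucket(value, num_buckets):
    # Stand-in hash (CRC32); an assumption, not Kudu's hash function.
    return zlib.crc32(value.encode("utf-8")) % num_buckets


def tablets_to_scan(host=None, metric=None):
    """Return the (host bucket, metric bucket) pairs a scan must visit."""
    if host is not None:
        host_buckets = [bucket(host, HOST_BUCKETS)]
    else:
        host_buckets = range(HOST_BUCKETS)
    if metric is not None:
        metric_buckets = [bucket(metric, METRIC_BUCKETS)]
    else:
        metric_buckets = range(METRIC_BUCKETS)
    return list(product(host_buckets, metric_buckets))


print(len(tablets_to_scan()))                            # no predicates
print(len(tablets_to_scan(host="host-1")))               # equality on host only
print(len(tablets_to_scan(metric="cpu")))                # equality on metric only
print(len(tablets_to_scan(host="host-1", metric="cpu"))) # equality on both
----

An equality predicate on one column narrows only that column's level, so the
scan still visits every bucket of the other level.
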
[[alter-schema]]
== Schema Alterations
You can alter a table's schema in the following ways:
- Rename the table
- Rename primary key columns
- Rename, add, or drop non-primary key columns
- Add and drop range partitions
Multiple alteration steps can be combined in a single transactional operation.
[[known-limitations]]
== Known Limitations
Kudu currently has some known limitations that may factor into schema design.
Number of Columns:: By default, Kudu will not permit the creation of tables with
more than 300 columns. We recommend schema designs that use fewer columns for best
performance.
Size of Cells:: No individual cell may be larger than 64KB before encoding or
compression. The cells making up a composite key are limited to a total of 16KB
after the internal composite-key encoding done by Kudu. Inserting rows not
conforming to these limitations will result in errors being returned to the
client.
Size of Rows:: Although individual cells may be up to 64KB, and Kudu supports up to
300 columns, it is recommended that no single row be larger than a few hundred KB.
Valid Identifiers:: Identifiers such as table and column names must be valid UTF-8
sequences and no longer than 256 bytes.
Immutable Primary Keys:: Kudu does not allow you to update the primary key
columns of a row.
Non-alterable Primary Key:: Kudu does not allow you to alter the primary key
columns after table creation.
Non-alterable Partitioning:: Kudu does not allow you to change how a table is
partitioned after creation, with the exception of adding or dropping range
partitions.
Non-alterable Column Types:: Kudu does not allow the type of a column to be
altered.
Partition Splitting:: Partitions cannot be split or merged after table creation.
Deleted row disk space is not reclaimed:: The disk space occupied by a deleted
row is only reclaimable via compaction, and only when the deletion's age
exceeds the "tablet history maximum age" (controlled by the
`--tablet_history_max_age_sec` flag). Furthermore, Kudu currently only schedules
compactions in order to improve read/write performance; a tablet will never be
compacted purely to reclaim disk space. As such, range partitioning should be
used when it is expected that large swaths of rows will be discarded. With range
partitioning, individual partitions may be dropped to discard data and reclaim
disk space. See link:https://issues.apache.org/jira/browse/KUDU-1625[KUDU-1625]
for details.
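
Some of the limits listed above (column count, identifier length, and cell
size) are easy to check before creating a table or writing a row. The Python
sketch below is a hypothetical client-side check, not part of Kudu, and it
assumes that "64KB" means 64 * 1024 bytes.

[source,python]
----
MAX_COLUMNS = 300              # default column limit stated above
MAX_IDENTIFIER_BYTES = 256     # table and column names
MAX_CELL_BYTES = 64 * 1024     # assumption: 64KB means 65536 bytes


def schema_problems(table_name, column_names):
    """Return a list of human-readable limit violations (empty if none)."""
    problems = []
    if len(column_names) > MAX_COLUMNS:
        problems.append(
            f"{len(column_names)} columns exceeds the default limit of {MAX_COLUMNS}"
        )
    for identifier in [table_name, *column_names]:
        if len(identifier.encode("utf-8")) > MAX_IDENTIFIER_BYTES:
            problems.append(
                f"identifier starting {identifier[:20]!r} is longer than "
                f"{MAX_IDENTIFIER_BYTES} bytes"
            )
    return problems


def cell_fits(value: bytes) -> bool:
    """True if a cell is within the size limit before encoding or compression."""
    return len(value) <= MAX_CELL_BYTES


print(schema_problems("metrics", ["host", "metric", "time", "value"]))
print(cell_fits(b"x" * 70_000))
----
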