Welcome! :smile:
If you've stumbled across this module, hopefully you're looking for some guidance on how to get started with the Apache Iceberg table format. This set of classes collects code examples of how to use the Iceberg Java API with Spark, along with some extra detail here in the README.
The examples are structured as JUnit tests that you can download and run locally if you want to mess around with Iceberg yourself.
If you'd like to try out Iceberg in your own project using Spark, you can use the `iceberg-spark-runtime` dependency:

```xml
<dependency>
  <groupId>org.apache.iceberg</groupId>
  <artifactId>iceberg-spark-runtime</artifactId>
  <version>${iceberg.version}</version>
</dependency>
```
You'll also need `spark-sql`:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.4.4</version>
</dependency>
```
To add a dependency on Iceberg in Gradle, add the following to `build.gradle`:

```groovy
dependencies {
  compile 'org.apache.iceberg:iceberg-core:0.8.0-incubating'
}
```
The following section will break down the different areas of Iceberg explored in the examples, with links to the code and extra information that could be useful for new users.
There are multiple ways of creating tables with Iceberg, including using the Hive Metastore to keep track of tables (`HiveCatalog`), or using HDFS or your local file system (`HadoopTables`) to store them. Note that directory tables (such as those using `HadoopTables`) don't support all catalog operations, like rename, and therefore use the `Tables` interface instead of the `Catalog` interface. Also note that `HadoopTables` shouldn't be used with file systems that lack atomic rename, as Iceberg depends on atomic rename to synchronize concurrent commits. To limit complexity, these examples create tables on your local file system using the `HadoopTables` class.
To create an Iceberg `Table`, you will need to use the Iceberg API to create a `Schema` and a `PartitionSpec`, which you then use with a Spark `DataFrameWriter`.
Code examples can be found here.
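As a rough sketch of those steps (the table path `/tmp/iceberg-books` and the column names are illustrative assumptions, not taken from the examples themselves):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;

public class CreateTableExample {
  public static void main(String[] args) {
    // Define the table schema; every field needs a unique ID.
    Schema schema = new Schema(
        Types.NestedField.required(1, "title", Types.StringType.get()),
        Types.NestedField.optional(2, "published", Types.TimestampType.withZone()));

    // Partition by month of "published", which yields directories
    // like published_month=2018-09 on disk.
    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .month("published")
        .build();

    // HadoopTables creates directory tables on the local file system.
    Table table = new HadoopTables(new Configuration())
        .create(schema, spec, "/tmp/iceberg-books");
    System.out.println(table.location());
  }
}
```

With the table created, a Spark `DataFrame` can then be appended to it with `df.write().format("iceberg").mode("append").save("/tmp/iceberg-books")`.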
Note that when writing partitioned data, Iceberg lays out your files in a similar manner to Hive:

```
├── data
│   ├── published_month=2017-09
│   │   └── 00000-1-5cbc72f6-7c1a-45e4-bb26-bc30deaca247-00002.parquet
│   ├── published_month=2018-09
│   │   └── 00000-1-5cbc72f6-7c1a-45e4-bb26-bc30deaca247-00001.parquet
│   ├── published_month=2018-11
│   │   └── 00000-1-5cbc72f6-7c1a-45e4-bb26-bc30deaca247-00000.parquet
│   └── published_month=null
│       └── 00000-1-5cbc72f6-7c1a-45e4-bb26-bc30deaca247-00003.parquet
└── metadata
    └── version-hint.text
```
**WARNING:** It is not possible to simply drag-and-drop data files into an Iceberg table like the one shown above and expect to see your data in the table. Each file is tracked individually and managed by Iceberg, and so must be written into the table using the Iceberg API.
Reading Iceberg tables is fairly simple using the Spark `DataFrameReader`.
Code examples can be found here.
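A minimal read might look like the following, assuming a table already exists at the hypothetical path `/tmp/iceberg-books`:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadTableExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[2]")
        .appName("iceberg-read")
        .getOrCreate();

    // Point the DataFrameReader at the table's location on disk.
    Dataset<Row> books = spark.read()
        .format("iceberg")
        .load("/tmp/iceberg-books");

    // From here it behaves like any other Spark DataFrame.
    books.show();

    spark.stop();
  }
}
```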
This section looks a little bit closer at the metadata produced by Iceberg tables. Consider an example where you've written some data to a table. Your files will look something like this:
```
├── data
│   └── ...
└── metadata
    ├── 51accd1d-39c7-4a6e-8f35-9e05f7c67864-m0.avro
    ├── snap-1335014336004891572-1-51accd1d-39c7-4a6e-8f35-9e05f7c67864.avro
    ├── v1.metadata.json
    ├── v2.metadata.json
    └── version-hint.text
```
The metadata for your table is kept in JSON files, and each commit to a table produces a new metadata file. For tables using a metastore, the current metadata file is whichever file the metastore points at. For `HadoopTables`, the latest version available is used. Look here for more information on metadata.
The metadata file will contain things like the table location, the schema and the partition spec:
```json
{
  "format-version" : 1,
  "table-uuid" : "f31aa6d7-acc3-4365-b737-4ef028a60bc1",
  "location" : "/var/folders/sg/ypkyhl2s0p18qcd10ddpkn0c0000gn/T/temp5216691795982307214",
  "last-updated-ms" : 1572972868185,
  "last-column-id" : 2,
  "schema" : {
    "type" : "struct",
    "fields" : [ { ... } ]
  },
  "partition-spec" : [ { ... } ],
  "default-spec-id" : 0,
  "partition-specs" : [ { ... } ],
  "properties" : { },
  "current-snapshot-id" : -1,
  "snapshots" : [ ],
  "snapshot-log" : [ ]
}
```
When you then add your first chunk of data, you get a new version of the metadata (`v2.metadata.json`) that is the same as the first version except for the snapshot section at the bottom, which gets updated to:
```json
"current-snapshot-id" : 8405273199394950821,
"snapshots" : [ {
  "snapshot-id" : 8405273199394950821,
  "timestamp-ms" : 1572972873293,
  "summary" : {
    "operation" : "append",
    "spark.app.id" : "local-1572972867758",
    "added-data-files" : "4",
    "added-records" : "4",
    "changed-partition-count" : "4",
    "total-records" : "4",
    "total-data-files" : "4"
  },
  "manifest-list" : "/var/folders/sg/ypkyhl2s0p18qcd10ddpkn0c0000gn/T/temp5216691795982307214/metadata/snap-8405273199394950821-1-5706fc75-31e1-404e-aa23-b493387e2e32.avro"
} ],
"snapshot-log" : [ {
  "timestamp-ms" : 1572972873293,
  "snapshot-id" : 8405273199394950821
} ]
```
Here you get information on the data you have just written to the table, such as `added-records` and `added-data-files`, as well as where the manifest list is located.
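The same summary fields can also be read programmatically from the current snapshot. A small sketch, assuming a hypothetical directory table at `/tmp/iceberg-books` that has at least one commit:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

public class SnapshotSummaryExample {
  public static void main(String[] args) {
    Table table = new HadoopTables(new Configuration()).load("/tmp/iceberg-books");

    // The current snapshot exposes the same summary map that appears
    // in the metadata file: operation, added-records, and so on.
    Snapshot current = table.currentSnapshot();
    System.out.println("operation:     " + current.operation());
    System.out.println("added-records: " + current.summary().get("added-records"));
    System.out.println("manifest list: " + current.manifestListLocation());
  }
}
```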
Iceberg uses snapshots as part of its implementation and provides a lot of useful functionality from them, such as time travel:

- Old snapshots can be expired using the `ExpireSnapshots` API. Currently, this must be called by the user.
- Tables can be rolled back to previous `SnapshotID` values.
- Tables can be read at a specific `SnapshotID` or a timestamp value (time travel).

Code examples can be found here.
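A sketch of time travel and snapshot expiry, assuming a hypothetical table at `/tmp/iceberg-books` (the cutoff age is an arbitrary choice for illustration):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TimeTravelExample {
  public static void main(String[] args) {
    String location = "/tmp/iceberg-books";  // hypothetical table location
    SparkSession spark = SparkSession.builder()
        .master("local[2]")
        .appName("iceberg-time-travel")
        .getOrCreate();
    Table table = new HadoopTables(new Configuration()).load(location);

    // Read the table as of a specific snapshot ID...
    long snapshotId = table.currentSnapshot().snapshotId();
    Dataset<Row> atSnapshot = spark.read()
        .format("iceberg")
        .option("snapshot-id", snapshotId)
        .load(location);
    atSnapshot.show();

    // ...or as of a timestamp in milliseconds.
    Dataset<Row> atTime = spark.read()
        .format("iceberg")
        .option("as-of-timestamp", String.valueOf(System.currentTimeMillis()))
        .load(location);
    atTime.show();

    // Expire snapshots older than a cutoff; Iceberg won't do this for you.
    long cutoff = System.currentTimeMillis() - 24L * 60 * 60 * 1000;
    table.expireSnapshots().expireOlderThan(cutoff).commit();

    spark.stop();
  }
}
```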
Iceberg provides support to handle schema evolution of your tables over time:

- When a new column is added, existing rows return a `null` value for this new column. You cannot use an alternative default value.
- A new column is added as an `optional` column, meaning adding data to this column isn't enforced for each future write.
- Some column type promotions are supported (e.g. `int` -> `long`). For a definitive list, see the official documentation.

Code examples can be found here.
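A sketch of evolving a schema through the `UpdateSchema` builder, assuming a hypothetical table at `/tmp/iceberg-books` with a `title` column (the new column name is illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;

public class SchemaEvolutionExample {
  public static void main(String[] args) {
    Table table = new HadoopTables(new Configuration()).load("/tmp/iceberg-books");

    table.updateSchema()
        // New columns are added as optional; existing rows read back null.
        .addColumn("isbn", Types.StringType.get())
        // Renames are safe because Iceberg tracks columns by ID, not name.
        .renameColumn("title", "name")
        .commit();

    System.out.println(table.schema());
  }
}
```

Type promotion (e.g. `int` -> `long`) goes through the same builder via `updateColumn`.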
Optimistic concurrency is when a system assumes that multiple writers can write to the same table without interfering with each other. It is usually used in environments with low data contention: the table isn't locked, so multiple writers can write to it at the same time.

However, this means you occasionally need to deal with concurrent write conflicts. These occur when multiple writers start writing to a table at the same time, one finishes first and commits an update, and the second writer's commit then fails because the table is no longer in the state it was in when that write started.

Iceberg deals with this by retrying the write based on the new metadata. If the files changed by the first write aren't touched by the second write, it's deemed safe to commit the second update.
This test looks to experiment with how optimistic concurrency works. For more information on conflict resolution, look here and for information on write concurrency, look here.
By default, Iceberg sets the `commit.retry.num-retries` property to 4. You can change this default by creating an `UpdateProperties` object and assigning a new number to that property:

```java
table.updateProperties()
    .set("commit.retry.num-retries", "1")
    .commit();
```
You can find more information on other table properties you can configure here.