{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setup\n",
"\n",
"1. Create a Spark session\n",
"2. Add the iceberg-runtime Jar"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Waiting for a Spark session to start..."
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"\n",
"<ul>\n",
"<li><a href=\"http://hadoop-historyserver:20888/proxy/application_1506444763486_1280232\" target=\"new_tab\">Spark UI</a></li>\n",
"<li><a href=\"http://hadoop-resourcemanager:8088/cluster/app/application_1506444763486_1280232\" target=\"new_tab\">Hadoop app: application_1506444763486_1280232</a></li>\n",
"<li>Local logs are available using %tail_log</li>\n",
"<li>Local logs are at: /data/tmp/jobs/20180110_171610.028692.log</li>\n",
"</ul>\n"
],
"text/plain": [
"\n",
"Spark application_1506444763486_1280232:\n",
"* http://hadoop-historyserver:20888/proxy/application_1506444763486_1280232\n",
"* http://hadoop-resourcemanager:8088/cluster/app/application_1506444763486_1280232\n",
"\n",
"Local logs:\n",
"* /data/tmp/genie/jobs/20180110_171610.028692.log\n",
"* Also available using %tail_log\n"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"spark"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Starting download from file:///home/user/iceberg-runtime-0.1.3.jar\n",
"Finished download of iceberg-runtime-0.1.3.jar\n"
]
}
],
"source": [
"%AddJar file:///home/user/iceberg-runtime-0.1.3.jar"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Drop and create a table in HDFS\n",
"\n",
"[Spark Schema Helpers](https://netflix.github.io/iceberg/current/javadoc/index.html?com/netflix/iceberg/spark/SparkSchemaUtil.html)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"path = hdfs:/tmp/tables/job_metrics_tmp\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"table {\n",
" 1: event_utc_ms: optional long\n",
" 2: hostname: optional string\n",
" 3: jobflow: optional string\n",
" 4: job_name: optional string\n",
" 5: application_type: optional string\n",
" 6: record_id: optional string\n",
" 7: record_type: optional string\n",
" 8: user: optional string\n",
" 9: submit_time: optional long\n",
" 10: start_time: optional long\n",
" 11: finish_time: optional long\n",
" 12: run_host: optional string\n",
" 13: submit_host: optional string\n",
" 14: status: optional string\n",
" 15: cluster_id: optional string\n",
" 16: cluster_name: optional string\n",
" 17: queue: optional string\n",
" 18: genie_job_name: optional string\n",
" 19: genie_job_id: optional string\n",
" 20: job_uuid: optional string\n",
" 21: counters: optional string\n",
" 22: properties: optional string\n",
" 23: dateint: optional int\n",
" 24: hour: optional int\n",
" 25: batchid: optional string\n",
"}"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import org.apache.hadoop.fs.Path\n",
"import org.apache.iceberg.hadoop.HadoopTables\n",
"import org.apache.iceberg.spark.SparkSchemaUtil\n",
"\n",
"val path = \"hdfs:/tmp/tables/job_metrics_tmp\"\n",
"\n",
"{ // use a block to avoid values (conf, etc.) getting caught in closures\n",
"\n",
" // remove the temp table if it already exists\n",
" val conf = spark.sessionState.newHadoopConf()\n",
" val fs = new Path(path).getFileSystem(conf)\n",
" fs.delete(new Path(path), true /* recursive */ )\n",
"\n",
" // create the temp table using Spark utils to create a schema and partition spec\n",
" val tables = new HadoopTables(conf)\n",
" val schema = SparkSchemaUtil.schemaForTable(spark, \"default.job_metrics\")\n",
" val spec = SparkSchemaUtil.specForTable(spark, \"default.job_metrics\")\n",
"\n",
" tables.create(schema, spec, path)\n",
"\n",
" // show the schema\n",
" tables.load(path).schema\n",
"}\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Load table partitions as a DataFrame"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table>\n",
"<tr><th>partition</th><th>uri</th><th>format</th></tr>\n",
"<tr><td>{dateint -> 20170316, hour -> 0, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=0/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n",
"<tr><td>{dateint -> 20170316, hour -> 1, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=1/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n",
"<tr><td>{dateint -> 20170316, hour -> 2, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=2/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n",
"<tr><td>{dateint -> 20170316, hour -> 3, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=3/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n",
"<tr><td>{dateint -> 20170316, hour -> 4, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=4/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n",
"<tr><td>{dateint -> 20170316, hour -> 5, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=5/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n",
"<tr><td>{dateint -> 20170316, hour -> 6, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=6/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n",
"<tr><td>{dateint -> 20170316, hour -> 7, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=7/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n",
"<tr><td>{dateint -> 20170316, hour -> 8, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=8/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n",
"<tr><td>{dateint -> 20170316, hour -> 9, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=9/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n",
"</table>"
],
"text/plain": [
"+-------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+\n",
"| partition | uri | format |\n",
"+-------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+\n",
"| {dateint -> 20170316, hour -> 0, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=0/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n",
"| {dateint -> 20170316, hour -> 1, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=1/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n",
"| {dateint -> 20170316, hour -> 2, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=2/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n",
"| {dateint -> 20170316, hour -> 3, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=3/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n",
"| {dateint -> 20170316, hour -> 4, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=4/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n",
"| {dateint -> 20170316, hour -> 5, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=5/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n",
"| {dateint -> 20170316, hour -> 6, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=6/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n",
"| {dateint -> 20170316, hour -> 7, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=7/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n",
"| {dateint -> 20170316, hour -> 8, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=8/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n",
"| {dateint -> 20170316, hour -> 9, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=9/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n",
"+-------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"partitions = [partition: map<string,string>, uri: string ... 1 more field]\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"[partition: map<string,string>, uri: string ... 1 more field]"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import org.apache.iceberg.spark.SparkTableUtil\n",
"\n",
"// get a data frame with the table's partitions\n",
"val partitions = SparkTableUtil.partitionDF(spark, \"default.job_metrics\")\n",
" .filter($\"format\".contains(\"parquet\") || $\"format\".contains(\"avro\"))\n",
"\n",
"display(partitions.limit(10))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# List files, load metrics, and append to the table\n",
"\n",
"* [Table API](https://netflix.github.io/iceberg/current/javadoc/index.html?com/netflix/iceberg/Table.html)\n",
"* [Append API](https://netflix.github.io/iceberg/current/javadoc/index.html?com/netflix/iceberg/AppendFiles.html)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[Stage 3:====================================================> (9 + 1) / 10]"
]
},
{
"data": {
"text/plain": [
"0"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import org.apache.iceberg.hadoop.HadoopTables\n",
"import org.apache.hadoop.conf.Configuration\n",
"\n",
"partitions.repartition(100).flatMap { row =>\n",
"\n",
" // list the partition and read Parquet footers to get metrics\n",
" SparkTableUtil.listPartition(row.getMap[String, String](0).toMap, row.getString(1), row.getString(2))\n",
"\n",
"}.repartition(10) // avoid lots of manifests that would be merged later\n",
" .mapPartitions { files =>\n",
"\n",
" // open the table and append the files from this partition\n",
" val tables = new HadoopTables(new Configuration())\n",
" val table = tables.load(\"hdfs:/tmp/tables/job_metrics_tmp\")\n",
"\n",
" // fast appends will create a manifest for the new files\n",
" val append = table.newFastAppend\n",
"\n",
" files.foreach { file =>\n",
" append.appendFile(file.toDataFile(table.spec))\n",
" }\n",
"\n",
" // commit the new files\n",
" append.commit()\n",
"\n",
" Seq.empty[String].iterator\n",
"\n",
"}.count\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Inspect the results\n",
"\n",
"[Snapshot API](https://netflix.github.io/iceberg/current/javadoc/index.html?com/netflix/iceberg/Snapshot.html)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"tables = org.apache.iceberg.hadoop.HadoopTables@1782cb95\n",
"table = hdfs:/tmp/tables/job_metrics_tmp\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"BaseSnapshot{id=1515605124481, timestamp_ms=1515605127199, manifests=[hdfs:/tmp/tables/job_metrics_tmp/metadata/695d8ab7-961c-4cef-94d7-367db5d8f7de-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/725154b3-92bd-4d00-9420-34a2866f2876-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/266e6040-d8ff-4713-92cb-0d806c7a3baf-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/3b0e9c88-03b0-4032-bf70-f9af43e00034-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/0747127e-895e-492e-b07e-a54627ee5534-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/db055992-1bf1-4fe7-a851-1eff0a05af55-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/1d5b7cb9-85bd-4088-ad26-a4e9562ad181-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/36db4143-8720-4060-9a8d-d17fa7dcf46f-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/46a079c3-8654-4ed5-9466-088320bda559-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/f239498c-7386-4f31-8421-518105ffbf6a-m0.avro]}"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val tables = new HadoopTables(spark.sessionState.newHadoopConf())\n",
"val table = tables.load(path)\n",
"\n",
"table.currentSnapshot"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"7087"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import scala.collection.JavaConverters._\n",
"\n",
"table.currentSnapshot.addedFiles.asScala.size"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"table.newAppend.commit // use a merge commit to create a single manifest"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"BaseSnapshot{id=1515605215920, timestamp_ms=1515605220253, manifests=[hdfs:/tmp/tables/job_metrics_tmp/metadata/213364b0-d97f-49bf-9126-7273b9784cfb-m0.avro]}"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"table.currentSnapshot"
]
},
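{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Read the table back\n",
"\n",
"As a final sanity check, read the migrated table and count its rows. This is a sketch: it assumes the runtime JAR registers an `iceberg` Spark data source that supports path-based loads."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"// sketch: read the migrated table back (assumes the \"iceberg\" data source is available)\n",
"val metrics = spark.read.format(\"iceberg\").load(path)\n",
"\n",
"metrics.count"
]
},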
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"hide_input": false,
"kernelspec": {
"display_name": "Spark 2.0.0 - Scala 2.11",
"language": "scala",
"name": "spark2-scala"
},
"language_info": {
"codemirror_mode": "text/x-scala",
"file_extension": ".scala",
"mimetype": "text/x-scala",
"name": "scala",
"pygments_lexer": "scala",
"version": "2.11.8"
}
},
"nbformat": 4,
"nbformat_minor": 2
}