| { |
| "cells": [ |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## Setup\n", |
| "\n", |
| "1. Create a Spark session\n", |
| "2. Add the iceberg-runtime Jar" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 1, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "data": { |
| "text/plain": [ |
| "Waiting for a Spark session to start..." |
| ] |
| }, |
| "metadata": {}, |
| "output_type": "display_data" |
| }, |
| { |
| "data": { |
| "text/html": [ |
| "\n", |
| "<ul>\n", |
| "<li><a href=\"http://hadoop-historyserver:20888/proxy/application_1506444763486_1280232\" target=\"new_tab\">Spark UI</a></li>\n", |
| "<li><a href=\"http://hadoop-resourcemanager:8088/cluster/app/application_1506444763486_1280232\" target=\"new_tab\">Hadoop app: application_1506444763486_1280232</a></li>\n", |
| "<li>Local logs are available using %tail_log</li>\n", |
| "<li>Local logs are at: /data/tmp/jobs/20180110_171610.028692.log</li>\n", |
| "</ul>\n" |
| ], |
| "text/plain": [ |
| "\n", |
| "Spark application_1506444763486_1280232:\n", |
| "* http://hadoop-historyserver:20888/proxy/application_1506444763486_1280232\n", |
| "* http://hadoop-resourcemanager:8088/cluster/app/application_1506444763486_1280232\n", |
| "\n", |
| "Local logs:\n", |
| "* /data/tmp/genie/jobs/20180110_171610.028692.log\n", |
| "* Also available using %tail_log\n" |
| ] |
| }, |
| "execution_count": 1, |
| "metadata": {}, |
| "output_type": "execute_result" |
| } |
| ], |
| "source": [ |
| "spark" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 2, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "Starting download from file:///home/user/iceberg-runtime-0.1.3.jar\n", |
| "Finished download of iceberg-runtime-0.1.3.jar\n" |
| ] |
| } |
| ], |
| "source": [ |
| "%AddJar file:///home/user/iceberg-runtime-0.1.3.jar" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "# Drop and create a table in HDFS\n", |
| "\n", |
| "[Spark Schema Helpers](https://netflix.github.io/iceberg/current/javadoc/index.html?com/netflix/iceberg/spark/SparkSchemaUtil.html)" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 3, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "data": { |
| "text/plain": [ |
| "path = hdfs:/tmp/tables/job_metrics_tmp\n" |
| ] |
| }, |
| "metadata": {}, |
| "output_type": "display_data" |
| }, |
| { |
| "data": { |
| "text/plain": [ |
| "table {\n", |
| " 1: event_utc_ms: optional long\n", |
| " 2: hostname: optional string\n", |
| " 3: jobflow: optional string\n", |
| " 4: job_name: optional string\n", |
| " 5: application_type: optional string\n", |
| " 6: record_id: optional string\n", |
| " 7: record_type: optional string\n", |
| " 8: user: optional string\n", |
| " 9: submit_time: optional long\n", |
| " 10: start_time: optional long\n", |
| " 11: finish_time: optional long\n", |
| " 12: run_host: optional string\n", |
| " 13: submit_host: optional string\n", |
| " 14: status: optional string\n", |
| " 15: cluster_id: optional string\n", |
| " 16: cluster_name: optional string\n", |
| " 17: queue: optional string\n", |
| " 18: genie_job_name: optional string\n", |
| " 19: genie_job_id: optional string\n", |
| " 20: job_uuid: optional string\n", |
| " 21: counters: optional string\n", |
| " 22: properties: optional string\n", |
| " 23: dateint: optional int\n", |
| " 24: hour: optional int\n", |
| " 25: batchid: optional string\n", |
| "}" |
| ] |
| }, |
| "execution_count": 3, |
| "metadata": {}, |
| "output_type": "execute_result" |
| } |
| ], |
| "source": [ |
| "import org.apache.hadoop.fs.Path\n", |
| "import org.apache.iceberg.hadoop.HadoopTables\n", |
| "import org.apache.iceberg.spark.SparkSchemaUtil\n", |
| "\n", |
| "val path = \"hdfs:/tmp/tables/job_metrics_tmp\"\n", |
| "\n", |
| "{ // use a block to avoid values (conf, etc.) getting caught in closures\n", |
| "\n", |
| " // remove the temp table if it already exists\n", |
| " val conf = spark.sessionState.newHadoopConf()\n", |
| " val fs = new Path(path).getFileSystem(conf)\n", |
| " fs.delete(new Path(path), true /* recursive */ )\n", |
| "\n", |
| " // create the temp table using Spark utils to create a schema and partition spec\n", |
| " val tables = new HadoopTables(conf)\n", |
| " val schema = SparkSchemaUtil.schemaForTable(spark, \"default.job_metrics\")\n", |
| " val spec = SparkSchemaUtil.specForTable(spark, \"default.job_metrics\")\n", |
| "\n", |
| " tables.create(schema, spec, path)\n", |
| "\n", |
| " // show the schema\n", |
| " tables.load(path).schema\n", |
| "}\n" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "# Load table partitions as a DataFrame" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 4, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "data": { |
| "text/html": [ |
| "<table>\n", |
| "<tr><th>partition</th><th>uri</th><th>format</th></tr>\n", |
| "<tr><td>{dateint -> 20170316, hour -> 0, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=0/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n", |
| "<tr><td>{dateint -> 20170316, hour -> 1, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=1/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n", |
| "<tr><td>{dateint -> 20170316, hour -> 2, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=2/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n", |
| "<tr><td>{dateint -> 20170316, hour -> 3, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=3/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n", |
| "<tr><td>{dateint -> 20170316, hour -> 4, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=4/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n", |
| "<tr><td>{dateint -> 20170316, hour -> 5, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=5/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n", |
| "<tr><td>{dateint -> 20170316, hour -> 6, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=6/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n", |
| "<tr><td>{dateint -> 20170316, hour -> 7, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=7/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n", |
| "<tr><td>{dateint -> 20170316, hour -> 8, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=8/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n", |
| "<tr><td>{dateint -> 20170316, hour -> 9, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=9/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n", |
| "</table>" |
| ], |
| "text/plain": [ |
| "+-------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+\n", |
| "| partition | uri | format |\n", |
| "+-------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+\n", |
| "| {dateint -> 20170316, hour -> 0, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=0/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n", |
| "| {dateint -> 20170316, hour -> 1, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=1/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n", |
| "| {dateint -> 20170316, hour -> 2, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=2/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n", |
| "| {dateint -> 20170316, hour -> 3, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=3/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n", |
| "| {dateint -> 20170316, hour -> 4, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=4/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n", |
| "| {dateint -> 20170316, hour -> 5, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=5/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n", |
| "| {dateint -> 20170316, hour -> 6, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=6/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n", |
| "| {dateint -> 20170316, hour -> 7, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=7/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n", |
| "| {dateint -> 20170316, hour -> 8, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=8/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n", |
| "| {dateint -> 20170316, hour -> 9, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=9/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n", |
| "+-------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+" |
| ] |
| }, |
| "metadata": {}, |
| "output_type": "display_data" |
| }, |
| { |
| "data": { |
| "text/plain": [ |
| "partitions = [partition: map<string,string>, uri: string ... 1 more field]\n" |
| ] |
| }, |
| "metadata": {}, |
| "output_type": "display_data" |
| }, |
| { |
| "data": { |
| "text/plain": [ |
| "[partition: map<string,string>, uri: string ... 1 more field]" |
| ] |
| }, |
| "execution_count": 4, |
| "metadata": {}, |
| "output_type": "execute_result" |
| } |
| ], |
| "source": [ |
| "import org.apache.iceberg.spark.SparkTableUtil\n", |
| "\n", |
| "// get a data frame with the table's partitions\n", |
| "val partitions = SparkTableUtil.partitionDF(spark, \"default.job_metrics\")\n", |
| " .filter($\"format\".contains(\"parquet\") || $\"format\".contains(\"avro\"))\n", |
| "\n", |
| "display(partitions.limit(10))" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "# List files, load metrics, and append to the table\n", |
| "\n", |
| "* [Table API](https://netflix.github.io/iceberg/current/javadoc/index.html?com/netflix/iceberg/Table.html)\n", |
| "* [Append API](https://netflix.github.io/iceberg/current/javadoc/index.html?com/netflix/iceberg/AppendFiles.html)" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 5, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "[Stage 3:====================================================> (9 + 1) / 10]" |
| ] |
| }, |
| { |
| "data": { |
| "text/plain": [ |
| "0" |
| ] |
| }, |
| "execution_count": 5, |
| "metadata": {}, |
| "output_type": "execute_result" |
| } |
| ], |
| "source": [ |
| "import org.apache.iceberg.hadoop.HadoopTables\n", |
| "import org.apache.hadoop.conf.Configuration\n", |
| "\n", |
| "partitions.repartition(100).flatMap { row =>\n", |
| "\n", |
| " // list the partition and read Parquet footers to get metrics\n", |
| " SparkTableUtil.listPartition(row.getMap[String, String](0).toMap, row.getString(1), row.getString(2))\n", |
| "\n", |
| "}.repartition(10) // avoid lots of manifests that would be merged later\n", |
| " .mapPartitions { files =>\n", |
| "\n", |
| " // open the table and append the files from this partition\n", |
| " val tables = new HadoopTables(new Configuration())\n", |
| " val table = tables.load(\"hdfs:/tmp/tables/job_metrics_tmp\")\n", |
| "\n", |
| " // fast appends will create a manifest for the new files\n", |
| " val append = table.newFastAppend\n", |
| "\n", |
| " files.foreach { file =>\n", |
| " append.appendFile(file.toDataFile(table.spec))\n", |
| " }\n", |
| "\n", |
| " // commit the new files\n", |
| " append.commit()\n", |
| "\n", |
| " Seq.empty[String].iterator\n", |
| "\n", |
| "}.count\n" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "# Inspect the results\n", |
| "\n", |
| "[Snapshot API](https://netflix.github.io/iceberg/current/javadoc/index.html?com/netflix/iceberg/Snapshot.html)" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 6, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "data": { |
| "text/plain": [ |
| "tables = org.apache.iceberg.hadoop.HadoopTables@1782cb95\n", |
| "table = hdfs:/tmp/tables/job_metrics_tmp\n" |
| ] |
| }, |
| "metadata": {}, |
| "output_type": "display_data" |
| }, |
| { |
| "data": { |
| "text/plain": [ |
| "BaseSnapshot{id=1515605124481, timestamp_ms=1515605127199, manifests=[hdfs:/tmp/tables/job_metrics_tmp/metadata/695d8ab7-961c-4cef-94d7-367db5d8f7de-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/725154b3-92bd-4d00-9420-34a2866f2876-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/266e6040-d8ff-4713-92cb-0d806c7a3baf-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/3b0e9c88-03b0-4032-bf70-f9af43e00034-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/0747127e-895e-492e-b07e-a54627ee5534-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/db055992-1bf1-4fe7-a851-1eff0a05af55-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/1d5b7cb9-85bd-4088-ad26-a4e9562ad181-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/36db4143-8720-4060-9a8d-d17fa7dcf46f-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/46a079c3-8654-4ed5-9466-088320bda559-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/f239498c-7386-4f31-8421-518105ffbf6a-m0.avro]}" |
| ] |
| }, |
| "execution_count": 6, |
| "metadata": {}, |
| "output_type": "execute_result" |
| } |
| ], |
| "source": [ |
| "val tables = new HadoopTables(spark.sessionState.newHadoopConf())\n", |
| "val table = tables.load(path)\n", |
| "\n", |
| "table.currentSnapshot" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 7, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "data": { |
| "text/plain": [ |
| "7087" |
| ] |
| }, |
| "execution_count": 7, |
| "metadata": {}, |
| "output_type": "execute_result" |
| } |
| ], |
| "source": [ |
| "import scala.collection.JavaConverters._\n", |
| "\n", |
| "table.currentSnapshot.addedFiles.asScala.size" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 8, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "table.newAppend.commit // use a merge commit to create a single manifest" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 9, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "data": { |
| "text/plain": [ |
| "BaseSnapshot{id=1515605215920, timestamp_ms=1515605220253, manifests=[hdfs:/tmp/tables/job_metrics_tmp/metadata/213364b0-d97f-49bf-9126-7273b9784cfb-m0.avro]}" |
| ] |
| }, |
| "execution_count": 9, |
| "metadata": {}, |
| "output_type": "execute_result" |
| } |
| ], |
| "source": [ |
| "table.currentSnapshot" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [] |
| } |
| ], |
| "metadata": { |
| "hide_input": false, |
| "kernelspec": { |
| "display_name": "Spark 2.0.0 - Scala 2.11", |
| "language": "scala", |
| "name": "spark2-scala" |
| }, |
| "language_info": { |
| "codemirror_mode": "text/x-scala", |
| "file_extension": ".scala", |
| "mimetype": "text/x-scala", |
| "name": "scala", |
| "pygments_lexer": "scala", |
| "version": "2.11.8" |
| } |
| }, |
| "nbformat": 4, |
| "nbformat_minor": 2 |
| } |