| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| [[developing]] |
| = Developing Applications With Apache Kudu |
| |
| :author: Kudu Team |
| :imagesdir: ./images |
| :icons: font |
| :toc: left |
| :toclevels: 3 |
| :doctype: book |
| :backend: html5 |
| :sectlinks: |
| :experimental: |
| |
| Kudu provides C++, Java and Python client APIs, as well as reference examples to illustrate |
| their use. |
| |
| WARNING: Use of server-side or private interfaces is not supported, and interfaces |
| which are not part of public APIs have no stability guarantees. |
| |
| == Viewing the API Documentation |
| include::installation.adoc[tags=view_api] |
| |
| == Working Examples |
| |
| Several example applications are provided in the |
| link:https://github.com/apache/kudu/tree/master/examples[examples directory] |
| of the Apache Kudu git repository. Each example includes a `README` that shows |
| how to compile and run it. The following list includes some of the |
| examples that are available today. Check the repository itself in case this list goes |
| out of date. |
| |
| `cpp/example.cc`:: |
| A simple C++ application which connects to a Kudu instance, creates a table, writes data to it, then drops the table. |
| `java/java-example`:: |
| A simple Java application which connects to a Kudu instance, creates a table, writes data to it, then drops the table. |
| `java/collectl`:: |
| A small Java application which listens on a TCP socket for time series data corresponding to the Collectl wire protocol. |
| The commonly-available collectl tool can be used to send example data to the server. |
| `java/insert-loadgen`:: |
| A Java application that generates random insert load. |
| `python/dstat-kudu`:: |
| An example program that shows how to use the Kudu Python API to load data generated by an |
| external program (`dstat` in this case) into a new or existing Kudu table. |
| `python/graphite-kudu`:: |
| An example plugin for using graphite-web with Kudu as a backend. |
| |
| These examples should serve as helpful starting points for your own Kudu applications and integrations. |
| |
| === Maven Artifacts |
| The following Maven `<dependency>` element is valid for Apache Kudu public releases |
| (1.0.0 and later): |
| |
| [source,xml] |
| ---- |
| <dependency> |
|   <groupId>org.apache.kudu</groupId> |
|   <artifactId>kudu-client</artifactId> |
|   <version>1.1.0</version> |
| </dependency> |
| ---- |
| |
| Convenience binary artifacts for the Java client and various Java integrations (e.g. Spark, Flume) |
| are also now available via the link:http://repository.apache.org[ASF Maven repository] and |
| link:https://mvnrepository.com/artifact/org.apache.kudu[Maven Central repository]. |
| |
| == Example Impala Commands With Kudu |
| |
| See link:kudu_impala_integration.html[Using Impala With Kudu] for guidance on installing |
| and using Impala with Kudu, including several `impala-shell` examples. |
| |
| == Kudu Integration with Spark |
| |
| Kudu integrates with Spark through the Data Source API as of version 1.0.0. |
| Include the kudu-spark dependency using the `--packages` option, choosing the artifact |
| that matches your Spark and Scala versions. |
| |
| Use the `kudu-spark_2.10` artifact if using Spark with Scala 2.10. Note that Kudu dropped |
| support for Spark 1 in version 1.6.0, so 1.5.0 is the latest Kudu version that can be |
| used with Spark 1. |
| [source] |
| ---- |
| spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.5.0 |
| ---- |
| |
| Use the `kudu-spark2_2.11` artifact if using Spark 2 with Scala 2.11. Spark 2 artifacts are |
| available up to version 1.7.0. |
| [source] |
| ---- |
| spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.7.0 |
| ---- |
| |
| Then import kudu-spark and create a dataframe: |
| [source,scala] |
| ---- |
| import org.apache.kudu.spark.kudu._ |
| import org.apache.kudu.client._ |
| import collection.JavaConverters._ |
| |
| // Read a table from Kudu |
| val df = spark.read.options(Map("kudu.master" -> "kudu.master:7051", |
| "kudu.table" -> "kudu_table")).kudu |
| |
| // Query using the Spark API... |
| df.select("id").filter("id >= 5").show() |
| |
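| // ...or register a temporary table and use SQL |
| df.registerTempTable("kudu_table") |
| // Keep the DataFrame itself: show() returns Unit, so its result cannot be reused below |
| val filteredDF = spark.sql("select id from kudu_table where id >= 5") |
| filteredDF.show() |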
| |
| // Use KuduContext to create, delete, or write to Kudu tables |
| val kuduContext = new KuduContext("kudu.master:7051", spark.sparkContext) |
| |
| // Create a new Kudu table from a dataframe schema |
| // NB: No rows from the dataframe are inserted into the table |
| kuduContext.createTable( |
|     "test_table", df.schema, Seq("id"), |
|     new CreateTableOptions() |
|         .setNumReplicas(1) |
|         .addHashPartitions(List("id").asJava, 3)) |
| |
| // Insert data |
| kuduContext.insertRows(df, "test_table") |
| |
| // Delete data |
| kuduContext.deleteRows(filteredDF, "test_table") |
| |
| // Upsert data |
| kuduContext.upsertRows(df, "test_table") |
| |
| // Update data |
| val alteredDF = df.select($"id", ($"count" + 1).as("count")) |
| kuduContext.updateRows(alteredDF, "test_table") |
| |
| // Data can also be inserted into the Kudu table using the data source, though the methods on |
| // KuduContext are preferred |
| // NB: The default is to upsert rows; to perform standard inserts instead, set operation = insert |
| // in the options map |
| // NB: Only mode Append is supported |
| df.write.options(Map("kudu.master"-> "kudu.master:7051", |
| "kudu.table"-> "test_table")).mode("append").kudu |
| |
| // Check for the existence of a Kudu table |
| kuduContext.tableExists("another_table") |
| |
| // Delete a Kudu table |
| kuduContext.deleteTable("unwanted_table") |
| ---- |
| |
| === Upsert option in Kudu Spark |
| The upsert operation in kudu-spark supports an extra write option, `ignoreNull`. If set to true, |
| a null value in a dataframe column does not overwrite the existing value of that column in the |
| Kudu table. If unspecified, `ignoreNull` defaults to false. |
| [source,scala] |
| ---- |
| val dataDF = spark.read.options(Map("kudu.master" -> "kudu.master:7051", |
| "kudu.table" -> simpleTableName)).kudu |
| dataDF.registerTempTable(simpleTableName) |
| dataDF.show() |
| // Below is the original data in the table 'simpleTableName' |
| +---+---+ |
| |key|val| |
| +---+---+ |
| | 0|foo| |
| +---+---+ |
| |
| // Upsert a row with existing key 0 and val Null with ignoreNull set to true |
| val nullDF = spark.createDataFrame(Seq((0, null.asInstanceOf[String]))).toDF("key", "val") |
| val wo = new KuduWriteOptions |
| wo.ignoreNull = true |
| kuduContext.upsertRows(nullDF, simpleTableName, wo) |
| dataDF.show() |
| // The val field stays unchanged |
| +---+---+ |
| |key|val| |
| +---+---+ |
| | 0|foo| |
| +---+---+ |
| |
| // Upsert a row with existing key 0 and val Null with ignoreNull default/set to false |
| kuduContext.upsertRows(nullDF, simpleTableName) |
| // Equivalent to: |
| // val wo = new KuduWriteOptions |
| // wo.ignoreNull = false |
| // kuduContext.upsertRows(nullDF, simpleTableName, wo) |
| dataDF.show() |
| // The val field is set to Null this time |
| +---+----+ |
| |key| val| |
| +---+----+ |
| | 0|null| |
| +---+----+ |
| ---- |
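The effect of `ignoreNull` can be modeled without a cluster. The following Python sketch is an illustration only: it uses a plain dictionary as a stand-in for a Kudu table, and the `upsert` function and its `ignore_null` parameter are hypothetical names, not part of kudu-spark or the Kudu Python client.

```python
def upsert(table, row, key="key", ignore_null=False):
    """Apply one upsert to an in-memory stand-in for a Kudu table.

    `table` maps primary-key values to row dicts. This models only the
    ignoreNull behaviour described above; it is not the kudu-spark code.
    """
    existing = table.get(row[key])
    if existing is None:
        # No row with this key yet: the upsert behaves like an insert.
        table[row[key]] = dict(row)
        return
    for column, value in row.items():
        if value is None and ignore_null:
            # ignoreNull = true: keep the stored value for this column.
            continue
        existing[column] = value


# Same starting data as the Scala example: key 0, val "foo".
table = {0: {"key": 0, "val": "foo"}}

# Upsert a null val with ignore_null set: the stored value survives.
upsert(table, {"key": 0, "val": None}, ignore_null=True)
after_ignore_null = dict(table[0])

# Upsert the same row with the default: the stored value is overwritten.
upsert(table, {"key": 0, "val": None})
after_default = dict(table[0])
```

The two snapshots correspond to the two `show()` outputs in the Scala example: the first keeps `foo`, the second holds a null.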
| |
| === Using Spark with a Secure Kudu Cluster |
| |
| The Kudu Spark integration is able to operate on secure Kudu clusters which have |
| authentication and encryption enabled, but the submitter of the Spark job must |
| provide the proper credentials. For Spark jobs using the default 'client' deploy |
| mode, the submitting user must have an active Kerberos ticket granted through |
| `kinit`. For Spark jobs using the 'cluster' deploy mode, a Kerberos principal |
| name and keytab location must be provided through the `--principal` and |
| `--keytab` arguments to `spark2-submit`. |
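| |
As a rough sketch of the two cases above, the following Python helper assembles the argument list for a job submission. The helper name `spark_submit_args`, the principal `etl@EXAMPLE.COM`, and the keytab path are hypothetical and chosen only for illustration; the `--deploy-mode`, `--principal`, and `--keytab` flags are the ones accepted by Spark's submit script.

```python
def spark_submit_args(deploy_mode="client", principal=None, keytab=None,
                      submit_cmd="spark2-submit"):
    """Build the leading arguments of a submit command for a secure cluster."""
    if deploy_mode not in ("client", "cluster"):
        raise ValueError("deploy_mode must be 'client' or 'cluster'")
    args = [submit_cmd, "--deploy-mode", deploy_mode]
    if deploy_mode == "cluster":
        # Cluster mode cannot rely on the submitter's ticket cache, so the
        # principal and keytab must be passed explicitly.
        if not principal or not keytab:
            raise ValueError("cluster deploy mode needs a principal and a keytab")
        args += ["--principal", principal, "--keytab", keytab]
    # Client mode adds nothing: it relies on a ticket obtained through kinit.
    return args


client_args = spark_submit_args()
cluster_args = spark_submit_args(
    "cluster",
    principal="etl@EXAMPLE.COM",    # hypothetical principal
    keytab="/path/to/etl.keytab",   # hypothetical keytab location
)
```

In client mode the helper adds no credentials at all, which mirrors the requirement that the submitting user already holds a ticket from `kinit`.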
| |
| === Spark Integration Best Practices |
| |
| ==== Avoid multiple Kudu clients per cluster. |
| |
| One common Kudu-Spark coding error is instantiating extra `KuduClient` objects. |
| In kudu-spark, a `KuduClient` is owned by the `KuduContext`. Spark application code |
| should not create another `KuduClient` connecting to the same cluster. Instead, |
| application code should use the `KuduContext` to access a `KuduClient` using |
| `KuduContext#syncClient`. |
| |
| To diagnose multiple `KuduClient` instances in a Spark job, look for signs in |
| the logs of the master being overloaded by many `GetTableLocations` or |
| `GetTabletLocations` requests coming from different clients, usually around the |
| same time. This symptom is especially likely in Spark Streaming code, |
| where creating a `KuduClient` per task will result in periodic waves of master |
| requests from new clients. |
| |
| === Spark Integration Known Issues and Limitations |
| |
| - Spark 2.2+ requires Java 8 at runtime even though Kudu Spark 2.x integration |
| is Java 7 compatible. Spark 2.2 is the default dependency version as of |
| Kudu 1.5.0. |
| - Kudu tables with a name containing uppercase or non-ASCII characters must be |
| assigned an alternate name when registered as a temporary table. |
| - Kudu tables with a column name containing uppercase or non-ASCII characters |
| may not be used with SparkSQL. Columns may be renamed in Kudu to work around |
| this issue. |
| - `<>` and `OR` predicates are not pushed to Kudu, and instead will be evaluated |
| by the Spark task. Only `LIKE` predicates with a suffix wildcard are pushed to |
| Kudu, meaning that `LIKE "FOO%"` is pushed down but `LIKE "FOO%BAR"` isn't. |
| - Kudu does not support every type supported by Spark SQL. For example, |
| `Date` and complex types are not supported. |
| - Kudu tables may only be registered as temporary tables in SparkSQL. |
| Kudu tables may not be queried using HiveContext. |
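| |
Two of the limitations above are simple enough to check before submitting a job. The Python sketch below is one possible reading of those rules, not the logic used by kudu-spark or Spark: the function names are hypothetical, a pushed-down `LIKE` pattern is assumed to be a non-empty literal prefix followed by a single trailing `%`, and escaped wildcards are not handled.

```python
def like_is_pushed_down(pattern):
    """Return True if the only wildcard in a LIKE pattern is one trailing '%'."""
    if not pattern.endswith("%"):
        return False
    prefix = pattern[:-1]
    # Assumption: the prefix must be non-empty and free of '%' and '_' wildcards.
    return bool(prefix) and "%" not in prefix and "_" not in prefix


def needs_alias(name):
    """Return True if a name contains uppercase or non-ASCII characters."""
    return any(ch.isupper() or ord(ch) > 127 for ch in name)


pushed = like_is_pushed_down("FOO%")          # suffix wildcard only
not_pushed = like_is_pushed_down("FOO%BAR")   # wildcard in the middle
alias_required = needs_alias("Metrics")       # contains an uppercase letter
alias_optional = needs_alias("metrics_2017")  # lowercase ASCII only
```

A check like `needs_alias` can be run over table names to find those that need an alternate name when registered as temporary tables, and over column names to find those that would have to be renamed in Kudu.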
| |
| |
| == Kudu Python Client |
| The Kudu Python client provides a Python-friendly interface to the C++ client API. |
| The sample below demonstrates some of the Python client's features. |
| [source,python] |
| ---- |
| import kudu |
| from kudu.client import Partitioning |
| from datetime import datetime |
| |
| # Connect to Kudu master server |
| client = kudu.connect(host='kudu.master', port=7051) |
| |
| # Define a schema for a new table |
| builder = kudu.schema_builder() |
| builder.add_column('key').type(kudu.int64).nullable(False).primary_key() |
| builder.add_column('ts_val', type_=kudu.unixtime_micros, nullable=False, compression='lz4') |
| schema = builder.build() |
| |
| # Define partitioning schema |
| partitioning = Partitioning().add_hash_partitions(column_names=['key'], num_buckets=3) |
| |
| # Create new table |
| client.create_table('python-example', schema, partitioning) |
| |
| # Open a table |
| table = client.table('python-example') |
| |
| # Create a new session so that we can apply write operations |
| session = client.new_session() |
| |
| # Insert a row |
| op = table.new_insert({'key': 1, 'ts_val': datetime.utcnow()}) |
| session.apply(op) |
| |
| # Upsert a row |
| op = table.new_upsert({'key': 2, 'ts_val': "2016-01-01T00:00:00.000000"}) |
| session.apply(op) |
| |
| # Update a row |
| op = table.new_update({'key': 1, 'ts_val': ("2017-01-01", "%Y-%m-%d")}) |
| session.apply(op) |
| |
| # Delete a row |
| op = table.new_delete({'key': 2}) |
| session.apply(op) |
| |
| # Flush write operations; if failures occur, print them. |
| try: |
|     session.flush() |
| except kudu.KuduBadStatus as e: |
|     print(session.get_pending_errors()) |
| |
| # Create a scanner and add a predicate |
| scanner = table.scanner() |
| scanner.add_predicate(table['ts_val'] == datetime(2017, 1, 1)) |
| |
| # Open Scanner and read all tuples |
| # Note: This doesn't scale for large scans |
| result = scanner.open().read_all_tuples() |
| ---- |
| |
| == Integration with MapReduce, YARN, and Other Frameworks |
| |
| Kudu was designed to integrate with MapReduce, YARN, Spark, and other frameworks in |
| the Hadoop ecosystem. See |
| link:https://github.com/apache/kudu/blob/master/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/RowCounter.java[RowCounter.java] |
| and |
| link:https://github.com/apache/kudu/blob/master/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportCsv.java[ImportCsv.java] |
| for examples which you can model your own integrations on. Stay tuned for more examples |
| using YARN and Spark in the future. |
| |