| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| = Developing Applications With Apache Kudu |
| |
| :author: Kudu Team |
| :imagesdir: ./images |
| :icons: font |
| :toc: left |
| :toclevels: 3 |
| :doctype: book |
| :backend: html5 |
| :sectlinks: |
| :experimental: |
| |
| Kudu provides C++, Java and Python client APIs, as well as reference examples to illustrate |
| their use. |
| |
| WARNING: Use of server-side or private interfaces is not supported, and interfaces |
| which are not part of public APIs have no stability guarantees. |
| |
| [[view_api]] |
| == Viewing the API Documentation |
| |
| .C++ API Documentation |
You can view the link:../cpp-client-api/index.html[C++ client API documentation]
online. Alternatively, after
<<installation.adoc#build_from_source,building Kudu from source>>, you can
additionally build the `doxygen` target (e.g., run `make doxygen` if using
make) and view the locally generated API documentation by opening the
`docs/doxygen/client_api/html/index.html` file in your favorite web browser.
| |
NOTE: In order to build the `doxygen` target, doxygen 1.8.11 or newer with
Dot (graphviz) support must be installed on your build machine. If you
installed doxygen after building Kudu from source, you will need to run
`cmake` again so that it picks up the doxygen location and generates the
appropriate targets.
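
For example, with a `make`-based out-of-tree build (paths are illustrative and
follow the installation guide's layout):

[source]
----
cd build/release
cmake ../..   # re-run cmake if doxygen was installed after the initial build
make doxygen
----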
| |
| .Java API Documentation |
| You can view the link:../apidocs/index.html[Java API documentation] online. |
| Alternatively, after <<installation.adoc#build_java_client,building |
| the Java client>>, Java API documentation is available in |
| `java/kudu-client/target/apidocs/index.html`. |
| |
| == Working Examples |
| |
| Several example applications are provided in the |
| link:https://github.com/apache/kudu/tree/master/examples[examples directory] |
| of the Apache Kudu git repository. Each example includes a `README` that shows |
| how to compile and run it. The following list includes some of the |
| examples that are available today. Check the repository itself in case this list goes |
| out of date. |
| |
| `cpp/example.cc`:: |
| A simple C++ application which connects to a Kudu instance, creates a table, writes data to it, then drops the table. |
| `java/java-example`:: |
| A simple Java application which connects to a Kudu instance, creates a table, writes data to it, then drops the table. |
`java/collectl`::
A small Java application which listens on a TCP socket for time series data
corresponding to the Collectl wire protocol. The commonly available `collectl`
tool can be used to send example data to the server.
| `java/insert-loadgen`:: |
| A Java application that generates random insert load. |
`python/dstat-kudu`::
An example program that shows how to use the Kudu Python API to load data
generated by an external program (`dstat` in this case) into a new or existing
Kudu table.
| `python/graphite-kudu`:: |
| An example plugin for using graphite-web with Kudu as a backend. |
| |
| These examples should serve as helpful starting points for your own Kudu applications and integrations. |
| |
| == Maven Artifacts |
The following Maven `<dependency>` element is valid for Apache Kudu public
releases (1.0.0 and later):
| |
| [source,xml] |
| ---- |
| <dependency> |
| <groupId>org.apache.kudu</groupId> |
| <artifactId>kudu-client</artifactId> |
| <version>1.14.0</version> |
| </dependency> |
| ---- |
| |
| Convenience binary artifacts for the Java client and various Java integrations (e.g. Spark, Flume) |
| are also available via the link:http://repository.apache.org[ASF Maven repository] and |
| link:https://mvnrepository.com/artifact/org.apache.kudu[Maven Central repository]. |
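
For example, a Maven-built Spark application using Scala 2.11 could declare the
Spark integration with the same coordinates used with `--packages` below:

[source,xml]
----
<dependency>
  <groupId>org.apache.kudu</groupId>
  <artifactId>kudu-spark2_2.11</artifactId>
  <version>1.14.0</version>
</dependency>
----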
| |
| == Example Impala Commands With Kudu |
| |
| See link:kudu_impala_integration.html[Using Impala With Kudu] for guidance on installing |
| and using Impala with Kudu, including several `impala-shell` examples. |
| |
| == Kudu Integration with Spark |
| |
Kudu integrates with Spark through the Data Source API as of version 1.0.0.
Include the kudu-spark dependency using the `--packages` option.

Use the kudu-spark_2.10 artifact if using Spark 1 with Scala 2.10. Note that
Spark 1 is no longer supported as of Kudu 1.6.0, so kudu-spark 1.5.0 is the
latest version available for use with Spark 1:
| [source] |
| ---- |
| spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.5.0 |
| ---- |
| |
Use the kudu-spark2_2.11 artifact if using Spark 2 with Scala 2.11.
| |
| NOTE: kudu-spark versions 1.8.0 and below have slightly different syntax. |
| See the documentation of your version for a valid example. Versioned documentation can be found |
| on the link:http://kudu.apache.org/releases/[releases page]. |
| |
| [source] |
| ---- |
| spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.14.0 |
| ---- |
| |
Below is a minimal Spark SQL "select" example. We first import the kudu-spark
package, create a DataFrame, and then create a view from the DataFrame. After
those steps, the table is accessible from Spark SQL.
| |
| NOTE: There is also a Spark link:https://github.com/apache/kudu/tree/master/examples/quickstart/spark[quickstart] |
| guide and another link:https://github.com/apache/kudu/tree/master/examples/scala/spark-example[example] |
| available. |
| |
NOTE: You can use the Kudu CLI tool to create a table and generate data for
the following two examples by running
`kudu perf loadgen kudu.master:7051 -keep_auto_table`.
| |
| [source,scala] |
| ---- |
| import org.apache.kudu.spark.kudu._ |
| |
| // Create a DataFrame that points to the Kudu table we want to query. |
| val df = spark.read.options(Map("kudu.master" -> "kudu.master:7051", |
| "kudu.table" -> "default.my_table")).format("kudu").load |
| // Create a view from the DataFrame to make it accessible from Spark SQL. |
| df.createOrReplaceTempView("my_table") |
| // Now we can run Spark SQL queries against our view of the Kudu table. |
| spark.sql("select * from my_table").show() |
| ---- |
| |
| Below is a more sophisticated example that includes both reads and writes: |
| |
| [source,scala] |
| ---- |
| import org.apache.kudu.client._ |
| import org.apache.kudu.spark.kudu.KuduContext |
| import collection.JavaConverters._ |
| |
| // Read a table from Kudu |
| val df = spark.read |
| .options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> "kudu_table")) |
| .format("kudu").load |
| |
| // Query using the Spark API... |
| df.select("key").filter("key >= 5").show() |
| |
| // ...or register a temporary table and use SQL |
| df.createOrReplaceTempView("kudu_table") |
| val filteredDF = spark.sql("select key from kudu_table where key >= 5").show() |
| |
| // Use KuduContext to create, delete, or write to Kudu tables |
| val kuduContext = new KuduContext("kudu.master:7051", spark.sparkContext) |
| |
| // Create a new Kudu table from a DataFrame schema |
| // NB: No rows from the DataFrame are inserted into the table |
| kuduContext.createTable( |
| "test_table", df.schema, Seq("key"), |
| new CreateTableOptions() |
| .setNumReplicas(1) |
| .addHashPartitions(List("key").asJava, 3)) |
| |
| // Check for the existence of a Kudu table |
| kuduContext.tableExists("test_table") |
| |
| // Insert data |
| kuduContext.insertRows(df, "test_table") |
| |
| // Delete data |
| kuduContext.deleteRows(df, "test_table") |
| |
| // Upsert data |
| kuduContext.upsertRows(df, "test_table") |
| |
| // Update data |
| val updateDF = df.select($"key", ($"int_val" + 1).as("int_val")) |
| kuduContext.updateRows(updateDF, "test_table") |
| |
| // Data can also be inserted into the Kudu table using the data source, though the methods on |
| // KuduContext are preferred |
| // NB: The default is to upsert rows; to perform standard inserts instead, set operation = insert |
| // in the options map |
| // NB: Only mode Append is supported |
| df.write |
| .options(Map("kudu.master"-> "kudu.master:7051", "kudu.table"-> "test_table")) |
| .mode("append") |
| .format("kudu").save |
| |
| // Delete a Kudu table |
| kuduContext.deleteTable("test_table") |
| ---- |
| |
| === Upsert option in Kudu Spark |
The upsert operation in kudu-spark supports an extra write option, `ignoreNull`.
If set to true, it avoids overwriting existing column values in the Kudu table
with Null when the corresponding DataFrame column values are Null. If
unspecified, `ignoreNull` is false by default.
| [source,scala] |
| ---- |
// This example continues from the KuduContext example above: it assumes
// `spark` and `kuduContext` are already in scope.
import org.apache.kudu.spark.kudu.KuduWriteOptions

// An illustrative table with columns 'key' and 'val'.
val simpleTableName = "simple_table"

val dataFrame = spark.read
  .options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> simpleTableName))
  .format("kudu").load
| dataFrame.createOrReplaceTempView(simpleTableName) |
| dataFrame.show() |
| // Below is the original data in the table 'simpleTableName' |
| +---+---+ |
| |key|val| |
| +---+---+ |
| | 0|foo| |
| +---+---+ |
| |
| // Upsert a row with existing key 0 and val Null with ignoreNull set to true |
| val nullDF = spark.createDataFrame(Seq((0, null.asInstanceOf[String]))).toDF("key", "val") |
| val wo = new KuduWriteOptions |
| wo.ignoreNull = true |
| kuduContext.upsertRows(nullDF, simpleTableName, wo) |
| dataFrame.show() |
| // The val field stays unchanged |
| +---+---+ |
| |key|val| |
| +---+---+ |
| | 0|foo| |
| +---+---+ |
| |
| // Upsert a row with existing key 0 and val Null with ignoreNull default/set to false |
| kuduContext.upsertRows(nullDF, simpleTableName) |
| // Equivalent to: |
| // val wo = new KuduWriteOptions |
| // wo.ignoreNull = false |
| // kuduContext.upsertRows(nullDF, simpleTableName, wo) |
dataFrame.show()
| // The val field is set to Null this time |
| +---+----+ |
| |key| val| |
| +---+----+ |
| | 0|null| |
| +---+----+ |
| ---- |
| |
| === Using Spark with a Secure Kudu Cluster |
| |
| The Kudu Spark integration is able to operate on secure Kudu clusters which have |
| authentication and encryption enabled, but the submitter of the Spark job must |
| provide the proper credentials. For Spark jobs using the default 'client' deploy |
| mode, the submitting user must have an active Kerberos ticket granted through |
| `kinit`. For Spark jobs using the 'cluster' deploy mode, a Kerberos principal |
| name and keytab location must be provided through the `--principal` and |
| `--keytab` arguments to `spark2-submit`. |
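
For example, a sketch of a 'cluster' deploy mode submission; the principal,
keytab path, application class, and JAR name are all illustrative:

[source]
----
spark2-submit \
  --master yarn \
  --deploy-mode cluster \
  --principal user@EXAMPLE.COM \
  --keytab /path/to/user.keytab \
  --packages org.apache.kudu:kudu-spark2_2.11:1.14.0 \
  --class com.example.MySparkApp \
  my-spark-app.jar
----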
| |
| === Spark Integration Best Practices |
| |
| ==== Avoid multiple Kudu clients per cluster. |
| |
| One common Kudu-Spark coding error is instantiating extra `KuduClient` objects. |
| In kudu-spark, a `KuduClient` is owned by the `KuduContext`. Spark application code |
| should not create another `KuduClient` connecting to the same cluster. Instead, |
| application code should use the `KuduContext` to access a `KuduClient` using |
| `KuduContext#syncClient`. |
| |
| To diagnose multiple `KuduClient` instances in a Spark job, look for signs in |
| the logs of the master being overloaded by many `GetTableLocations` or |
| `GetTabletLocations` requests coming from different clients, usually around the |
| same time. This symptom is especially likely in Spark Streaming code, |
| where creating a `KuduClient` per task will result in periodic waves of master |
| requests from new clients. |
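
For example, a minimal sketch (master address and table name as in the examples
above) of obtaining the shared client through `KuduContext#syncClient`:

[source,scala]
----
import org.apache.kudu.spark.kudu.KuduContext

val kuduContext = new KuduContext("kudu.master:7051", spark.sparkContext)

// Preferred: reuse the KuduClient owned by the KuduContext.
val client = kuduContext.syncClient
val table = client.openTable("test_table")

// Anti-pattern: do NOT build a second KuduClient for the same cluster, e.g.
// new KuduClient.KuduClientBuilder("kudu.master:7051").build()
----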
| |
| === Spark Integration Known Issues and Limitations |
| |
| - Spark 2.2+ requires Java 8 at runtime even though Kudu Spark 2.x integration |
| is Java 7 compatible. Spark 2.2 is the default dependency version as of |
| Kudu 1.5.0. |
- Kudu tables with a name containing upper case or non-ASCII characters must be
  assigned an alternate name when registered as a temporary table (see the
  sketch after this list).
- Kudu tables with a column name containing upper case or non-ASCII characters
  may not be used with SparkSQL. Columns may be renamed in Kudu to work around
  this issue.
| - `<>` and `OR` predicates are not pushed to Kudu, and instead will be evaluated |
| by the Spark task. Only `LIKE` predicates with a suffix wildcard are pushed to |
| Kudu, meaning that `LIKE "FOO%"` is pushed down but `LIKE "FOO%BAR"` isn't. |
| - Kudu does not support every type supported by Spark SQL. For example, |
| `Date` and complex types are not supported. |
| - Kudu tables may only be registered as temporary tables in SparkSQL. |
| Kudu tables may not be queried using HiveContext. |
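
Below is a minimal sketch of the alternate-name workaround for table names with
upper case characters; the table name is illustrative:

[source,scala]
----
val df = spark.read
  .options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> "MyUpperCaseTable"))
  .format("kudu").load
// Register under an alternate, SparkSQL-safe name.
df.createOrReplaceTempView("my_upper_case_table")
----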
| |
| == JVM-Based Integration Testing |
| |
| As of version 1.9.0, Kudu ships with an experimental feature called the binary |
| test JAR. This feature gives people who want to test against Kudu the |
| capability to start a Kudu "mini cluster" from Java or another JVM-based |
| language without having to first build Kudu locally. This is possible because |
| the Kudu binary JAR contains relocatable Kudu binaries that are used by the |
| `KuduTestHarness` in the `kudu-test-utils` module. The `KuduTestHarness` |
| contains logic to search the classpath for the Kudu binaries and to start a |
| mini cluster using them. |
| |
| _Important: The `kudu-binary` module should only be used to run Kudu for |
| integration testing purposes. It should never be used to run an actual Kudu |
| service, in production or development, because the `kudu-binary` module |
| includes native security-related dependencies that have been copied from the |
| build system and will not be patched when the operating system on the runtime |
| host is patched._ |
| |
| === System Requirements |
| |
The binary test JAR must be run on one of the
<<installation.adoc#prerequisites_and_requirements,supported Kudu platforms>>,
which include:
| |
| - macOS El Capitan (10.11) or later; |
| - CentOS 6.6+, Ubuntu 14.04+, or another recent distribution of Linux |
| |
| The related Maven integration using `os-maven-plugin` requires Maven 3.1 or later. |
| |
=== Using the Kudu Binary Test JAR
| |
| Take the following steps to start a Kudu mini cluster from a Java project. |
| |
| **1. Add build-time dependencies.** The `kudu-binary` artifact contains the |
| native Kudu (server and command-line tool) binaries for specific operating |
| systems. In order to download the right artifact for the running operating |
| system, use the `os-maven-plugin` to detect the current runtime environment. |
| Finally, the `kudu-test-utils` module provides the `KuduTestHarness` class, |
| which runs a Kudu mini cluster. |
| |
| Maven example for Kudu 1.14.0: |
| |
| [source,xml] |
| ---- |
| <build> |
| <extensions> |
| <!-- Used to find the right kudu-binary artifact with the Maven |
| property ${os.detected.classifier} --> |
| <extension> |
| <groupId>kr.motd.maven</groupId> |
| <artifactId>os-maven-plugin</artifactId> |
| <version>1.6.2</version> |
| </extension> |
| </extensions> |
| </build> |
| |
| <dependencies> |
| <dependency> |
| <groupId>org.apache.kudu</groupId> |
| <artifactId>kudu-test-utils</artifactId> |
| <version>1.14.0</version> |
| <scope>test</scope> |
| </dependency> |
| <dependency> |
| <groupId>org.apache.kudu</groupId> |
| <artifactId>kudu-binary</artifactId> |
| <version>1.14.0</version> |
| <classifier>${os.detected.classifier}</classifier> |
| <scope>test</scope> |
| </dependency> |
| </dependencies> |
| ---- |
| |
| **2. Write a test that starts a Kudu mini cluster using the KuduTestHarness.** |
| It will automatically find the binary test JAR if Maven is configured correctly. |
| |
| The recommended way to start a Kudu mini cluster is by using the |
| `KuduTestHarness` class from the `kudu-test-utils` module, which also acts as a |
| JUnit `Rule`. Here is an example of a Java-based integration test that starts a |
| Kudu cluster, creates a Kudu table on the cluster, and then exits: |
| |
| [source,java] |
| ---- |
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.test.KuduTestHarness;
import org.junit.Rule;
import org.junit.Test;
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| |
| public class MyKuduTest { |
| // The KuduTestHarness automatically starts and stops a real Kudu cluster |
| // when each test is run. Kudu persists its on-disk state in a temporary |
| // directory under a location defined by the environment variable TEST_TMPDIR |
| // if set, or under /tmp otherwise. That cluster data is deleted on |
| // successful exit of the test. The cluster output is logged through slf4j. |
| @Rule |
| public KuduTestHarness harness = new KuduTestHarness(); |
| |
| @Test |
| public void test() throws Exception { |
| // Get a KuduClient configured to talk to the running mini cluster. |
| KuduClient client = harness.getClient(); |
| |
| // Create a new Kudu table. |
| List<ColumnSchema> columns = new ArrayList<>(); |
| columns.add( |
| new ColumnSchema.ColumnSchemaBuilder( |
| "key", Type.INT32).key(true).build()); |
| Schema schema = new Schema(columns); |
| CreateTableOptions opts = |
| new CreateTableOptions().setRangePartitionColumns( |
| Collections.singletonList("key")); |
| client.createTable("table-1", schema, opts); |
| |
| // Now we may insert rows into the newly-created Kudu table using 'client', |
| // scan the table, etc. |
| } |
| } |
| ---- |
| |
| For more examples of using the `KuduTestHarness`, including how to pass |
| configuration options to the Kudu cluster being managed by the harness, see the |
| link:https://github.com/apache/kudu/tree/master/examples/java/java-example[java-example] |
| project in the Kudu source code repository, or look at the various Kudu |
| integration tests under |
| link:https://github.com/apache/kudu/tree/master/java[java] in the Kudu source |
| code repository. |
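
For instance, a sketch of a customized harness, assuming the
`MiniKuduClusterBuilder` exposed by the `kudu-test-utils` module:

[source,java]
----
import org.apache.kudu.test.KuduTestHarness;
import org.apache.kudu.test.cluster.MiniKuduCluster.MiniKuduClusterBuilder;
import org.junit.Rule;

public class MyCustomClusterTest {
  // Configure the mini cluster before the harness starts it.
  @Rule
  public KuduTestHarness harness = new KuduTestHarness(
      new MiniKuduClusterBuilder()
          .numMasterServers(3)
          .numTabletServers(5));
}
----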
| |
| == Kudu Python Client |
The Kudu Python client provides a Python-friendly interface to the C++ client API.
The sample below demonstrates the use of part of the Python client.
| [source,python] |
| ---- |
| import kudu |
| from kudu.client import Partitioning |
| from datetime import datetime |
| |
| # Connect to Kudu master server |
| client = kudu.connect(host='kudu.master', port=7051) |
| |
| # Define a schema for a new table |
| builder = kudu.schema_builder() |
| builder.add_column('key').type(kudu.int64).nullable(False).primary_key() |
| builder.add_column('ts_val', type_=kudu.unixtime_micros, nullable=False, compression='lz4') |
| schema = builder.build() |
| |
| # Define partitioning schema |
| partitioning = Partitioning().add_hash_partitions(column_names=['key'], num_buckets=3) |
| |
| # Create new table |
| client.create_table('python-example', schema, partitioning) |
| |
| # Open a table |
| table = client.table('python-example') |
| |
| # Create a new session so that we can apply write operations |
| session = client.new_session() |
| |
| # Insert a row |
| op = table.new_insert({'key': 1, 'ts_val': datetime.utcnow()}) |
| session.apply(op) |
| |
| # Upsert a row |
| op = table.new_upsert({'key': 2, 'ts_val': "2016-01-01T00:00:00.000000"}) |
| session.apply(op) |
| |
| # Updating a row |
| op = table.new_update({'key': 1, 'ts_val': ("2017-01-01", "%Y-%m-%d")}) |
| session.apply(op) |
| |
| # Delete a row |
| op = table.new_delete({'key': 2}) |
| session.apply(op) |
| |
# Flush write operations; if failures occur, capture and print them.
| try: |
| session.flush() |
| except kudu.KuduBadStatus as e: |
| print(session.get_pending_errors()) |
| |
| # Create a scanner and add a predicate |
| scanner = table.scanner() |
| scanner.add_predicate(table['ts_val'] == datetime(2017, 1, 1)) |
| |
| # Open Scanner and read all tuples |
| # Note: This doesn't scale for large scans |
| result = scanner.open().read_all_tuples() |
| ---- |
| |
| == Integration with MapReduce, YARN, and Other Frameworks |
| |
| Kudu was designed to integrate with MapReduce, YARN, Spark, and other frameworks in |
| the Hadoop ecosystem. See |
| link:https://github.com/apache/kudu/blob/master/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/RowCounter.java[RowCounter.java] |
| and |
| link:https://github.com/apache/kudu/blob/master/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportCsv.java[ImportCsv.java] |
| for examples which you can model your own integrations on. Stay tuned for more examples |
| using YARN and Spark in the future. |
| |