// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

[[developing]]
= Developing Applications With Apache Kudu

:author: Kudu Team
:imagesdir: ./images
:icons: font
:toc: left
:toclevels: 3
:doctype: book
:backend: html5
:sectlinks:
:experimental:

Kudu provides C++, Java and Python client APIs, as well as reference examples to illustrate
their use.

WARNING: Use of server-side or private interfaces is not supported, and interfaces
which are not part of public APIs have no stability guarantees.

== Viewing the API Documentation
include::installation.adoc[tags=view_api]

== Working Examples

Several example applications are provided in the
link:https://github.com/apache/kudu/tree/master/examples[examples directory]
of the Apache Kudu git repository. Each example includes a `README` that shows
how to compile and run it. The following list includes some of the
examples that are available today. Check the repository itself in case this list goes
out of date.

`cpp/example.cc`::
  A simple C++ application which connects to a Kudu instance, creates a table, writes data to it, then drops the table.
`java/java-example`::
  A simple Java application which connects to a Kudu instance, creates a table, writes data to it, then drops the table.
`java/collectl`::
  A small Java application which listens on a TCP socket for time series data corresponding to the Collectl wire protocol.
  The commonly-available collectl tool can be used to send example data to the server.
`java/insert-loadgen`::
  A Java application that generates random insert load.
`python/dstat-kudu`::
  An example program that shows how to use the Kudu Python API to load data into a new / existing Kudu table
  generated by an external program, `dstat` in this case.
`python/graphite-kudu`::
  An example plugin for using graphite-web with Kudu as a backend.

These examples should serve as helpful starting points for your own Kudu applications and integrations.

=== Maven Artifacts
The following Maven `<dependency>` element is valid for the Apache Kudu public release
(since 1.0.0):

[source,xml]
----
<dependency>
  <groupId>org.apache.kudu</groupId>
  <artifactId>kudu-client</artifactId>
  <version>1.9.0</version>
</dependency>
----

Convenience binary artifacts for the Java client and various Java integrations (e.g. Spark, Flume)
are also available via the link:http://repository.apache.org[ASF Maven repository] and
link:https://mvnrepository.com/artifact/org.apache.kudu[Maven Central repository].

== Example Impala Commands With Kudu

See link:kudu_impala_integration.html[Using Impala With Kudu] for guidance on installing
and using Impala with Kudu, including several `impala-shell` examples.

== Kudu Integration with Spark

Kudu integrates with Spark through the Data Source API as of version 1.0.0.
Include the kudu-spark dependency using the --packages option:

Use the kudu-spark_2.10 artifact if using Spark with Scala 2.10. Note that Spark 1 is no
longer supported in Kudu starting from version 1.6.0. So in order to use Spark 1 integrated
with Kudu, version 1.5.0 is the latest to go to.
[source]
----
spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.5.0
----

Use kudu-spark2_2.11 artifact if using Spark 2 with Scala 2.11.

NOTE: kudu-spark versions 1.8.0 and below have slightly different syntax.
See the documentation of your version for a valid example. Versioned documentation can be found
on the link:http://kudu.apache.org/releases/[releases page].

[source]
----
spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.9.0
----

then import kudu-spark and create a dataframe:
[source,scala]
----
import org.apache.kudu.client._
import collection.JavaConverters._

// Read a table from Kudu
val df = spark.read
  .options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> "kudu_table"))
  .format("kudu").load

// Query using the Spark API...
df.select("id").filter("id >= 5").show()

// ...or register a temporary table and use SQL
df.registerTempTable("kudu_table")
val filteredDF = spark.sql("select id from kudu_table where id >= 5").show()

// Use KuduContext to create, delete, or write to Kudu tables
val kuduContext = new KuduContext("kudu.master:7051", spark.sparkContext)

// Create a new Kudu table from a dataframe schema
// NB: No rows from the dataframe are inserted into the table
kuduContext.createTable(
    "test_table", df.schema, Seq("key"),
    new CreateTableOptions()
        .setNumReplicas(1)
        .addHashPartitions(List("key").asJava, 3))

// Insert data
kuduContext.insertRows(df, "test_table")

// Delete data
kuduContext.deleteRows(filteredDF, "test_table")

// Upsert data
kuduContext.upsertRows(df, "test_table")

// Update data
val alteredDF = df.select("id", $"count" + 1)
kuduContext.updateRows(filteredRows, "test_table")

// Data can also be inserted into the Kudu table using the data source, though the methods on
// KuduContext are preferred
// NB: The default is to upsert rows; to perform standard inserts instead, set operation = insert
// in the options map
// NB: Only mode Append is supported
df.write
  .options(Map("kudu.master"-> "kudu.master:7051", "kudu.table"-> "test_table"))
  .mode("append")
  .format("kudu").save

// Check for the existence of a Kudu table
kuduContext.tableExists("another_table")

// Delete a Kudu table
kuduContext.deleteTable("unwanted_table")
----

=== Upsert option in Kudu Spark
The upsert operation in kudu-spark supports an extra write option of `ignoreNull`. If set to true,
it will avoid setting existing column values in Kudu table to Null if the corresponding dataframe
column values are Null. If unspecified, `ignoreNull` is false by default.
[source,scala]
----
val dataDF = spark.read
  .options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> simpleTableName))
  .format("kudu").load
dataDF.registerTempTable(simpleTableName)
dataDF.show()
// Below is the original data in the table 'simpleTableName'
+---+---+
|key|val|
+---+---+
|  0|foo|
+---+---+

// Upsert a row with existing key 0 and val Null with ignoreNull set to true
val nullDF = spark.createDataFrame(Seq((0, null.asInstanceOf[String]))).toDF("key", "val")
val wo = new KuduWriteOptions
wo.ignoreNull = true
kuduContext.upsertRows(nullDF, simpleTableName, wo)
dataDF.show()
// The val field stays unchanged
+---+---+
|key|val|
+---+---+
|  0|foo|
+---+---+

// Upsert a row with existing key 0 and val Null with ignoreNull default/set to false
kuduContext.upsertRows(nullDF, simpleTableName)
// Equivalent to:
// val wo = new KuduWriteOptions
// wo.ignoreNull = false
// kuduContext.upsertRows(nullDF, simpleTableName, wo)
df.show()
// The val field is set to Null this time
+---+----+
|key| val|
+---+----+
|  0|null|
+---+----+
----

=== Using Spark with a Secure Kudu Cluster

The Kudu Spark integration is able to operate on secure Kudu clusters which have
authentication and encryption enabled, but the submitter of the Spark job must
provide the proper credentials. For Spark jobs using the default 'client' deploy
mode, the submitting user must have an active Kerberos ticket granted through
`kinit`. For Spark jobs using the 'cluster' deploy mode, a Kerberos principal
name and keytab location must be provided through the `--principal` and
`--keytab` arguments to `spark2-submit`.

=== Spark Integration Best Practices

==== Avoid multiple Kudu clients per cluster.

One common Kudu-Spark coding error is instantiating extra `KuduClient` objects.
In kudu-spark, a `KuduClient` is owned by the `KuduContext`. Spark application code
should not create another `KuduClient` connecting to the same cluster. Instead,
application code should use the `KuduContext` to access a `KuduClient` using
`KuduContext#syncClient`.

To diagnose multiple `KuduClient` instances in a Spark job, look for signs in
the logs of the master being overloaded by many `GetTableLocations` or
`GetTabletLocations` requests coming from different clients, usually around the
same time. This symptom is especially likely in Spark Streaming code,
where creating a `KuduClient` per task will result in periodic waves of master
requests from new clients.

=== Spark Integration Known Issues and Limitations

- Spark 2.2+ requires Java 8 at runtime even though Kudu Spark 2.x integration
  is Java 7 compatible. Spark 2.2 is the default dependency version as of
  Kudu 1.5.0.
- Kudu tables with a name containing upper case or non-ascii characters must be
  assigned an alternate name when registered as a temporary table.
- Kudu tables with a column name containing upper case or non-ascii characters
  may not be used with SparkSQL. Columns may be renamed in Kudu to work around
  this issue.
- `<>` and `OR` predicates are not pushed to Kudu, and instead will be evaluated
  by the Spark task. Only `LIKE` predicates with a suffix wildcard are pushed to
  Kudu, meaning that `LIKE "FOO%"` is pushed down but `LIKE "FOO%BAR"` isn't.
- Kudu does not support every type supported by Spark SQL. For example,
  `Date` and complex types are not supported.
- Kudu tables may only be registered as temporary tables in SparkSQL.
  Kudu tables may not be queried using HiveContext.


== Kudu Python Client
The Kudu Python client provides a Python friendly interface to the C++ client API.
The sample below demonstrates the use of part of the Python client.
[source,python]
----
import kudu
from kudu.client import Partitioning
from datetime import datetime

# Connect to Kudu master server
client = kudu.connect(host='kudu.master', port=7051)

# Define a schema for a new table
builder = kudu.schema_builder()
builder.add_column('key').type(kudu.int64).nullable(False).primary_key()
builder.add_column('ts_val', type_=kudu.unixtime_micros, nullable=False, compression='lz4')
schema = builder.build()

# Define partitioning schema
partitioning = Partitioning().add_hash_partitions(column_names=['key'], num_buckets=3)

# Create new table
client.create_table('python-example', schema, partitioning)

# Open a table
table = client.table('python-example')

# Create a new session so that we can apply write operations
session = client.new_session()

# Insert a row
op = table.new_insert({'key': 1, 'ts_val': datetime.utcnow()})
session.apply(op)

# Upsert a row
op = table.new_upsert({'key': 2, 'ts_val': "2016-01-01T00:00:00.000000"})
session.apply(op)

# Updating a row
op = table.new_update({'key': 1, 'ts_val': ("2017-01-01", "%Y-%m-%d")})
session.apply(op)

# Delete a row
op = table.new_delete({'key': 2})
session.apply(op)

# Flush write operations, if failures occur, capture print them.
try:
    session.flush()
except kudu.KuduBadStatus as e:
    print(session.get_pending_errors())

# Create a scanner and add a predicate
scanner = table.scanner()
scanner.add_predicate(table['ts_val'] == datetime(2017, 1, 1))

# Open Scanner and read all tuples
# Note: This doesn't scale for large scans
result = scanner.open().read_all_tuples()
----

== Integration with MapReduce, YARN, and Other Frameworks

Kudu was designed to integrate with MapReduce, YARN, Spark, and other frameworks in
the Hadoop ecosystem. See
link:https://github.com/apache/kudu/blob/master/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/RowCounter.java[RowCounter.java]
and
link:https://github.com/apache/kudu/blob/master/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportCsv.java[ImportCsv.java]
for examples which you can model your own integrations on. Stay tuned for more examples
using YARN and Spark in the future.

