<!--
{% comment %}
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
{% endcomment %}
-->
# Spark SQL Streaming JDBC Data Source
A library for writing data to JDBC using Spark SQL Streaming (or Structured Streaming).
## Linking
Using SBT:

    libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-jdbc" % "{{site.SPARK_VERSION}}"

Using Maven:

    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>spark-sql-streaming-jdbc_{{site.SCALA_BINARY_VERSION}}</artifactId>
        <version>{{site.SPARK_VERSION}}</version>
    </dependency>
This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
For example, to include it when starting the spark shell:

    $ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-jdbc_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION}}
Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
The `--packages` argument can also be used with `bin/spark-submit`.
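For example, a sketch of submitting a packaged application this way (the jar path and main class are placeholders):

    $ bin/spark-submit \
        --packages org.apache.bahir:spark-sql-streaming-jdbc_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION}} \
        --class com.example.MyStreamingApp \
        /path/to/my-streaming-app.jar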
This library is compiled for Scala 2.11 only, and is intended to support Spark 2.0 onwards.
## Configuration options
Configuration is supplied through options on the `DataStreamWriter` (see the examples below).
Name | Default | Meaning
--- |:---:| ---
url | required, no default value | JDBC URL, e.g. `jdbc:mysql://127.0.0.1:3306/test?characterEncoding=UTF8`
dbtable | required, no default value | table name
driver | if not specified, `DriverManager` attempts to locate a driver that understands the given URL | fully qualified driver class name, e.g. `com.mysql.jdbc.Driver`
user | None | database username
password | None | database password
batchsize | 1000 | number of records written to JDBC per batch, to reduce load on the database
maxRetryNumber | 4 | maximum number of retries before a JDBC write task fails
checkValidTimeoutSeconds | 10 | a connection is cached to avoid creating a new JDBC connection for each batch; this is the timeout in seconds used when checking that the cached connection is still valid
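These options can also be passed together as a map. A minimal sketch, assuming `df` is an existing streaming DataFrame and that the URL, credentials, and checkpoint path are placeholders:

    val query = df.writeStream
      .format("streaming-jdbc")
      .options(Map(
        "url"                      -> "jdbc:mysql://127.0.0.1:3306/test?characterEncoding=UTF8",
        "dbtable"                  -> "myTableName",
        "driver"                   -> "com.mysql.jdbc.Driver",
        "user"                     -> "myUser",
        "password"                 -> "myPassword",
        "batchsize"                -> "1000", // records per JDBC batch
        "maxRetryNumber"           -> "4",    // retries before a write task fails
        "checkValidTimeoutSeconds" -> "10"    // timeout when validating the cached connection
      ))
      .option("checkpointLocation", "/path/to/checkpoint")
      .start()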
## Examples
### Scala API
An example of using the Scala API to write a streaming DataFrame to JDBC:
    import org.apache.spark.sql.streaming.Trigger

    // Create DataFrame from some stream source
    val query = df.writeStream
      .format("streaming-jdbc")
      .option("checkpointLocation", "/path/to/localdir")
      .outputMode("append")
      .option("url", "my jdbc url")
      .option("dbtable", "myTableName")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "my database username")
      .option("password", "my database password")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    query.awaitTermination()
Please see `JdbcSinkDemo.scala` for a full example.
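For a quick local test, Spark's built-in rate source can stand in for "some stream source" in the snippet above; it emits `timestamp` and `value` columns at a configurable rate. A minimal sketch, assuming an existing `SparkSession` named `spark`:

    // Stand-in stream source for local testing. The resulting `df` can be
    // written with the writeStream snippet above, provided the target table
    // has matching columns (timestamp TIMESTAMP, value BIGINT).
    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10") // rows generated per second
      .load()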
### Java API
An example of using the Java API to write a streaming DataFrame to JDBC:
    // Imports assumed:
    // import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions;
    // import org.apache.spark.sql.streaming.OutputMode;
    // import org.apache.spark.sql.streaming.StreamingQuery;
    // import org.apache.spark.sql.streaming.Trigger;

    StreamingQuery query = result
        .writeStream()
        .format("streaming-jdbc")
        .outputMode(OutputMode.Append())
        .option("checkpointLocation", "/path/to/localdir")
        .option(JDBCOptions.JDBC_URL(), jdbcUrl)
        .option(JDBCOptions.JDBC_TABLE_NAME(), tableName)
        .option(JDBCOptions.JDBC_DRIVER_CLASS(), "com.mysql.jdbc.Driver")
        .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE(), "5")
        .option("user", username)
        .option("password", password)
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .start();

    query.awaitTermination();
Please see `JavaJdbcSinkDemo.java` for a full example.