<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Spark Data Source for Apache CouchDB/Cloudant</title>
<meta name="description" content="Spark Data Source for Apache CouchDB/Cloudant">
<meta name="author" content="">
<!-- Enable responsive viewport -->
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<!-- Le HTML5 shim, for IE6-8 support of HTML elements -->
<!--[if lt IE 9]>
<script src="http://html5shim.googlecode.com/svn/trunk/html5.js"></script>
<![endif]-->
<!-- Le styles -->
<link href="/assets/themes/apache-clean/bootstrap/css/bootstrap.css" rel="stylesheet">
<link href="/assets/themes/apache-clean/css/style.css?body=1" rel="stylesheet" type="text/css">
<link href="/assets/themes/apache-clean/css/syntax.css" rel="stylesheet" type="text/css" media="screen" />
<!-- Le fav and touch icons -->
<!-- Update these with your own images
<link rel="shortcut icon" href="images/favicon.ico">
<link rel="apple-touch-icon" href="images/apple-touch-icon.png">
<link rel="apple-touch-icon" sizes="72x72" href="images/apple-touch-icon-72x72.png">
<link rel="apple-touch-icon" sizes="114x114" href="images/apple-touch-icon-114x114.png">
-->
<!-- make tables sortable by adding class tag "sortable" to table elements -->
<script src="http://www.kryogenix.org/code/browser/sorttable/sorttable.js"></script>
</head>
<body>
<!-- Navigation -->
<div id="nav-bar">
<nav id="nav-container" class="navbar navbar-inverse " role="navigation">
<div class="container">
<!-- Brand and toggle get grouped for better mobile display -->
<div class="navbar-header page-scroll">
<button type="button" class="navbar-toggle" data-toggle="collapse" data-target=".navbar-collapse">
<span class="sr-only">Toggle navigation</span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<a class="navbar-brand page-scroll" href="/#home">Home</a>
</div>
<!-- Collect the nav links, forms, and other content for toggling -->
<nav class="navbar-collapse collapse" role="navigation">
<ul class="nav navbar-nav">
<li id="download">
<a href="#" data-toggle="dropdown" class="dropdown-toggle">Download<b class="caret"></b></a>
<ul class="dropdown-menu dropdown-left">
<li><a href="/downloads/spark" target="_self">Bahir Spark Extensions</a></li>
<li><a href="/downloads/flink" target="_self">Bahir Flink Extensions</a></li>
</ul>
</li>
<li id="community">
<a href="#" data-toggle="dropdown" class="dropdown-toggle">Community<b class="caret"></b></a>
<ul class="dropdown-menu dropdown-left">
<li><a href="/community" target="_self">Get Involved</a></li>
<li><a href="/contributing" target="_self">Contributing</a></li>
<li><a href="/contributing-extensions" target="_self">Contributing Extensions</a></li>
<li><a href="https://issues.apache.org/jira/browse/BAHIR" target="_blank">Issue Tracker</a></li>
<li><a href="/community#source-code" target="_self">Source Code</a></li>
<li><a href="/community-members" target="_self">Project Committers</a></li>
</ul>
</li>
<li id="documentation">
<a href="#" data-toggle="dropdown" class="dropdown-toggle">Documentation<b class="caret"></b></a>
<ul class="dropdown-menu dropdown-left">
<li><a href="/docs/spark/overview" target="_self">Bahir Spark Extensions</a></li>
<li><a href="/docs/flink/overview" target="_self">Bahir Flink Extensions</a></li>
</ul>
</li>
<li id="github">
<a href="#" data-toggle="dropdown" class="dropdown-toggle">GitHub<b class="caret"></b></a>
<ul class="dropdown-menu dropdown-left">
<li><a href="https://github.com/apache/bahir" target="_blank">Bahir Spark Extensions</a></li>
<li><a href="https://github.com/apache/bahir-flink" target="_blank">Bahir Flink Extensions</a></li>
<li><a href="https://github.com/apache/bahir-website" target="_blank">Bahir Website</a></li>
</ul>
</li>
<li id="apache">
<a href="#" data-toggle="dropdown" class="dropdown-toggle">Apache<b class="caret"></b></a>
<ul class="dropdown-menu dropdown-left">
<li><a href="http://www.apache.org/foundation/how-it-works.html" target="_blank">Apache Software Foundation</a></li>
<li><a href="http://www.apache.org/licenses/" target="_blank">Apache License</a></li>
<li><a href="http://www.apache.org/foundation/sponsorship" target="_blank">Sponsorship</a></li>
<li><a href="http://www.apache.org/foundation/thanks.html" target="_blank">Thanks</a></li>
<li><a href="/privacy-policy" target="_self">Privacy Policy</a></li>
</ul>
</li>
</ul>
</nav><!--/.navbar-collapse -->
<!-- /.navbar-collapse -->
</div>
<!-- /.container -->
</nav>
</div>
<div class="container">
<!--<div class="hero-unit Spark Data Source for Apache CouchDB/Cloudant">
<h1></h1>
</div>
-->
<div class="row">
<div class="col-md-12">
<h1 id="apache-couchdbcloudant-data-source-streaming-connector-and-sql-streaming-data-source">Apache CouchDB/Cloudant Data Source, Streaming Connector and SQL Streaming Data Source</h1>
<p>A library for reading data from Cloudant or CouchDB databases using Spark SQL and Spark Streaming.</p>
<p><a href="https://cloudant.com">IBM® Cloudant®</a> is a document-oriented DataBase as a Service (DBaaS). It stores data as documents
in JSON format. It’s built with scalability, high availability, and durability in mind. It comes with a
wide variety of indexing options including map-reduce, Cloudant Query, full-text indexing, and
geospatial indexing. The replication capabilities make it easy to keep data in sync between database
clusters, desktop PCs, and mobile devices.</p>
<p><a href="http://couchdb.apache.org">Apache CouchDB™</a> is open source database software that focuses on ease of use and having an architecture that “completely embraces the Web”. It has a document-oriented NoSQL database architecture and is implemented in the concurrency-oriented language Erlang; it uses JSON to store data, JavaScript as its query language using MapReduce, and HTTP for an API.</p>
<h2 id="linking">Linking</h2>
<p>Using SBT:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>libraryDependencies += "org.apache.bahir" %% "spark-sql-cloudant" % "2.4.0"
</code></pre></div></div>
<p>Using Maven:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>&lt;dependency&gt;
&lt;groupId&gt;org.apache.bahir&lt;/groupId&gt;
&lt;artifactId&gt;spark-sql-cloudant_2.11&lt;/artifactId&gt;
&lt;version&gt;2.4.0&lt;/version&gt;
&lt;/dependency&gt;
</code></pre></div></div>
<p>This library can also be added to Spark jobs launched through <code class="language-plaintext highlighter-rouge">spark-shell</code> or <code class="language-plaintext highlighter-rouge">spark-submit</code> by using the <code class="language-plaintext highlighter-rouge">--packages</code> command line option.</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>$ bin/spark-shell --packages org.apache.bahir:spark-sql-cloudant_2.11:2.4.0
</code></pre></div></div>
<p>Unlike using <code class="language-plaintext highlighter-rouge">--jars</code>, using <code class="language-plaintext highlighter-rouge">--packages</code> ensures that this library and its dependencies will be added to the classpath.
The <code class="language-plaintext highlighter-rouge">--packages</code> argument can also be used with <code class="language-plaintext highlighter-rouge">bin/spark-submit</code>.</p>
<p>Submit a job in Python:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark-submit --master local[4] --packages org.apache.bahir:spark-sql-cloudant__2.11:2.4.0 &lt;path to python script&gt;
</code></pre></div></div>
<p>Submit a job in Scala:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark-submit --class "&lt;your class&gt;" --master local[4] --packages org.apache.bahir:spark-sql-cloudant__2.11:2.4.0 &lt;path to spark-sql-cloudant jar&gt;
</code></pre></div></div>
<p>This library is cross-published for Scala 2.11 and Scala 2.12, so substitute the appropriate Scala version in the commands listed above.</p>
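<p>For example, to use the Scala 2.12 build with <code class="language-plaintext highlighter-rouge">spark-shell</code>:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>$ bin/spark-shell --packages org.apache.bahir:spark-sql-cloudant_2.12:2.4.0
</code></pre></div></div>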
<h2 id="configuration-options">Configuration options</h2>
<p>The configuration is obtained from the following sources, in order:</p>
<ol>
<li>defaults in the Config, as defined in application.conf</li>
<li>keys in the SparkConf, as set on the SparkConf</li>
<li>keys in the parameters, as set in DataFrame or temporary table options</li>
<li>“spark.”+key in the SparkConf (treated as if passed through spark-submit using the <code class="language-plaintext highlighter-rouge">--conf</code> option)</li>
</ol>
<p>Each subsequent source overrides the previous one. Thus, configuration set using a DataFrame option overrides what has been set in the SparkConf, and configuration passed to spark-submit using <code class="language-plaintext highlighter-rouge">--conf</code> takes precedence over any setting in the code.</p>
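<p>A minimal sketch of these precedence rules in Python (the host and database name <code class="language-plaintext highlighter-rouge">n_flight</code> are placeholders):</p>
<div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code>from pyspark.sql import SparkSession

# SparkConf-level setting (source 2 above)
spark = SparkSession.builder \
    .config("cloudant.host", "ACCOUNT.cloudant.com") \
    .config("cloudant.queryLimit", "25") \
    .getOrCreate()

# DataFrame option (source 3) overrides the SparkConf value of cloudant.queryLimit
df = spark.read.format("org.apache.bahir.cloudant") \
    .option("cloudant.queryLimit", "100") \
    .load("n_flight")
</code></pre></div></div>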
<h3 id="configuration-in-applicationconf">Configuration in application.conf</h3>
<p>Default values are defined <a href="src/main/resources/application.conf">here</a>.</p>
<h3 id="configuration-on-sparkconf">Configuration on SparkConf</h3>
<table>
<thead>
<tr>
<th>Name</th>
<th style="text-align: center">Default</th>
<th>Meaning</th>
</tr>
</thead>
<tbody>
<tr>
<td>cloudant.batchInterval</td>
<td style="text-align: center">8</td>
<td>the batch interval, in seconds, for streaming all documents from the <code class="language-plaintext highlighter-rouge">_changes</code> endpoint into a Spark DataFrame. See <a href="https://spark.apache.org/docs/latest/streaming-programming-guide.html#setting-the-right-batch-interval">Setting the right batch interval</a> for tuning this value.</td>
</tr>
<tr>
<td>cloudant.endpoint</td>
<td style="text-align: center"><code class="language-plaintext highlighter-rouge">_all_docs</code></td>
<td>endpoint for RelationProvider when loading data from Cloudant to DataFrames or SQL temporary tables. Select between the Cloudant <code class="language-plaintext highlighter-rouge">_all_docs</code> or <code class="language-plaintext highlighter-rouge">_changes</code> API endpoint. See <strong>Note</strong> below for differences between endpoints.</td>
</tr>
<tr>
<td>cloudant.protocol</td>
<td style="text-align: center">https</td>
<td>protocol to use to transfer data: http or https</td>
</tr>
<tr>
<td>cloudant.host</td>
<td style="text-align: center"> </td>
<td>Cloudant host URL</td>
</tr>
<tr>
<td>cloudant.username</td>
<td style="text-align: center"> </td>
<td>Cloudant user ID</td>
</tr>
<tr>
<td>cloudant.password</td>
<td style="text-align: center"> </td>
<td>Cloudant password</td>
</tr>
<tr>
<td>cloudant.numberOfRetries</td>
<td style="text-align: center">3</td>
<td>number of times to replay a request that received a 429 <code class="language-plaintext highlighter-rouge">Too Many Requests</code> response</td>
</tr>
<tr>
<td>cloudant.useQuery</td>
<td style="text-align: center">false</td>
<td>by default, the <code class="language-plaintext highlighter-rouge">_all_docs</code> endpoint is used if the ‘view’ and ‘index’ options (see below) are not set. When useQuery is enabled, the <code class="language-plaintext highlighter-rouge">_find</code> endpoint is used in place of <code class="language-plaintext highlighter-rouge">_all_docs</code> when the query condition is not on the primary key field (_id), so that query predicates can be pushed down into the datastore.</td>
</tr>
<tr>
<td>cloudant.queryLimit</td>
<td style="text-align: center">25</td>
<td>the maximum number of results returned when querying the <code class="language-plaintext highlighter-rouge">_find</code> endpoint.</td>
</tr>
<tr>
<td>cloudant.storageLevel</td>
<td style="text-align: center">MEMORY_ONLY</td>
<td>the storage level for persisting Spark RDDs during load when <code class="language-plaintext highlighter-rouge">cloudant.endpoint</code> is set to <code class="language-plaintext highlighter-rouge">_changes</code>. See the <a href="https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence">RDD Persistence section</a> in Spark’s Programming Guide for all available storage level options.</td>
</tr>
<tr>
<td>cloudant.timeout</td>
<td style="text-align: center">60000</td>
<td>stop the response after waiting the defined number of milliseconds for data. Only supported with the <code class="language-plaintext highlighter-rouge">_changes</code> endpoint.</td>
</tr>
<tr>
<td>jsonstore.rdd.partitions</td>
<td style="text-align: center">10</td>
<td>the intended number of partitions used to drive JsonStoreRDD to load query results in parallel. The actual number is calculated from the total rows returned, subject to maxInPartition and minInPartition. Only supported with the <code class="language-plaintext highlighter-rouge">_all_docs</code> endpoint.</td>
</tr>
<tr>
<td>jsonstore.rdd.maxInPartition</td>
<td style="text-align: center">-1</td>
<td>the maximum number of rows in a partition; -1 means unlimited</td>
</tr>
<tr>
<td>jsonstore.rdd.minInPartition</td>
<td style="text-align: center">10</td>
<td>the minimum number of rows in a partition</td>
</tr>
<tr>
<td>jsonstore.rdd.requestTimeout</td>
<td style="text-align: center">900000</td>
<td>the request timeout in milliseconds</td>
</tr>
<tr>
<td>bulkSize</td>
<td style="text-align: center">200</td>
<td>the bulk save size</td>
</tr>
<tr>
<td>schemaSampleSize</td>
<td style="text-align: center">-1</td>
<td>the sample size for RDD schema discovery. 1 means only the first document is used; -1 means all documents; 0 is treated as 1; any number N means min(N, total) documents. Only supported with the <code class="language-plaintext highlighter-rouge">_all_docs</code> endpoint.</td>
</tr>
<tr>
<td>createDBOnSave</td>
<td style="text-align: center">false</td>
<td>whether to create a new database during the save operation. If false, the database must already exist. If true, a new database is created; if a database with the provided name already exists, an error is raised.</td>
</tr>
</tbody>
</table>
<p>The <code class="language-plaintext highlighter-rouge">cloudant.endpoint</code> option sets ` _changes<code class="language-plaintext highlighter-rouge"> or </code>_all_docs` API endpoint to be called while loading Cloudant data into Spark DataFrames or SQL Tables.</p>
<p><strong>Note:</strong> When using <code class="language-plaintext highlighter-rouge">_changes</code> API, please consider:</p>
<ol>
<li>Results are partially ordered and may not be presented in the order in
which documents were updated.</li>
<li>If shards are unavailable, you may see duplicate results (changes that have already been seen).</li>
<li>The <code class="language-plaintext highlighter-rouge">selector</code> option can be used to filter Cloudant documents during load.</li>
<li>Provides a true snapshot of the database at a single point in time.</li>
<li>Only supports a single partition.</li>
</ol>
<p>When using <code class="language-plaintext highlighter-rouge">_all_docs</code> API:</p>
<ol>
<li>Supports parallel reads (using offset and range) and partitioning.</li>
<li>Partitioned loads may not represent a true snapshot of the database: documents
may be added or deleted between loading data into different
Spark partitions.</li>
</ol>
<p>If loading Cloudant docs from a database larger than 100 MB, set <code class="language-plaintext highlighter-rouge">cloudant.endpoint</code> to <code class="language-plaintext highlighter-rouge">_changes</code> and <code class="language-plaintext highlighter-rouge">spark.streaming.unpersist</code> to <code class="language-plaintext highlighter-rouge">false</code>.
This enables RDD persistence during load against the <code class="language-plaintext highlighter-rouge">_changes</code> endpoint and keeps the persisted RDDs accessible after streaming completes.</p>
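<p>A minimal sketch of this setup in Python (credentials and the database name are placeholders):</p>
<div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code>from pyspark.sql import SparkSession

# Persist RDDs loaded from the _changes endpoint and keep them after streaming completes
spark = SparkSession.builder \
    .config("cloudant.host", "ACCOUNT.cloudant.com") \
    .config("cloudant.username", "USERNAME") \
    .config("cloudant.password", "PASSWORD") \
    .config("cloudant.endpoint", "_changes") \
    .config("spark.streaming.unpersist", "false") \
    .getOrCreate()

df = spark.read.format("org.apache.bahir.cloudant").load("large_db")
</code></pre></div></div>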
<p>See <a href="src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala">CloudantChangesDFSuite</a>
for examples of loading data into a Spark DataFrame with <code class="language-plaintext highlighter-rouge">_changes</code> API.</p>
<h3 id="configuration-on-spark-sql-temporary-table-or-dataframe">Configuration on Spark SQL Temporary Table or DataFrame</h3>
<p>Besides all the configurations passed to a temporary table or dataframe through SparkConf, it is also possible to set the following configurations in temporary table or dataframe using OPTIONS:</p>
<table>
<thead>
<tr>
<th>Name</th>
<th style="text-align: center">Default</th>
<th>Meaning</th>
</tr>
</thead>
<tbody>
<tr>
<td>bulkSize</td>
<td style="text-align: center">200</td>
<td>the bulk save size</td>
</tr>
<tr>
<td>createDBOnSave</td>
<td style="text-align: center">false</td>
<td>whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised.</td>
</tr>
<tr>
<td>database</td>
<td style="text-align: center"> </td>
<td>Cloudant database name</td>
</tr>
<tr>
<td>index</td>
<td style="text-align: center"> </td>
<td>Cloudant Search index without the database name. Search index queries are limited to 200 results, so an index can only be used to load data sets of at most 200 documents.</td>
</tr>
<tr>
<td>path</td>
<td style="text-align: center"> </td>
<td>used as the Cloudant database name if the database option is not present</td>
</tr>
<tr>
<td>schemaSampleSize</td>
<td style="text-align: center">-1</td>
<td>the sample size used to discover the schema for this temp table. -1 scans all documents</td>
</tr>
<tr>
<td>selector</td>
<td style="text-align: center">all documents</td>
<td>a selector written in Cloudant Query syntax, specifying conditions for selecting documents when the <code class="language-plaintext highlighter-rouge">cloudant.endpoint</code> option is set to <code class="language-plaintext highlighter-rouge">_changes</code>. Only documents satisfying the selector’s conditions will be retrieved from Cloudant and loaded into Spark.</td>
</tr>
<tr>
<td>view</td>
<td style="text-align: center"> </td>
<td>Cloudant view without the database name. Only used for load.</td>
</tr>
</tbody>
</table>
<p>For fast loading, views are loaded without include_docs. Thus, the derived schema will always be: <code class="language-plaintext highlighter-rouge">{id, key, value}</code>, where <code class="language-plaintext highlighter-rouge">value</code> can be a compound field. An example of loading data from a view:</p>
<div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="n">spark</span><span class="p">.</span><span class="n">sql</span><span class="p">(</span><span class="s">" CREATE TEMPORARY TABLE flightTable1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight', view '_design/view/_view/AA0')"</span><span class="p">)</span>
</code></pre></div></div>
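<p>Similarly, a minimal sketch of using the <code class="language-plaintext highlighter-rouge">selector</code> option to filter documents during load when <code class="language-plaintext highlighter-rouge">cloudant.endpoint</code> is set to <code class="language-plaintext highlighter-rouge">_changes</code> (the selector field and database name are hypothetical):</p>
<div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code># Only documents matching the Cloudant Query selector are loaded into Spark
df = spark.read.format("org.apache.bahir.cloudant") \
    .option("cloudant.endpoint", "_changes") \
    .option("selector", "{\"airportName\": \"Moscow\"}") \
    .load("n_airportcodemapping")
</code></pre></div></div>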
<h3 id="configuration-on-cloudant-receiver-for-spark-streaming">Configuration on Cloudant Receiver for Spark Streaming</h3>
<table>
<thead>
<tr>
<th>Name</th>
<th style="text-align: center">Default</th>
<th>Meaning</th>
</tr>
</thead>
<tbody>
<tr>
<td>cloudant.host</td>
<td style="text-align: center"> </td>
<td>Cloudant host URL</td>
</tr>
<tr>
<td>cloudant.username</td>
<td style="text-align: center"> </td>
<td>Cloudant user ID</td>
</tr>
<tr>
<td>cloudant.password</td>
<td style="text-align: center"> </td>
<td>Cloudant password</td>
</tr>
<tr>
<td>database</td>
<td style="text-align: center"> </td>
<td>Cloudant database name</td>
</tr>
<tr>
<td>selector</td>
<td style="text-align: center">all documents</td>
<td>a selector written in Cloudant Query syntax, specifying conditions for selecting documents. Only documents satisfying the selector’s conditions will be retrieved from Cloudant and loaded into Spark.</td>
</tr>
</tbody>
</table>
<h3 id="configuration-in-spark-submit-using-conf-option">Configuration in spark-submit using –conf option</h3>
<p>The above stated configuration keys can also be set using <code class="language-plaintext highlighter-rouge">spark-submit --conf</code> option. When passing configuration in spark-submit, make sure adding “spark.” as prefix to the keys.</p>
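<p>For example, the <code class="language-plaintext highlighter-rouge">cloudant.endpoint</code> key becomes <code class="language-plaintext highlighter-rouge">spark.cloudant.endpoint</code> on the command line:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark-submit --packages org.apache.bahir:spark-sql-cloudant_2.11:2.4.0 --conf spark.cloudant.host=ACCOUNT.cloudant.com --conf spark.cloudant.endpoint=_changes &lt;path to python script&gt;
</code></pre></div></div>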
<h2 id="examples">Examples</h2>
<h3 id="python-api">Python API</h3>
<h4 id="using-sql-in-python">Using SQL In Python</h4>
<div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span>\
<span class="p">.</span><span class="n">builder</span>\
<span class="p">.</span><span class="n">appName</span><span class="p">(</span><span class="s">"Cloudant Spark SQL Example in Python using temp tables"</span><span class="p">)</span>\
<span class="p">.</span><span class="n">config</span><span class="p">(</span><span class="s">"cloudant.host"</span><span class="p">,</span><span class="s">"ACCOUNT.cloudant.com"</span><span class="p">)</span>\
<span class="p">.</span><span class="n">config</span><span class="p">(</span><span class="s">"cloudant.username"</span><span class="p">,</span> <span class="s">"USERNAME"</span><span class="p">)</span>\
<span class="p">.</span><span class="n">config</span><span class="p">(</span><span class="s">"cloudant.password"</span><span class="p">,</span><span class="s">"PASSWORD"</span><span class="p">)</span>\
<span class="p">.</span><span class="n">getOrCreate</span><span class="p">()</span>
<span class="c1"># Loading temp table from Cloudant db
</span><span class="n">spark</span><span class="p">.</span><span class="n">sql</span><span class="p">(</span><span class="s">" CREATE TEMPORARY TABLE airportTable USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')"</span><span class="p">)</span>
<span class="n">airportData</span> <span class="o">=</span> <span class="n">spark</span><span class="p">.</span><span class="n">sql</span><span class="p">(</span><span class="s">"SELECT _id, airportName FROM airportTable WHERE _id &gt;= 'CAA' AND _id &lt;= 'GAA' ORDER BY _id"</span><span class="p">)</span>
<span class="n">airportData</span><span class="p">.</span><span class="n">printSchema</span><span class="p">()</span>
<span class="k">print</span> <span class="s">'Total # of rows in airportData: '</span> <span class="o">+</span> <span class="nb">str</span><span class="p">(</span><span class="n">airportData</span><span class="p">.</span><span class="n">count</span><span class="p">())</span>
<span class="k">for</span> <span class="n">code</span> <span class="ow">in</span> <span class="n">airportData</span><span class="p">.</span><span class="n">collect</span><span class="p">():</span>
<span class="k">print</span> <span class="n">code</span><span class="p">.</span><span class="n">_id</span>
</code></pre></div></div>
<p>See <a href="examples/python/CloudantApp.py">CloudantApp.py</a> for examples.</p>
<p>Submit job example:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark-submit --packages org.apache.bahir:spark-sql-cloudant_2.11:2.4.0 --conf spark.cloudant.host=ACCOUNT.cloudant.com --conf spark.cloudant.username=USERNAME --conf spark.cloudant.password=PASSWORD sql-cloudant/examples/python/CloudantApp.py
</code></pre></div></div>
<h4 id="using-dataframe-in-python">Using DataFrame In Python</h4>
<div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span>\
<span class="p">.</span><span class="n">builder</span>\
<span class="p">.</span><span class="n">appName</span><span class="p">(</span><span class="s">"Cloudant Spark SQL Example in Python using dataframes"</span><span class="p">)</span>\
<span class="p">.</span><span class="n">config</span><span class="p">(</span><span class="s">"cloudant.host"</span><span class="p">,</span><span class="s">"ACCOUNT.cloudant.com"</span><span class="p">)</span>\
<span class="p">.</span><span class="n">config</span><span class="p">(</span><span class="s">"cloudant.username"</span><span class="p">,</span> <span class="s">"USERNAME"</span><span class="p">)</span>\
<span class="p">.</span><span class="n">config</span><span class="p">(</span><span class="s">"cloudant.password"</span><span class="p">,</span><span class="s">"PASSWORD"</span><span class="p">)</span>\
<span class="p">.</span><span class="n">config</span><span class="p">(</span><span class="s">"jsonstore.rdd.partitions"</span><span class="p">,</span> <span class="mi">8</span><span class="p">)</span>\
<span class="p">.</span><span class="n">getOrCreate</span><span class="p">()</span>
<span class="c1"># ***1. Loading dataframe from Cloudant db
</span><span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="p">.</span><span class="n">read</span><span class="p">.</span><span class="n">load</span><span class="p">(</span><span class="s">"n_airportcodemapping"</span><span class="p">,</span> <span class="s">"org.apache.bahir.cloudant"</span><span class="p">)</span>
<span class="n">df</span><span class="p">.</span><span class="n">cache</span><span class="p">()</span>
<span class="n">df</span><span class="p">.</span><span class="n">printSchema</span><span class="p">()</span>
<span class="n">df</span><span class="p">.</span><span class="nb">filter</span><span class="p">(</span><span class="n">df</span><span class="p">.</span><span class="n">airportName</span> <span class="o">&gt;=</span> <span class="s">'Moscow'</span><span class="p">).</span><span class="n">select</span><span class="p">(</span><span class="s">"_id"</span><span class="p">,</span><span class="s">'airportName'</span><span class="p">).</span><span class="n">show</span><span class="p">()</span>
<span class="n">df</span><span class="p">.</span><span class="nb">filter</span><span class="p">(</span><span class="n">df</span><span class="p">.</span><span class="n">_id</span> <span class="o">&gt;=</span> <span class="s">'CAA'</span><span class="p">).</span><span class="n">select</span><span class="p">(</span><span class="s">"_id"</span><span class="p">,</span><span class="s">'airportName'</span><span class="p">).</span><span class="n">show</span><span class="p">()</span>
</code></pre></div></div>
<p>See <a href="examples/python/CloudantDF.py">CloudantDF.py</a> for examples.</p>
<p>When performing multiple operations on a DataFrame (select, filter, etc.),
you should persist it. Otherwise, every operation on the DataFrame will load the same data from Cloudant again.
Persisting also speeds up computation. The statement <code class="language-plaintext highlighter-rouge">df.cache()</code> persists the DataFrame in memory. Alternatively, for large databases, persist to both memory and disk with:</p>
<div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="kn">from</span> <span class="nn">pyspark</span> <span class="kn">import</span> <span class="n">StorageLevel</span>
<span class="n">df</span><span class="p">.</span><span class="n">persist</span><span class="p">(</span><span class="n">storageLevel</span> <span class="o">=</span> <span class="n">StorageLevel</span><span class="p">(</span><span class="bp">True</span><span class="p">,</span> <span class="bp">True</span><span class="p">,</span> <span class="bp">False</span><span class="p">,</span> <span class="bp">True</span><span class="p">,</span> <span class="mi">1</span><span class="p">))</span>
</code></pre></div></div>
<p><a href="examples/python/CloudantDFOption.py">Sample code</a> on using DataFrame option to define cloudant configuration</p>
<h3 id="scala-api">Scala API</h3>
<h4 id="using-sql-in-scala">Using SQL In Scala</h4>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">val</span> <span class="nv">spark</span> <span class="k">=</span> <span class="nc">SparkSession</span>
<span class="o">.</span><span class="py">builder</span><span class="o">()</span>
<span class="o">.</span><span class="py">appName</span><span class="o">(</span><span class="s">"Cloudant Spark SQL Example"</span><span class="o">)</span>
<span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"cloudant.host"</span><span class="o">,</span><span class="s">"ACCOUNT.cloudant.com"</span><span class="o">)</span>
<span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"cloudant.username"</span><span class="o">,</span> <span class="s">"USERNAME"</span><span class="o">)</span>
<span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"cloudant.password"</span><span class="o">,</span><span class="s">"PASSWORD"</span><span class="o">)</span>
<span class="o">.</span><span class="py">getOrCreate</span><span class="o">()</span>
<span class="c1">// For implicit conversions of Dataframe to RDDs</span>
<span class="k">import</span> <span class="nn">spark.implicits._</span>
<span class="c1">// create a temp table from Cloudant db and query it using sql syntax</span>
<span class="nv">spark</span><span class="o">.</span><span class="py">sql</span><span class="o">(</span>
<span class="n">s</span><span class="s">"""
|CREATE TEMPORARY TABLE airportTable
|USING org.apache.bahir.cloudant
|OPTIONS ( database 'n_airportcodemapping')
"""</span><span class="o">.</span><span class="py">stripMargin</span><span class="o">)</span>
<span class="c1">// create a dataframe</span>
<span class="k">val</span> <span class="nv">airportData</span> <span class="k">=</span> <span class="nv">spark</span><span class="o">.</span><span class="py">sql</span><span class="o">(</span><span class="s">"SELECT _id, airportName FROM airportTable WHERE _id &gt;= 'CAA' AND _id &lt;= 'GAA' ORDER BY _id"</span><span class="o">)</span>
<span class="nv">airportData</span><span class="o">.</span><span class="py">printSchema</span><span class="o">()</span>
<span class="nf">println</span><span class="o">(</span><span class="n">s</span><span class="s">"Total # of rows in airportData: "</span> <span class="o">+</span> <span class="nv">airportData</span><span class="o">.</span><span class="py">count</span><span class="o">())</span>
<span class="c1">// convert dataframe to array of Rows, and process each row</span>
<span class="nv">airportData</span><span class="o">.</span><span class="py">map</span><span class="o">(</span><span class="n">t</span> <span class="k">=&gt;</span> <span class="s">"code: "</span> <span class="o">+</span> <span class="nf">t</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="o">+</span> <span class="s">",name:"</span> <span class="o">+</span> <span class="nf">t</span><span class="o">(</span><span class="mi">1</span><span class="o">)).</span><span class="py">collect</span><span class="o">().</span><span class="py">foreach</span><span class="o">(</span><span class="n">println</span><span class="o">)</span>
</code></pre></div></div>
<p>See <a href="examples/scala/src/main/scala/mytest/spark/CloudantApp.scala">CloudantApp.scala</a> for examples.</p>
<p>Submit job example:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark-submit --class org.apache.spark.examples.sql.cloudant.CloudantApp --packages org.apache.bahir:spark-sql-cloudant_2.11:2.4.0 --conf spark.cloudant.host=ACCOUNT.cloudant.com --conf spark.cloudant.username=USERNAME --conf spark.cloudant.password=PASSWORD /path/to/spark-sql-cloudant_2.11-2.4.0-tests.jar
</code></pre></div></div>
<h3 id="using-dataframe-in-scala">Using DataFrame In Scala</h3>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">val</span> <span class="nv">spark</span> <span class="k">=</span> <span class="nc">SparkSession</span>
<span class="o">.</span><span class="py">builder</span><span class="o">()</span>
<span class="o">.</span><span class="py">appName</span><span class="o">(</span><span class="s">"Cloudant Spark SQL Example with Dataframe"</span><span class="o">)</span>
<span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"cloudant.host"</span><span class="o">,</span><span class="s">"ACCOUNT.cloudant.com"</span><span class="o">)</span>
<span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"cloudant.username"</span><span class="o">,</span> <span class="s">"USERNAME"</span><span class="o">)</span>
<span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"cloudant.password"</span><span class="o">,</span><span class="s">"PASSWORD"</span><span class="o">)</span>
<span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"createDBOnSave"</span><span class="o">,</span><span class="s">"true"</span><span class="o">)</span> <span class="c1">// to create a db on save</span>
<span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"jsonstore.rdd.partitions"</span><span class="o">,</span> <span class="s">"20"</span><span class="o">)</span> <span class="c1">// using 20 partitions</span>
<span class="o">.</span><span class="py">getOrCreate</span><span class="o">()</span>
<span class="c1">// 1. Loading data from Cloudant db</span>
<span class="k">val</span> <span class="nv">df</span> <span class="k">=</span> <span class="nv">spark</span><span class="o">.</span><span class="py">read</span><span class="o">.</span><span class="py">format</span><span class="o">(</span><span class="s">"org.apache.bahir.cloudant"</span><span class="o">).</span><span class="py">load</span><span class="o">(</span><span class="s">"n_flight"</span><span class="o">)</span>
<span class="c1">// Caching df in memory to speed computations</span>
<span class="c1">// and not to retrieve data from cloudant again</span>
<span class="nv">df</span><span class="o">.</span><span class="py">cache</span><span class="o">()</span>
<span class="nv">df</span><span class="o">.</span><span class="py">printSchema</span><span class="o">()</span>
<span class="c1">// 2. Saving dataframe to Cloudant db</span>
<span class="k">val</span> <span class="nv">df2</span> <span class="k">=</span> <span class="nv">df</span><span class="o">.</span><span class="py">filter</span><span class="o">(</span><span class="nf">df</span><span class="o">(</span><span class="s">"flightSegmentId"</span><span class="o">)</span> <span class="o">===</span> <span class="s">"AA106"</span><span class="o">)</span>
<span class="o">.</span><span class="py">select</span><span class="o">(</span><span class="s">"flightSegmentId"</span><span class="o">,</span><span class="s">"economyClassBaseCost"</span><span class="o">)</span>
<span class="nv">df2</span><span class="o">.</span><span class="py">show</span><span class="o">()</span>
<span class="nv">df2</span><span class="o">.</span><span class="py">write</span><span class="o">.</span><span class="py">format</span><span class="o">(</span><span class="s">"org.apache.bahir.cloudant"</span><span class="o">).</span><span class="py">save</span><span class="o">(</span><span class="s">"n_flight2"</span><span class="o">)</span>
</code></pre></div></div>
<p>See <a href="examples/scala/src/main/scala/mytest/spark/CloudantDF.scala">CloudantDF.scala</a> for examples.</p>
<p><a href="examples/scala/src/main/scala/mytest/spark/CloudantDFOption.scala">Sample code</a> on using DataFrame option to define Cloudant configuration.</p>
<h3 id="using-streams-in-scala">Using Streams In Scala</h3>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">val</span> <span class="nv">ssc</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">StreamingContext</span><span class="o">(</span><span class="n">sparkConf</span><span class="o">,</span> <span class="nc">Seconds</span><span class="o">(</span><span class="mi">10</span><span class="o">))</span>
<span class="k">val</span> <span class="nv">changes</span> <span class="k">=</span> <span class="nv">ssc</span><span class="o">.</span><span class="py">receiverStream</span><span class="o">(</span><span class="k">new</span> <span class="nc">CloudantReceiver</span><span class="o">(</span><span class="nc">Map</span><span class="o">(</span>
<span class="s">"cloudant.host"</span> <span class="o">-&gt;</span> <span class="s">"ACCOUNT.cloudant.com"</span><span class="o">,</span>
<span class="s">"cloudant.username"</span> <span class="o">-&gt;</span> <span class="s">"USERNAME"</span><span class="o">,</span>
<span class="s">"cloudant.password"</span> <span class="o">-&gt;</span> <span class="s">"PASSWORD"</span><span class="o">,</span>
<span class="s">"database"</span> <span class="o">-&gt;</span> <span class="s">"n_airportcodemapping"</span><span class="o">)))</span>
<span class="nv">changes</span><span class="o">.</span><span class="py">foreachRDD</span><span class="o">((</span><span class="n">rdd</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">String</span><span class="o">],</span> <span class="n">time</span><span class="k">:</span> <span class="kt">Time</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="o">{</span>
<span class="c1">// Get the singleton instance of SparkSession</span>
<span class="k">val</span> <span class="nv">spark</span> <span class="k">=</span> <span class="nv">SparkSessionSingleton</span><span class="o">.</span><span class="py">getInstance</span><span class="o">(</span><span class="nv">rdd</span><span class="o">.</span><span class="py">sparkContext</span><span class="o">.</span><span class="py">getConf</span><span class="o">)</span>
<span class="nf">println</span><span class="o">(</span><span class="n">s</span><span class="s">"========= $time ========="</span><span class="o">)</span>
<span class="c1">// Convert RDD[String] to DataFrame</span>
<span class="k">val</span> <span class="nv">changesDataFrame</span> <span class="k">=</span> <span class="nv">spark</span><span class="o">.</span><span class="py">read</span><span class="o">.</span><span class="py">json</span><span class="o">(</span><span class="n">rdd</span><span class="o">)</span>
<span class="nf">if</span> <span class="o">(!</span><span class="nv">changesDataFrame</span><span class="o">.</span><span class="py">schema</span><span class="o">.</span><span class="py">isEmpty</span><span class="o">)</span> <span class="o">{</span>
<span class="nv">changesDataFrame</span><span class="o">.</span><span class="py">printSchema</span><span class="o">()</span>
<span class="nv">changesDataFrame</span><span class="o">.</span><span class="py">select</span><span class="o">(</span><span class="s">"*"</span><span class="o">).</span><span class="py">show</span><span class="o">()</span>
<span class="o">....</span>
<span class="o">}</span>
<span class="o">})</span>
<span class="nv">ssc</span><span class="o">.</span><span class="py">start</span><span class="o">()</span>
<span class="c1">// run streaming for 120 secs</span>
<span class="nv">Thread</span><span class="o">.</span><span class="py">sleep</span><span class="o">(</span><span class="mi">120000L</span><span class="o">)</span>
<span class="nv">ssc</span><span class="o">.</span><span class="py">stop</span><span class="o">(</span><span class="kc">true</span><span class="o">)</span>
</code></pre></div></div>
<p>See <a href="examples/scala/src/main/scala/mytest/spark/CloudantStreaming.scala">CloudantStreaming.scala</a> for examples.</p>
<p>By default, Spark Streaming will load all documents from a database. To limit the loading to
specific documents, use the <code class="language-plaintext highlighter-rouge">selector</code> option of <code class="language-plaintext highlighter-rouge">CloudantReceiver</code> and specify your conditions
(see the <a href="examples/scala/src/main/scala/mytest/spark/CloudantStreamingSelector.scala">CloudantStreamingSelector.scala</a>
example for more details):</p>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">val</span> <span class="nv">changes</span> <span class="k">=</span> <span class="nv">ssc</span><span class="o">.</span><span class="py">receiverStream</span><span class="o">(</span><span class="k">new</span> <span class="nc">CloudantReceiver</span><span class="o">(</span><span class="nc">Map</span><span class="o">(</span>
<span class="s">"cloudant.host"</span> <span class="o">-&gt;</span> <span class="s">"ACCOUNT.cloudant.com"</span><span class="o">,</span>
<span class="s">"cloudant.username"</span> <span class="o">-&gt;</span> <span class="s">"USERNAME"</span><span class="o">,</span>
<span class="s">"cloudant.password"</span> <span class="o">-&gt;</span> <span class="s">"PASSWORD"</span><span class="o">,</span>
<span class="s">"database"</span> <span class="o">-&gt;</span> <span class="s">"sales"</span><span class="o">,</span>
<span class="s">"selector"</span> <span class="o">-&gt;</span> <span class="s">"{\"month\":\"May\", \"rep\":\"John\"}"</span><span class="o">)))</span>
</code></pre></div></div>
</div>
</div>
<hr>
<footer class="site-footer">
<div class="wrapper">
<div class="footer-col-wrapper">
<div style="text-align:center;">
<div>
Copyright &copy; 2016-<script>document.write(new Date().getFullYear());</script> <a href="http://www.apache.org">The Apache Software Foundation</a>.
Licensed under the <a href="http://www.apache.org/licenses/LICENSE-2.0">Apache License, Version 2.0</a>.
<br>
Apache and the Apache Feather logo are trademarks of The Apache Software Foundation.
</div>
</div>
</div>
</div>
</footer>
</div>
<script type="text/javascript">
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','//www.google-analytics.com/analytics.js','ga');
ga('create', 'UA-79140859-1', 'bahir.apache.org');
ga('require', 'linkid', 'linkid.js');
ga('send', 'pageview');
</script>
<script src="/assets/themes/apache-clean/jquery/jquery-2.1.1.min.js"></script>
<script src="/assets/themes/apache-clean/bootstrap/js/bootstrap.min.js"></script>
</body>
</html>