<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Spark Data Source for Apache CouchDB/Cloudant</title>
<meta name="description" content="Spark Data Source for Apache CouchDB/Cloudant">
<meta name="author" content="">
<!-- Enable responsive viewport -->
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<!-- Le HTML5 shim, for IE6-8 support of HTML elements -->
<!--[if lt IE 9]>
<script src="http://html5shim.googlecode.com/svn/trunk/html5.js"></script>
<![endif]-->
<!-- Le styles -->
<link href="/assets/themes/apache-clean/bootstrap/css/bootstrap.css" rel="stylesheet">
<link href="/assets/themes/apache-clean/css/style.css?body=1" rel="stylesheet" type="text/css">
<link href="/assets/themes/apache-clean/css/syntax.css" rel="stylesheet" type="text/css" media="screen" />
<!-- Le fav and touch icons -->
<!-- Update these with your own images
<link rel="shortcut icon" href="images/favicon.ico">
<link rel="apple-touch-icon" href="images/apple-touch-icon.png">
<link rel="apple-touch-icon" sizes="72x72" href="images/apple-touch-icon-72x72.png">
<link rel="apple-touch-icon" sizes="114x114" href="images/apple-touch-icon-114x114.png">
-->
<!-- make tables sortable by adding class tag "sortable" to table elements -->
<script src="http://www.kryogenix.org/code/browser/sorttable/sorttable.js"></script>
</head>
<body>
<!-- Navigation -->
<div id="nav-bar">
<nav id="nav-container" class="navbar navbar-inverse " role="navigation">
<div class="container">
<!-- Brand and toggle get grouped for better mobile display -->
<div class="navbar-header page-scroll">
<button type="button" class="navbar-toggle" data-toggle="collapse" data-target=".navbar-collapse">
<span class="sr-only">Toggle navigation</span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<a class="navbar-brand page-scroll" href="/#home">Home</a>
</div>
<!-- Collect the nav links, forms, and other content for toggling -->
<nav class="navbar-collapse collapse" role="navigation">
<ul class="nav navbar-nav">
<li id="download">
<a href="#" data-toggle="dropdown" class="dropdown-toggle">Download<b class="caret"></b></a>
<ul class="dropdown-menu dropdown-left">
<li><a href="/downloads/spark" target="_self">Bahir Spark Extensions</a></li>
<li><a href="/downloads/flink" target="_self">Bahir Flink Extensions</a></li>
</ul>
</li>
<li id="community">
<a href="#" data-toggle="dropdown" class="dropdown-toggle">Community<b class="caret"></b></a>
<ul class="dropdown-menu dropdown-left">
<li><a href="/community" target="_self">Get Involved</a></li>
<li><a href="/contributing" target="_self">Contributing</a></li>
<li><a href="/contributing-extensions" target="_self">Contributing Extensions</a></li>
<li><a href="https://issues.apache.org/jira/browse/BAHIR" target="_blank">Issue Tracker</a></li>
<li><a href="/community#source-code" target="_self">Source Code</a></li>
<li><a href="/community-members" target="_self">Project Committers</a></li>
</ul>
</li>
<li id="documentation">
<a href="#" data-toggle="dropdown" class="dropdown-toggle">Documentation<b class="caret"></b></a>
<ul class="dropdown-menu dropdown-left">
<li><a href="/docs/spark/overview" target="_self">Bahir Spark Extensions</a></li>
<li><a href="/docs/flink/overview" target="_self">Bahir Flink Extensions</a></li>
</ul>
</li>
<li id="github">
<a href="#" data-toggle="dropdown" class="dropdown-toggle">GitHub<b class="caret"></b></a>
<ul class="dropdown-menu dropdown-left">
<li><a href="https://github.com/apache/bahir" target="_blank">Bahir Spark Extensions</a></li>
<li><a href="https://github.com/apache/bahir-flink" target="_blank">Bahir Flink Extensions</a></li>
<li><a href="https://github.com/apache/bahir-website" target="_blank">Bahir Website</a></li>
</ul>
</li>
<li id="apache">
<a href="#" data-toggle="dropdown" class="dropdown-toggle">Apache<b class="caret"></b></a>
<ul class="dropdown-menu dropdown-left">
<li><a href="http://www.apache.org/foundation/how-it-works.html" target="_blank">Apache Software Foundation</a></li>
<li><a href="http://www.apache.org/licenses/" target="_blank">Apache License</a></li>
<li><a href="http://www.apache.org/foundation/sponsorship" target="_blank">Sponsorship</a></li>
<li><a href="http://www.apache.org/foundation/thanks.html" target="_blank">Thanks</a></li>
<li><a href="/privacy-policy" target="_self">Privacy Policy</a></li>
</ul>
</li>
</ul>
</nav><!--/.navbar-collapse -->
<!-- /.navbar-collapse -->
</div>
<!-- /.container -->
</nav>
</div>
<div class="container">
<!--<div class="hero-unit Spark Data Source for Apache CouchDB/Cloudant">
<h1></h1>
</div>
-->
<div class="row">
<div class="col-md-12">
<!--
-->
<p>A library for reading data from Cloudant or CouchDB databases using Spark SQL and Spark Streaming.</p>
<p><a href="https://cloudant.com">IBM® Cloudant®</a> is a document-oriented DataBase as a Service (DBaaS). It stores data as documents
in JSON format. It’s built with scalability, high availability, and durability in mind. It comes with a
wide variety of indexing options including map-reduce, Cloudant Query, full-text indexing, and
geospatial indexing. The replication capabilities make it easy to keep data in sync between database
clusters, desktop PCs, and mobile devices.</p>
<p><a href="http://couchdb.apache.org">Apache CouchDB™</a> is open source database software that focuses on ease of use and having an architecture that “completely embraces the Web”. It has a document-oriented NoSQL database architecture and is implemented in the concurrency-oriented language Erlang; it uses JSON to store data, JavaScript as its query language using MapReduce, and HTTP for an API.</p>
<h2 id="linking">Linking</h2>
<p>Using SBT:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>libraryDependencies += "org.apache.bahir" %% "spark-sql-cloudant" % "2.2.0"
</code></pre></div></div>
<p>Using Maven:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>&lt;dependency&gt;
&lt;groupId&gt;org.apache.bahir&lt;/groupId&gt;
&lt;artifactId&gt;spark-sql-cloudant_2.11&lt;/artifactId&gt;
&lt;version&gt;2.2.0&lt;/version&gt;
&lt;/dependency&gt;
</code></pre></div></div>
<p>This library can also be added to Spark jobs launched through <code class="language-plaintext highlighter-rouge">spark-shell</code> or <code class="language-plaintext highlighter-rouge">spark-submit</code> by using the <code class="language-plaintext highlighter-rouge">--packages</code> command line option.</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>$ bin/spark-shell --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0
</code></pre></div></div>
<p>Unlike using <code class="language-plaintext highlighter-rouge">--jars</code>, using <code class="language-plaintext highlighter-rouge">--packages</code> ensures that this library and its dependencies will be added to the classpath.
The <code class="language-plaintext highlighter-rouge">--packages</code> argument can also be used with <code class="language-plaintext highlighter-rouge">bin/spark-submit</code>.</p>
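<p>For example, a Python job can be submitted the same way, with the package and its dependencies resolved automatically (the script path is a placeholder):</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>$ bin/spark-submit --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0 &lt;path to python script&gt;
</code></pre></div></div>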
<p>Submit a job in Python:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark-submit --master local[4] --jars &lt;path to cloudant-spark.jar&gt; &lt;path to python script&gt;
</code></pre></div></div>
<p>Submit a job in Scala:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark-submit --class "&lt;your class&gt;" --master local[4] --jars &lt;path to cloudant-spark.jar&gt; &lt;path to your app jar&gt;
</code></pre></div></div>
<p>This library is compiled for Scala 2.11 only and is intended to support Spark 2.0 onwards.</p>
<h2 id="configuration-options">Configuration options</h2>
<p>The configuration is obtained in the following sequence:</p>
<ol>
<li>default values defined in application.conf</li>
<li>keys set in SparkConf</li>
<li>keys set in the parameters, i.e. DataFrame or temporary table options</li>
<li>“spark.”-prefixed keys in SparkConf (treated the same as keys passed through spark-submit using the --conf option)</li>
</ol>
<p>Each subsequent level overrides the previous one. Thus, configuration set using a DataFrame option overrides what has been set in SparkConf, and configuration passed to spark-submit using --conf takes precedence over any setting in the code.</p>
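<p>As a minimal sketch of these precedence rules (account and database names are placeholders), a host set in SparkConf is overridden by the same key given as a DataFrame option:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("cloudant.host", "ACCOUNT.cloudant.com") \
    .getOrCreate()

# the option below overrides the SparkConf setting above
df = spark.read.format("org.apache.bahir.cloudant") \
    .option("cloudant.host", "OTHER-ACCOUNT.cloudant.com") \
    .load("n_airportcodemapping")
</code></pre></div></div>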
<h3 id="configuration-in-applicationconf">Configuration in application.conf</h3>
<p>Default values are defined <a href="cloudant-spark-sql/src/main/resources/application.conf">here</a>.</p>
<h3 id="configuration-on-sparkconf">Configuration on SparkConf</h3>
<table>
<thead>
<tr>
<th>Name</th>
<th style="text-align: center">Default</th>
<th>Meaning</th>
</tr>
</thead>
<tbody>
<tr>
<td>cloudant.protocol</td>
<td style="text-align: center">https</td>
<td>protocol to use to transfer data: http or https</td>
</tr>
<tr>
<td>cloudant.host</td>
<td style="text-align: center"> </td>
<td>cloudant host url</td>
</tr>
<tr>
<td>cloudant.username</td>
<td style="text-align: center"> </td>
<td>cloudant userid</td>
</tr>
<tr>
<td>cloudant.password</td>
<td style="text-align: center"> </td>
<td>cloudant password</td>
</tr>
<tr>
<td>cloudant.useQuery</td>
<td style="text-align: center">false</td>
<td>By default, _all_docs endpoint is used if configuration ‘view’ and ‘index’ (see below) are not set. When useQuery is enabled, _find endpoint will be used in place of _all_docs when query condition is not on primary key field (_id), so that query predicates may be driven into datastore.</td>
</tr>
<tr>
<td>cloudant.queryLimit</td>
<td style="text-align: center">25</td>
<td>The maximum number of results returned when querying the _find endpoint.</td>
</tr>
<tr>
<td>jsonstore.rdd.partitions</td>
<td style="text-align: center">10</td>
<td>the number of partitions intended for the JsonStoreRDD when loading query results in parallel. The actual number is calculated from the total rows returned, subject to maxInPartition and minInPartition</td>
</tr>
<tr>
<td>jsonstore.rdd.maxInPartition</td>
<td style="text-align: center">-1</td>
<td>the max rows in a partition. -1 means unlimited</td>
</tr>
<tr>
<td>jsonstore.rdd.minInPartition</td>
<td style="text-align: center">10</td>
<td>the min rows in a partition.</td>
</tr>
<tr>
<td>jsonstore.rdd.requestTimeout</td>
<td style="text-align: center">900000</td>
<td>the request timeout in milliseconds</td>
</tr>
<tr>
<td>bulkSize</td>
<td style="text-align: center">200</td>
<td>the bulk save size</td>
</tr>
<tr>
<td>schemaSampleSize</td>
<td style="text-align: center">“-1”</td>
<td>the sample size for RDD schema discovery. 1 means only the first document is used; -1 means all documents; 0 is treated as 1; any number N means min(N, total) documents</td>
</tr>
<tr>
<td>createDBOnSave</td>
<td style="text-align: center">“false”</td>
<td>whether to create a new database during the save operation. If false, the database must already exist. If true, a new database will be created; if a database with the provided name already exists, an error will be raised.</td>
</tr>
</tbody>
</table>
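<p>As an example, these keys can be set when building a session; the sketch below uses placeholder credentials, and enables <code class="language-plaintext highlighter-rouge">cloudant.useQuery</code> so that predicates on non-primary-key fields may be pushed to the <code class="language-plaintext highlighter-rouge">_find</code> endpoint:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>from pyspark.sql import SparkSession

# placeholder host and credentials
spark = SparkSession.builder \
    .config("cloudant.host", "ACCOUNT.cloudant.com") \
    .config("cloudant.username", "USERNAME") \
    .config("cloudant.password", "PASSWORD") \
    .config("cloudant.useQuery", "true") \
    .config("jsonstore.rdd.partitions", "20") \
    .getOrCreate()
</code></pre></div></div>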
<h3 id="configuration-on-spark-sql-temporary-table-or-dataframe">Configuration on Spark SQL Temporary Table or DataFrame</h3>
<p>Besides all the configurations passed to a temporary table or dataframe through SparkConf, it is also possible to set the following configurations in temporary table or dataframe using OPTIONS:</p>
<table>
<thead>
<tr>
<th>Name</th>
<th style="text-align: center">Default</th>
<th>Meaning</th>
</tr>
</thead>
<tbody>
<tr>
<td>database</td>
<td style="text-align: center"> </td>
<td>cloudant database name</td>
</tr>
<tr>
<td>view</td>
<td style="text-align: center"> </td>
<td>Cloudant view, without the database name; used only for load</td>
</tr>
<tr>
<td>index</td>
<td style="text-align: center"> </td>
<td>Cloudant search index, without the database name; used only for loading data with at most 200 results</td>
</tr>
<tr>
<td>path</td>
<td style="text-align: center"> </td>
<td>the load path, used as the database name if the database option is not present</td>
</tr>
<tr>
<td>schemaSampleSize</td>
<td style="text-align: center">“-1”</td>
<td>the sample size used to discover the schema for this temp table. -1 scans all documents</td>
</tr>
<tr>
<td>bulkSize</td>
<td style="text-align: center">200</td>
<td>the bulk save size</td>
</tr>
<tr>
<td>createDBOnSave</td>
<td style="text-align: center">“false”</td>
<td>whether to create a new database during the save operation. If false, the database must already exist. If true, a new database will be created; if a database with the provided name already exists, an error will be raised.</td>
</tr>
</tbody>
</table>
<p>For fast loading, views are loaded without include_docs. Thus, a derived schema will always be <code class="language-plaintext highlighter-rouge">{id, key, value}</code>, where <code class="language-plaintext highlighter-rouge">value</code> can be a compound field. An example of loading data from a view:</p>
<div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="n">spark</span><span class="p">.</span><span class="n">sql</span><span class="p">(</span><span class="s">" CREATE TEMPORARY TABLE flightTable1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight', view '_design/view/_view/AA0')"</span><span class="p">)</span>
</code></pre></div></div>
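<p>Similarly, a search index can be used when loading; a hypothetical sketch (the table name and the index path <code class="language-plaintext highlighter-rouge">_design/view/_search/n_flights</code> are illustrative):</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code># hypothetical design document / search index path
spark.sql("CREATE TEMPORARY TABLE flightTable2 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight', index '_design/view/_search/n_flights')")
</code></pre></div></div>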
<h3 id="configuration-on-cloudant-receiver-for-spark-streaming">Configuration on Cloudant Receiver for Spark Streaming</h3>
<table>
<thead>
<tr>
<th>Name</th>
<th style="text-align: center">Default</th>
<th>Meaning</th>
</tr>
</thead>
<tbody>
<tr>
<td>cloudant.host</td>
<td style="text-align: center"> </td>
<td>cloudant host url</td>
</tr>
<tr>
<td>cloudant.username</td>
<td style="text-align: center"> </td>
<td>cloudant userid</td>
</tr>
<tr>
<td>cloudant.password</td>
<td style="text-align: center"> </td>
<td>cloudant password</td>
</tr>
<tr>
<td>database</td>
<td style="text-align: center"> </td>
<td>cloudant database name</td>
</tr>
<tr>
<td>selector</td>
<td style="text-align: center">all documents</td>
<td>a selector written in Cloudant Query syntax, specifying conditions for selecting documents. Only documents satisfying the selector’s conditions will be retrieved from Cloudant and loaded into Spark.</td>
</tr>
</tbody>
</table>
<h3 id="configuration-in-spark-submit-using-conf-option">Configuration in spark-submit using –conf option</h3>
<p>The above stated configuration keys can also be set using <code class="language-plaintext highlighter-rouge">spark-submit --conf</code> option. When passing configuration in spark-submit, make sure adding “spark.” as prefix to the keys.</p>
<h2 id="examples">Examples</h2>
<h3 id="python-api">Python API</h3>
<h4 id="using-sql-in-python">Using SQL In Python</h4>
<div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span>\
<span class="p">.</span><span class="n">builder</span>\
<span class="p">.</span><span class="n">appName</span><span class="p">(</span><span class="s">"Cloudant Spark SQL Example in Python using temp tables"</span><span class="p">)</span>\
<span class="p">.</span><span class="n">config</span><span class="p">(</span><span class="s">"cloudant.host"</span><span class="p">,</span><span class="s">"ACCOUNT.cloudant.com"</span><span class="p">)</span>\
<span class="p">.</span><span class="n">config</span><span class="p">(</span><span class="s">"cloudant.username"</span><span class="p">,</span> <span class="s">"USERNAME"</span><span class="p">)</span>\
<span class="p">.</span><span class="n">config</span><span class="p">(</span><span class="s">"cloudant.password"</span><span class="p">,</span><span class="s">"PASSWORD"</span><span class="p">)</span>\
<span class="p">.</span><span class="n">getOrCreate</span><span class="p">()</span>
<span class="c1"># Loading temp table from Cloudant db
</span><span class="n">spark</span><span class="p">.</span><span class="n">sql</span><span class="p">(</span><span class="s">" CREATE TEMPORARY TABLE airportTable USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')"</span><span class="p">)</span>
<span class="n">airportData</span> <span class="o">=</span> <span class="n">spark</span><span class="p">.</span><span class="n">sql</span><span class="p">(</span><span class="s">"SELECT _id, airportName FROM airportTable WHERE _id &gt;= 'CAA' AND _id &lt;= 'GAA' ORDER BY _id"</span><span class="p">)</span>
<span class="n">airportData</span><span class="p">.</span><span class="n">printSchema</span><span class="p">()</span>
<span class="k">print</span> <span class="s">'Total # of rows in airportData: '</span> <span class="o">+</span> <span class="nb">str</span><span class="p">(</span><span class="n">airportData</span><span class="p">.</span><span class="n">count</span><span class="p">())</span>
<span class="k">for</span> <span class="n">code</span> <span class="ow">in</span> <span class="n">airportData</span><span class="p">.</span><span class="n">collect</span><span class="p">():</span>
<span class="k">print</span> <span class="n">code</span><span class="p">.</span><span class="n">_id</span>
</code></pre></div></div>
<p>See <a href="examples/python/CloudantApp.py">CloudantApp.py</a> for examples.</p>
<p>Submit job example:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark-submit --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0 --conf spark.cloudant.host=ACCOUNT.cloudant.com --conf spark.cloudant.username=USERNAME --conf spark.cloudant.password=PASSWORD sql-cloudant/examples/python/CloudantApp.py
</code></pre></div></div>
<h4 id="using-dataframe-in-python">Using DataFrame In Python</h4>
<div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span>\
<span class="p">.</span><span class="n">builder</span>\
<span class="p">.</span><span class="n">appName</span><span class="p">(</span><span class="s">"Cloudant Spark SQL Example in Python using dataframes"</span><span class="p">)</span>\
<span class="p">.</span><span class="n">config</span><span class="p">(</span><span class="s">"cloudant.host"</span><span class="p">,</span><span class="s">"ACCOUNT.cloudant.com"</span><span class="p">)</span>\
<span class="p">.</span><span class="n">config</span><span class="p">(</span><span class="s">"cloudant.username"</span><span class="p">,</span> <span class="s">"USERNAME"</span><span class="p">)</span>\
<span class="p">.</span><span class="n">config</span><span class="p">(</span><span class="s">"cloudant.password"</span><span class="p">,</span><span class="s">"PASSWORD"</span><span class="p">)</span>\
<span class="p">.</span><span class="n">config</span><span class="p">(</span><span class="s">"jsonstore.rdd.partitions"</span><span class="p">,</span> <span class="mi">8</span><span class="p">)</span>\
<span class="p">.</span><span class="n">getOrCreate</span><span class="p">()</span>
<span class="c1"># ***1. Loading dataframe from Cloudant db
</span><span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="p">.</span><span class="n">read</span><span class="p">.</span><span class="n">load</span><span class="p">(</span><span class="s">"n_airportcodemapping"</span><span class="p">,</span> <span class="s">"org.apache.bahir.cloudant"</span><span class="p">)</span>
<span class="n">df</span><span class="p">.</span><span class="n">cache</span><span class="p">()</span>
<span class="n">df</span><span class="p">.</span><span class="n">printSchema</span><span class="p">()</span>
<span class="n">df</span><span class="p">.</span><span class="nb">filter</span><span class="p">(</span><span class="n">df</span><span class="p">.</span><span class="n">airportName</span> <span class="o">&gt;=</span> <span class="s">'Moscow'</span><span class="p">).</span><span class="n">select</span><span class="p">(</span><span class="s">"_id"</span><span class="p">,</span><span class="s">'airportName'</span><span class="p">).</span><span class="n">show</span><span class="p">()</span>
<span class="n">df</span><span class="p">.</span><span class="nb">filter</span><span class="p">(</span><span class="n">df</span><span class="p">.</span><span class="n">_id</span> <span class="o">&gt;=</span> <span class="s">'CAA'</span><span class="p">).</span><span class="n">select</span><span class="p">(</span><span class="s">"_id"</span><span class="p">,</span><span class="s">'airportName'</span><span class="p">).</span><span class="n">show</span><span class="p">()</span>
</code></pre></div></div>
<p>See <a href="examples/python/CloudantDF.py">CloudantDF.py</a> for examples.</p>
<p>If you perform multiple operations on a DataFrame (select, filter, etc.),
you should persist it. Otherwise, every operation on the DataFrame will load the same data from Cloudant again.
Persisting will also speed up computation. This statement will persist an RDD in memory: <code class="language-plaintext highlighter-rouge">df.cache()</code>. Alternatively, for large databases, persist in memory &amp; disk with:</p>
<div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="kn">from</span> <span class="nn">pyspark</span> <span class="kn">import</span> <span class="n">StorageLevel</span>
<span class="n">df</span><span class="p">.</span><span class="n">persist</span><span class="p">(</span><span class="n">storageLevel</span> <span class="o">=</span> <span class="n">StorageLevel</span><span class="p">(</span><span class="bp">True</span><span class="p">,</span> <span class="bp">True</span><span class="p">,</span> <span class="bp">False</span><span class="p">,</span> <span class="bp">True</span><span class="p">,</span> <span class="mi">1</span><span class="p">))</span>
</code></pre></div></div>
<p><a href="examples/python/CloudantDFOption.py">Sample code</a> on using DataFrame option to define cloudant configuration</p>
<h3 id="scala-api">Scala API</h3>
<h4 id="using-sql-in-scala">Using SQL In Scala</h4>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">val</span> <span class="nv">spark</span> <span class="k">=</span> <span class="nc">SparkSession</span>
<span class="o">.</span><span class="py">builder</span><span class="o">()</span>
<span class="o">.</span><span class="py">appName</span><span class="o">(</span><span class="s">"Cloudant Spark SQL Example"</span><span class="o">)</span>
<span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"cloudant.host"</span><span class="o">,</span><span class="s">"ACCOUNT.cloudant.com"</span><span class="o">)</span>
<span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"cloudant.username"</span><span class="o">,</span> <span class="s">"USERNAME"</span><span class="o">)</span>
<span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"cloudant.password"</span><span class="o">,</span><span class="s">"PASSWORD"</span><span class="o">)</span>
<span class="o">.</span><span class="py">getOrCreate</span><span class="o">()</span>
<span class="c1">// For implicit conversions of Dataframe to RDDs</span>
<span class="k">import</span> <span class="nn">spark.implicits._</span>
<span class="c1">// create a temp table from Cloudant db and query it using sql syntax</span>
<span class="nv">spark</span><span class="o">.</span><span class="py">sql</span><span class="o">(</span>
<span class="n">s</span><span class="s">"""
|CREATE TEMPORARY TABLE airportTable
|USING org.apache.bahir.cloudant
|OPTIONS ( database 'n_airportcodemapping')
"""</span><span class="o">.</span><span class="py">stripMargin</span><span class="o">)</span>
<span class="c1">// create a dataframe</span>
<span class="k">val</span> <span class="nv">airportData</span> <span class="k">=</span> <span class="nv">spark</span><span class="o">.</span><span class="py">sql</span><span class="o">(</span><span class="s">"SELECT _id, airportName FROM airportTable WHERE _id &gt;= 'CAA' AND _id &lt;= 'GAA' ORDER BY _id"</span><span class="o">)</span>
<span class="nv">airportData</span><span class="o">.</span><span class="py">printSchema</span><span class="o">()</span>
<span class="nf">println</span><span class="o">(</span><span class="n">s</span><span class="s">"Total # of rows in airportData: "</span> <span class="o">+</span> <span class="nv">airportData</span><span class="o">.</span><span class="py">count</span><span class="o">())</span>
<span class="c1">// convert dataframe to array of Rows, and process each row</span>
<span class="nv">airportData</span><span class="o">.</span><span class="py">map</span><span class="o">(</span><span class="n">t</span> <span class="k">=&gt;</span> <span class="s">"code: "</span> <span class="o">+</span> <span class="nf">t</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="o">+</span> <span class="s">",name:"</span> <span class="o">+</span> <span class="nf">t</span><span class="o">(</span><span class="mi">1</span><span class="o">)).</span><span class="py">collect</span><span class="o">().</span><span class="py">foreach</span><span class="o">(</span><span class="n">println</span><span class="o">)</span>
</code></pre></div></div>
<p>See <a href="examples/scala/src/main/scala/mytest/spark/CloudantApp.scala">CloudantApp.scala</a> for examples.</p>
<p>Submit job example:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark-submit --class org.apache.spark.examples.sql.cloudant.CloudantApp --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0 --conf spark.cloudant.host=ACCOUNT.cloudant.com --conf spark.cloudant.username=USERNAME --conf spark.cloudant.password=PASSWORD /path/to/spark-sql-cloudant_2.11-2.2.0-tests.jar
</code></pre></div></div>
<h3 id="using-dataframe-in-scala">Using DataFrame In Scala</h3>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">val</span> <span class="nv">spark</span> <span class="k">=</span> <span class="nc">SparkSession</span>
<span class="o">.</span><span class="py">builder</span><span class="o">()</span>
<span class="o">.</span><span class="py">appName</span><span class="o">(</span><span class="s">"Cloudant Spark SQL Example with Dataframe"</span><span class="o">)</span>
<span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"cloudant.host"</span><span class="o">,</span><span class="s">"ACCOUNT.cloudant.com"</span><span class="o">)</span>
<span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"cloudant.username"</span><span class="o">,</span> <span class="s">"USERNAME"</span><span class="o">)</span>
<span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"cloudant.password"</span><span class="o">,</span><span class="s">"PASSWORD"</span><span class="o">)</span>
<span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"createDBOnSave"</span><span class="o">,</span><span class="s">"true"</span><span class="o">)</span> <span class="c1">// to create a db on save</span>
<span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"jsonstore.rdd.partitions"</span><span class="o">,</span> <span class="s">"20"</span><span class="o">)</span> <span class="c1">// using 20 partitions</span>
<span class="o">.</span><span class="py">getOrCreate</span><span class="o">()</span>
<span class="c1">// 1. Loading data from Cloudant db</span>
<span class="k">val</span> <span class="nv">df</span> <span class="k">=</span> <span class="nv">spark</span><span class="o">.</span><span class="py">read</span><span class="o">.</span><span class="py">format</span><span class="o">(</span><span class="s">"org.apache.bahir.cloudant"</span><span class="o">).</span><span class="py">load</span><span class="o">(</span><span class="s">"n_flight"</span><span class="o">)</span>
<span class="c1">// Caching df in memory to speed computations</span>
<span class="c1">// and not to retrieve data from cloudant again</span>
<span class="nv">df</span><span class="o">.</span><span class="py">cache</span><span class="o">()</span>
<span class="nv">df</span><span class="o">.</span><span class="py">printSchema</span><span class="o">()</span>
<span class="c1">// 2. Saving dataframe to Cloudant db</span>
<span class="k">val</span> <span class="nv">df2</span> <span class="k">=</span> <span class="nv">df</span><span class="o">.</span><span class="py">filter</span><span class="o">(</span><span class="nf">df</span><span class="o">(</span><span class="s">"flightSegmentId"</span><span class="o">)</span> <span class="o">===</span> <span class="s">"AA106"</span><span class="o">)</span>
<span class="o">.</span><span class="py">select</span><span class="o">(</span><span class="s">"flightSegmentId"</span><span class="o">,</span><span class="s">"economyClassBaseCost"</span><span class="o">)</span>
<span class="nv">df2</span><span class="o">.</span><span class="py">show</span><span class="o">()</span>
<span class="nv">df2</span><span class="o">.</span><span class="py">write</span><span class="o">.</span><span class="py">format</span><span class="o">(</span><span class="s">"org.apache.bahir.cloudant"</span><span class="o">).</span><span class="py">save</span><span class="o">(</span><span class="s">"n_flight2"</span><span class="o">)</span>
</code></pre></div></div>
<p>See <a href="examples/scala/src/main/scala/mytest/spark/CloudantDF.scala">CloudantDF.scala</a> for examples.</p>
<p><a href="examples/scala/src/main/scala/mytest/spark/CloudantDFOption.scala">Sample code</a> on using DataFrame option to define Cloudant configuration.</p>
<h3 id="using-streams-in-scala">Using Streams In Scala</h3>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">val</span> <span class="nv">ssc</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">StreamingContext</span><span class="o">(</span><span class="n">sparkConf</span><span class="o">,</span> <span class="nc">Seconds</span><span class="o">(</span><span class="mi">10</span><span class="o">))</span>
<span class="k">val</span> <span class="nv">changes</span> <span class="k">=</span> <span class="nv">ssc</span><span class="o">.</span><span class="py">receiverStream</span><span class="o">(</span><span class="k">new</span> <span class="nc">CloudantReceiver</span><span class="o">(</span><span class="nc">Map</span><span class="o">(</span>
<span class="s">"cloudant.host"</span> <span class="o">-&gt;</span> <span class="s">"ACCOUNT.cloudant.com"</span><span class="o">,</span>
<span class="s">"cloudant.username"</span> <span class="o">-&gt;</span> <span class="s">"USERNAME"</span><span class="o">,</span>
<span class="s">"cloudant.password"</span> <span class="o">-&gt;</span> <span class="s">"PASSWORD"</span><span class="o">,</span>
<span class="s">"database"</span> <span class="o">-&gt;</span> <span class="s">"n_airportcodemapping"</span><span class="o">)))</span>
<span class="nv">changes</span><span class="o">.</span><span class="py">foreachRDD</span><span class="o">((</span><span class="n">rdd</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">String</span><span class="o">],</span> <span class="n">time</span><span class="k">:</span> <span class="kt">Time</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="o">{</span>
<span class="c1">// Get the singleton instance of SparkSession</span>
<span class="k">val</span> <span class="nv">spark</span> <span class="k">=</span> <span class="nv">SparkSessionSingleton</span><span class="o">.</span><span class="py">getInstance</span><span class="o">(</span><span class="nv">rdd</span><span class="o">.</span><span class="py">sparkContext</span><span class="o">.</span><span class="py">getConf</span><span class="o">)</span>
<span class="nf">println</span><span class="o">(</span><span class="n">s</span><span class="s">"========= $time ========="</span><span class="o">)</span>
<span class="c1">// Convert RDD[String] to DataFrame</span>
<span class="k">val</span> <span class="nv">changesDataFrame</span> <span class="k">=</span> <span class="nv">spark</span><span class="o">.</span><span class="py">read</span><span class="o">.</span><span class="py">json</span><span class="o">(</span><span class="n">rdd</span><span class="o">)</span>
<span class="nf">if</span> <span class="o">(!</span><span class="nv">changesDataFrame</span><span class="o">.</span><span class="py">schema</span><span class="o">.</span><span class="py">isEmpty</span><span class="o">)</span> <span class="o">{</span>
<span class="nv">changesDataFrame</span><span class="o">.</span><span class="py">printSchema</span><span class="o">()</span>
<span class="nv">changesDataFrame</span><span class="o">.</span><span class="py">select</span><span class="o">(</span><span class="s">"*"</span><span class="o">).</span><span class="py">show</span><span class="o">()</span>
<span class="o">....</span>
<span class="o">}</span>
<span class="o">})</span>
<span class="nv">ssc</span><span class="o">.</span><span class="py">start</span><span class="o">()</span>
<span class="c1">// run streaming for 120 secs</span>
<span class="nv">Thread</span><span class="o">.</span><span class="py">sleep</span><span class="o">(</span><span class="mi">120000L</span><span class="o">)</span>
<span class="nv">ssc</span><span class="o">.</span><span class="py">stop</span><span class="o">(</span><span class="kc">true</span><span class="o">)</span>
</code></pre></div></div>
<p>See <a href="examples/scala/src/main/scala/mytest/spark/CloudantStreaming.scala">CloudantStreaming.scala</a> for examples.</p>
<p>By default, Spark Streaming will load all documents from a database. If you want to limit the loading to
specific documents, use the <code class="language-plaintext highlighter-rouge">selector</code> option of <code class="language-plaintext highlighter-rouge">CloudantReceiver</code> and specify your conditions
(See <a href="examples/scala/src/main/scala/mytest/spark/CloudantStreamingSelector.scala">CloudantStreamingSelector.scala</a>
example for more details):</p>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">val</span> <span class="nv">changes</span> <span class="k">=</span> <span class="nv">ssc</span><span class="o">.</span><span class="py">receiverStream</span><span class="o">(</span><span class="k">new</span> <span class="nc">CloudantReceiver</span><span class="o">(</span><span class="nc">Map</span><span class="o">(</span>
<span class="s">"cloudant.host"</span> <span class="o">-&gt;</span> <span class="s">"ACCOUNT.cloudant.com"</span><span class="o">,</span>
<span class="s">"cloudant.username"</span> <span class="o">-&gt;</span> <span class="s">"USERNAME"</span><span class="o">,</span>
<span class="s">"cloudant.password"</span> <span class="o">-&gt;</span> <span class="s">"PASSWORD"</span><span class="o">,</span>
<span class="s">"database"</span> <span class="o">-&gt;</span> <span class="s">"sales"</span><span class="o">,</span>
<span class="s">"selector"</span> <span class="o">-&gt;</span> <span class="s">"{\"month\":\"May\", \"rep\":\"John\"}"</span><span class="o">)))</span>
</code></pre></div></div>
</div>
</div>
<hr>
<!-- <p>&copy; 2021 </p>-->
<footer class="site-footer">
<div class="wrapper">
<div class="footer-col-wrapper">
<div style="text-align:center;">
<div>
Copyright &copy; 2016-<script>document.write(new Date().getFullYear());</script> <a href="http://www.apache.org">The Apache Software Foundation</a>.
Licensed under the <a href="http://www.apache.org/licenses/LICENSE-2.0">Apache License, Version 2.0</a>.
<br>
Apache and the Apache Feather logo are trademarks of The Apache Software Foundation.
</div>
</div>
</div>
</div>
</footer>
</div>
<script type="text/javascript">
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','//www.google-analytics.com/analytics.js','ga');
ga('create', 'UA-79140859-1', 'bahir.apache.org');
ga('require', 'linkid', 'linkid.js');
ga('send', 'pageview');
</script>
<script src="/assets/themes/apache-clean/jquery/jquery-2.1.1.min.js"></script>
<script src="/assets/themes/apache-clean/bootstrap/js/bootstrap.min.js"></script>
</body>
</html>