#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pyspark.sql import SparkSession
# Define Cloudant-related configuration.
# Set the protocol to http if needed; the default is https:
# .config("cloudant.protocol", "http")
spark = SparkSession\
    .builder\
    .appName("Cloudant Spark SQL Example in Python using dataframes")\
    .config("cloudant.host", "ACCOUNT.cloudant.com")\
    .config("cloudant.username", "USERNAME")\
    .config("cloudant.password", "PASSWORD")\
    .config("jsonstore.rdd.partitions", 8)\
    .getOrCreate()
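# As an illustrative alternative (not part of the original example), the
# credentials above could be read from environment variables instead of
# being hardcoded, e.g.:
# import os
# .config("cloudant.username", os.environ["CLOUDANT_USER"])\
# .config("cloudant.password", os.environ["CLOUDANT_PASSWORD"])\
# (CLOUDANT_USER and CLOUDANT_PASSWORD are hypothetical variable names.)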
# ***1. Loading a dataframe from a Cloudant db
df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant")
# When performing multiple operations on a dataframe (select, filter, etc.)
# you should persist it. Otherwise, every operation on the dataframe will
# load the same data from Cloudant again. Persisting also speeds up computation.
df.cache() # persisting in memory
# Alternatively, for large dbs, persist in memory and on disk:
# from pyspark import StorageLevel
# df.persist(storageLevel=StorageLevel(True, True, False, True, 1))
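# The flags above correspond to a deserialized MEMORY_AND_DISK level; the
# named constant can be used instead (note: its deserialized flag may
# differ between Spark versions):
# df.persist(StorageLevel.MEMORY_AND_DISK)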
df.printSchema()
df.filter(df.airportName >= 'Moscow').select("_id", "airportName").show()
df.filter(df._id >= 'CAA').select("_id", "airportName").show()
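# As an optional illustration (a sketch, assuming Spark 2.x and the dataframe
# loaded above), the same filter can also be expressed in SQL by registering
# a temporary view:
df.createOrReplaceTempView("airportcodemapping")
spark.sql("SELECT _id, airportName FROM airportcodemapping "
          "WHERE airportName >= 'Moscow'").show()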
# ***2. Saving a dataframe to a Cloudant db
df = spark.read.load(format="org.apache.bahir.cloudant", database="n_flight")
df.printSchema()
df2 = df.filter(df.flightSegmentId == 'AA106')\
    .select("flightSegmentId", "economyClassBaseCost")
df2.write.save("n_flight2", "org.apache.bahir.cloudant",
               bulkSize="100", createDBOnSave="true")
total = df.filter(df.flightSegmentId > 'AA9').select("flightSegmentId",
    "scheduledDepartureTime").orderBy(df.flightSegmentId).count()
print("Total", total, "flights from table")
# ***3. Loading a dataframe from a Cloudant search index
df = spark.read.load(format="org.apache.bahir.cloudant", database="n_flight",
index="_design/view/_search/n_flights")
df.printSchema()
total = df.filter(df.flightSegmentId > 'AA9').select("flightSegmentId",
    "scheduledDepartureTime").orderBy(df.flightSegmentId).count()
print("Total", total, "flights from index")
# ***4. Loading a dataframe from a Cloudant view
df = spark.read.load(format="org.apache.bahir.cloudant", path="n_flight",
view="_design/view/_view/AA0", schemaSampleSize="20")
# The schema for a view will always be: _id, key, value,
# where value can be a complex field.
df.printSchema()
df.show()
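# If value is a struct, its fields can be flattened for inspection (a sketch;
# the actual fields depend on the documents emitted by the view):
# df.select("_id", "key", "value.*").show()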