# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# This file is used by the datafu-spark unit tests
import os
import sys
from pprint import pprint as p
from pyspark_utils import df_utils
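# Debug output: report whether each entry on sys.path actually exists on disk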
p('CHECKING IF PATHS EXIST:')
for x in sys.path:
    p('PATH ' + x + ': ' + str(os.path.exists(x)))
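# Make the DataFu-Spark functions available as DataFrame methods
# (used below as df.dedup_with_order, df.dedup_top_n, etc.)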
df_utils.activate()
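# Sample data grouped by "id". The sqlContext variable is expected to be
# provided by the test harness that executes this script.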
df_people = sqlContext.createDataFrame([
("a", "Alice", 34),
("a", "Sara", 33),
("b", "Bob", 36),
("b", "Charlie", 30),
("c", "David", 29),
("c", "Esther", 32),
("c", "Fanny", 36),
("c", "Zoey", 36)],
["id", "name", "age"])
func_dedup_res = df_people.dedup_with_order(
    group_col=df_people.id,
    order_cols=[df_people.age.desc(), df_people.name.desc()])
func_dedup_res.registerTempTable("dedup_with_order")
func_dedupTopN_res = df_people.dedup_top_n(
    n=2, group_col=df_people.id,
    order_cols=[df_people.age.desc(), df_people.name.desc()])
func_dedupTopN_res.registerTempTable("dedupTopN")
func_dedup2_res = df_people.dedup_with_combiner(
    group_col=df_people.id, order_by_col=df_people.age, desc=True,
    columns_filter=["name"], columns_filter_keep=False)
func_dedup2_res.registerTempTable("dedup_with_combiner")
func_changeSchema_res = df_people.change_schema(new_scheme=["id1", "name1", "age1"])
func_changeSchema_res.registerTempTable("changeSchema")
df_people2 = sqlContext.createDataFrame([
("a", "Laura", 34),
("a", "Stephani", 33),
("b", "Margaret", 36)],
["id", "name", "age"])
simpleDF = sqlContext.createDataFrame([
("a", "1")],
["id", "value"])
from pyspark.sql.functions import expr
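# join_skewed: join intended for skewed keys; the work is split into num_shards shards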
func_joinSkewed_res = df_utils.join_skewed(df_left=df_people2.alias("df1"), df_right=simpleDF.alias("df2"),
join_exprs=expr("df1.id == df2.id"), num_shards=5,
join_type="inner")
func_joinSkewed_res.registerTempTable("joinSkewed")
func_broadcastJoinSkewed_res = df_utils.broadcast_join_skewed(not_skewed_df=df_people2, skewed_df=simpleDF, join_col="id",
number_of_custs_to_broadcast=5, filter_cnt=0, join_type="inner")
func_broadcastJoinSkewed_res.registerTempTable("broadcastJoinSkewed")
dfRange = sqlContext.createDataFrame([
("a", 34, 36)],
["id1", "start", "end"])
func_joinWithRange_res = df_utils.join_with_range(df_single=df_people2, col_single="age", df_range=dfRange,
col_range_start="start", col_range_end="end",
decrease_factor=5)
func_joinWithRange_res.registerTempTable("joinWithRange")
func_joinWithRangeAndDedup_res = df_utils.join_with_range_and_dedup(df_single=df_people2, col_single="age", df_range=dfRange,
col_range_start="start", col_range_end="end",
decrease_factor=5, dedup_small_range=True)
func_joinWithRangeAndDedup_res.registerTempTable("joinWithRangeAndDedup")
dfArray = sqlContext.createDataFrame([
(0.0, ["Hi", "I heard", "about", "Spark"])],
["label", "sentence_arr"])
func_explodeArray_res = df_utils.explode_array(df=dfArray, array_col=dfArray.sentence_arr, alias="token")
func_explodeArray_res.registerTempTable("explodeArray")