| #!/usr/bin/env python3 |
| |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
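
"""
Generate a mock employee dataset and write it out as a Delta Lake table.

Example invocations (illustrative; the file name write_delta_table.py is an
assumption, and pyspark plus delta-spark must be pip-installed):

    python write_delta_table.py --save_path /tmp/delta-table --num_records 10
    python write_delta_table.py --save_path /tmp/delta-table --save_mode overwrite --partitioned_by name
"""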
| |
import argparse
import random
from datetime import datetime, timedelta

import pyspark
from delta import configure_spark_with_delta_pip
from pyspark.sql.types import (
    BooleanType,
    DateType,
    DoubleType,
    FloatType,
    IntegerType,
    LongType,
    ShortType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)
| |
| |
| def config_spark_with_delta_lake(): |
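    """
    Build and return a SparkSession configured for Delta Lake.

    configure_spark_with_delta_pip injects the Delta Lake JARs that match the
    installed delta-spark pip package, so no manual --packages flag is needed.
    """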
| builder = ( |
| pyspark.sql.SparkSession.builder.appName("DeltaLakeApp") |
| .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") |
| .config( |
| "spark.sql.catalog.spark_catalog", |
| "org.apache.spark.sql.delta.catalog.DeltaCatalog", |
| ) |
| ) |
| spark = configure_spark_with_delta_pip(builder).getOrCreate() |
| spark.sparkContext.setLogLevel("ERROR") |
| return spark |
| |
| |
| def create_dataset(num_records): |
| """ |
    Generate a mock employee dataset covering a variety of data types, for testing purposes.
| |
| Parameters: |
| - num_records (int): Number of records to generate. |
| |
| Returns: |
| - Tuple: A tuple containing a list of records and the corresponding schema. |
| - List of Records: Each record is a tuple representing a row of data. |
| - StructType: The schema defining the structure of the records. |
| |
| Example: |
| ```python |
| data, schema = create_dataset(10) |
| ``` |
| """ |
| schema = StructType([ |
| StructField("id", LongType(), False), |
| StructField("birthday", DateType(), False), |
| StructField("name", StringType(), True), |
| StructField("age", ShortType(), True), |
| StructField("salary", DoubleType(), True), |
| StructField("bonus", FloatType(), True), |
| StructField("yoe", IntegerType(), True), |
| StructField("is_fulltime", BooleanType(), True), |
| StructField("last_vacation_time", TimestampType(), True) |
| ]) |
| |
| data = [] |
| current_date = datetime.now() |
| |
    for i in range(num_records):
        # Employees are between 18 and 30 years old.
        birthday = current_date - timedelta(days=random.randint(365 * 18, 365 * 30))
        age = (current_date - birthday).days // 365
        # None models an unknown employment status.
        is_fulltime = random.choice([True, False, None])
        record = (
            # Ids may exceed 32 bits, exercising the LongType column.
            random.randint(1, 10000000000),
            birthday,
            f"Employee{i+1}",
            age,
            random.uniform(50000, 100000),
            # Bonus, and last vacation time below, are only set for full-timers.
            random.uniform(1000, 5000) if is_fulltime else None,
            # Years of experience: at least 1, capped at 20 and bounded by age.
            random.randint(1, min(20, age - 15)),
            is_fulltime,
            datetime.now() - timedelta(hours=random.randint(1, 90)) if is_fulltime else None,
        )
        data.append(record)
| return data, schema |
| |
| |
| def main(): |
| parser = argparse.ArgumentParser(description="Script to write a Delta Lake table.", |
| formatter_class=argparse.ArgumentDefaultsHelpFormatter) |
| |
    parser.add_argument('--save_path', required=True, help="Save path for Delta table")
    parser.add_argument('--save_mode', choices=('append', 'overwrite'), default="append",
                        help="Specify write mode (append/overwrite)")
    parser.add_argument('--partitioned_by', choices=("birthday", "name"), default=None,
                        help="Column to partition the Delta table by")
| parser.add_argument('--num_records', type=int, default=5, help="Specify number of Delta records to write") |
| |
| args = parser.parse_args() |
| |
| save_mode = args.save_mode |
| save_path = args.save_path |
| num_records = args.num_records |
| partitioned_by = args.partitioned_by |
| |
| spark = config_spark_with_delta_lake() |
| |
| data, schema = create_dataset(num_records=num_records) |
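    # The explicit schema keeps the column types (e.g. ShortType, FloatType)
    # stable instead of letting Spark infer wider types from the Python values.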
| df = spark.createDataFrame(data, schema=schema) |
    if not partitioned_by:
        df.write.format("delta").mode(save_mode).save(save_path)
    else:
        df.write.format("delta").partitionBy(partitioned_by).mode(save_mode).save(save_path)
| |
| df.show() |
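
    # To verify the write, the table can be read back with, for example:
    #   spark.read.format("delta").load(save_path).show()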
| |
| print(f"Generated Delta table records partitioned by {partitioned_by} in {save_path} in {save_mode} mode" |
| f" with {num_records} records.") |
| |
| |
| if __name__ == "__main__": |
| main() |