title: "DataFrame"
weight: 9
type: docs
aliases:
- /spark/dataframe.html

DataFrame

Paimon supports creating tables, inserting data, and querying through the Spark DataFrame API.

Create Table

You can specify table properties with option or set partition columns with partitionBy if needed.

// assumes an active SparkSession named `spark` (as in spark-shell)
import org.apache.spark.sql.DataFrame
import spark.implicits._

val data: DataFrame = Seq((1, "x1", "p1"), (2, "x2", "p2")).toDF("a", "b", "pt")

data.write.format("paimon")
  .option("primary-key", "a,pt")
  .option("k1", "v1")
  .partitionBy("pt")
  .saveAsTable("test_tbl") // or .save("/path/to/default.db/test_tbl")

Insert

Insert Into

You can achieve INSERT INTO semantics by setting the mode to append.

val data: DataFrame = ...

data.write.format("paimon")
  .mode("append")
  .insertInto("test_tbl") // or .saveAsTable("test_tbl") or .save("/path/to/default.db/test_tbl")

Note: insertInto ignores column names and writes by position. If you need to write by column name, use saveAsTable or save instead, as sketched below.
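
For example, here is a minimal sketch of the difference, reusing test_tbl and data from above (the reordered selection is purely illustrative):

// select columns in a different order than the table schema (a, b, pt)
val reordered = data.select("b", "a", "pt")

// insertInto would map the "b" values into column "a", since it writes by position;
// saveAsTable matches columns by name, so the reordered DataFrame lands correctly
reordered.write.format("paimon")
  .mode("append")
  .saveAsTable("test_tbl")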

Insert Overwrite

You can achieve INSERT OVERWRITE semantics by setting the mode to overwrite with insertInto.

Dynamic partition overwrite is supported for partitioned tables. To enable it, set the Spark session configuration spark.sql.sources.partitionOverwriteMode to dynamic.

val data: DataFrame = ...

data.write.format("paimon")
  .mode("overwrite")
  .insertInto("test_tbl")
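
For example, assuming an active SparkSession named spark, you can enable it before running the write above:

// switch partition overwrite from the default "static" mode to "dynamic",
// so only the partitions present in the incoming data are overwritten
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")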

Replace Table

You can achieve REPLACE TABLE semantics by setting the mode to overwrite with saveAsTable or save.

It first drops the existing table and then creates a new one, so you need to specify the table's properties and partition columns again if needed.

val data: DataFrame = ...

data.write.format("paimon")
  .option("primary-key", "a,pt")
  .option("k1", "v1")
  .partitionBy("pt")
  .mode("overwrite")
  .saveAsTable("test_tbl") // or .save("/path/to/default.db/test_tbl")

Query

spark.read.format("paimon")
  .table("test_tbl") // or .load("/path/to/default.db/test_tbl")
  .show()

To specify the catalog or database, you can use either of the following:

// recommended
spark.read.format("paimon")
  .table("<catalogName>.<databaseName>.<tableName>")

// or
spark.read.format("paimon")
  .option("catalog", "<catalogName>")
  .option("database", "<databaseName>")
  .option("table", "<tableName>")
  .load("/path/to/default.db/test_tbl")

You can specify other read configs through option:

// time travel
spark.read.format("paimon")
  .option("scan.snapshot-id", 1)
  .table("test_tbl")
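
Paimon can also time travel to a point in time through the scan.timestamp-millis option; the timestamp value below is purely illustrative:

// time travel to a timestamp, given as milliseconds since the epoch
spark.read.format("paimon")
  .option("scan.timestamp-millis", "1678883047356")
  .table("test_tbl")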