{
  "commit": "8b0d544e59f91dd2f4752a3afb0b358a8635cd1b",
  "tree": "055eb2c145913523a3e7cd6ab7e19baecef9123a",
  "parents": [
    "bdc42527b2045e87452cf307ab527fdbdecf152e"
  ],
  "author": {
    "name": "David Milicevic",
    "email": "david.milicevic@databricks.com",
    "time": "Thu Jun 18 14:07:19 2026 +0200"
  },
  "committer": {
    "name": "Max Gekk",
    "email": "max.gekk@gmail.com",
    "time": "Thu Jun 18 14:07:19 2026 +0200"
  },
  "message": "[SPARK-55444][SQL] Types Framework - Phase 3a - Storage Formats (Parquet)\n\n### What changes were proposed in this pull request?\nThis PR implements **Phase 3a (Storage Formats - Parquet)** of the Spark Types Framework ([SPARK-55444](https://issues.apache.org/jira/browse/SPARK-55444), parent: [SPARK-53504](https://issues.apache.org/jira/browse/SPARK-53504)). It adds a new optional `ParquetTypeOps` trait that enables framework-managed types to participate in Parquet read/write paths with zero per-type changes to Parquet infrastructure files.\n\n**Scope of this PR:** the schema-conversion, write-path, and row-based read-path integration sites — 6 Scala files modified plus 2 added. Vectorized-read (`ParquetVectorUpdaterFactory`, `VectorizedColumnReader`) and filter-pushdown (`ParquetFilters`) integration are deferred — see \"Follow-ups\" below.\n\n**New trait:** `ParquetTypeOps` in `sql/core` (package `o.a.s.sql.execution.datasources.parquet.types.ops`) following the Phase 1c pattern (ConnectArrowTypeOps — separate module, separate factory). The trait surface **implemented in this PR**:\n- **Schema conversion** (Spark DataType → Parquet schema type, write path)\n- **Value write** (writing values to the Parquet `RecordConsumer`)\n- **Row-based read** (creating Parquet converters for reading into `InternalRow`)\n- **Type gates** (`supportDataType`, plus the `isBatchReadSupported` capability flag)\n- **Schema clipping** (`parquetStructSchema` for column pruning of struct-backed types)\n\n**Not yet on the trait (deferred to Follow-ups):** vectorized-read batch updaters and filter-pushdown predicates. 
Today the trait only exposes the `isBatchReadSupported` gate for the vectorized path and has no filter surface.\n\n**Reference implementation:** `TimeTypeParquetOps` validates the schema, write, row-read, and type-gate paths for a primitive `INT64`-backed type.\n\n**Dispatch pattern:** Framework FIRST at all integration sites, with the entire original code extracted to `*Default` methods as fallback — the same `Ops(dt).map(_.method).getOrElse(methodDefault(dt))` pattern established in Phase 1a (PR #54223) and Phase 1c (PR #54905). The framework dispatches **unconditionally** (the `spark.sql.types.framework.enabled` flag was removed in [SPARK-57372](https://issues.apache.org/jira/browse/SPARK-57372)), so `TimeType` is always routed through the framework path; the `*Default` fallbacks run only for types the framework does not manage.\n\n**Integration sites (in this PR):**\n| File | Dispatch |\n|------|----------|\n| `ParquetSchemaConverter` | Write-path schema: framework-first `convertField` → `convertToParquetType`, original code extracted to `convertFieldDefault`. (The Parquet→Spark read/inference direction is unchanged apart from a cosmetic `case _ \u003d\u003e illegalType()` line merge; `TimeType` inference still uses the hardcoded `TimeLogicalTypeAnnotation` case.) |\n| `ParquetWriteSupport` | Framework-first split of `makeWriter` into `makeWriter` + `makeWriterDefault`. (No companion-utility extraction — `consumeGroup`/`writeFields` are untouched.) 
|\n| `ParquetRowConverter` | Framework-first `newConverter` with method overloading (simple for primitive, extended for struct-backed) |\n| `ParquetFileFormat` | Framework-first `supportDataType` |\n| `ParquetUtils` | Framework-first `isBatchReadSupported` |\n| `ParquetReadSupport` | Framework-first `clipParquetType` for struct-backed types via `parquetStructSchema` |\n\n**Integration sites (follow-up, not in this PR):**\n| File | Dispatch |\n|------|----------|\n| `ParquetVectorUpdaterFactory` (Java) | Framework-first `getUpdater` via Java-friendly `getVectorUpdaterOrNull` |\n| `VectorizedColumnReader` (Java) | Framework-first `isLazyDecodingSupported` via Java-friendly `isLazyDecodingSupportedFor` |\n| `ParquetFilters` | `FrameworkFilterOps` custom extractor + `orElse` on 7 PartialFunctions + framework-first `valueCanMakeFilterOn` |\n\n**Design decisions:**\n- `ParquetTypeOps` is a separate trait in `sql/core` (not on `TypeOps` in `sql/catalyst`) because Parquet types (`RecordConsumer`, `ParquetVectorUpdater`, etc.) live in `sql/core` and catalyst cannot reference them.\n- `rowRepresentationType` (Phase 1b) is NOT used for Parquet — it is scoped to row infrastructure only. Using it would erase type identity in Parquet value paths, create dispatch asymmetry between struct-backed and primitive types, and extend it beyond its designed scope.\n- `parquetStructSchema` is independent of `PhysicalDataType` — Parquet storage representation may differ from internal row representation.\n- `recordConsumer` is passed as `() \u003d\u003e RecordConsumer` (lazy supplier) because `makeWriter` is called during `init()` when `recordConsumer` is still null (set later in `prepareForWrite()`).\n- Filter dispatch (deferred to the follow-up) will use a `FrameworkFilterOps` custom extractor inside `ParquetFilters` because `ParquetSchemaType` is a private inner class that cannot be referenced from outside.\n\n**Follow-ups (not in this PR):**\n1. 
**Vectorized-read integration** — `ParquetVectorUpdaterFactory` and `VectorizedColumnReader` (both Java). Will use Java-friendly methods (`getVectorUpdaterOrNull`, `isLazyDecodingSupportedFor`) to dispatch from Java code paths into the Scala `ParquetTypeOps`.\n2. **Filter-pushdown integration** — `ParquetFilters` via a `FrameworkFilterOps` custom extractor added to 7 `PartialFunction`s plus framework-first `valueCanMakeFilterOn`. Requires a custom extractor because `ParquetSchemaType` is a private inner class that cannot be referenced from outside `ParquetFilters`.\n3. **Second reference implementation (struct-backed)** — `TimeType` is `INT64`-backed and exercises only the primitive paths. A struct-backed reference type would harden `parquetStructSchema`, `clipParquetType` framework-first dispatch, and the extended `newConverter` overload before downstream consumers (e.g., extended-range nanosecond timestamps, DECFLOAT) land.\n4. **Read-path guard reconciliation ([SPARK-57416](https://issues.apache.org/jira/browse/SPARK-57416))** — decide the intended `TimeType` read behavior for an `INT64 TIME(MICROS, isAdjustedToUTC\u003dtrue)` column (see \"user-facing change\" below). With the Types Framework feature flag removed ([SPARK-57372](https://issues.apache.org/jira/browse/SPARK-57372)), the framework intercepts `TimeType` unconditionally and the legacy `TimeType` branch in `ParquetRowConverter`\u0027s `*Default` path is dead code (so the previously-planned \"apply the same tightening to the `*Default` path\" option is moot — that branch never runs). The read-guard behavior actually has **two** levers, not one: the row-based guard `TimeTypeParquetOps.requireCompatibleParquetType` (which rejects), and `TimeTypeParquetOps.isBatchReadSupported` (which routes top-level reads through the legacy vectorized `LongAsNanosUpdater`, bypassing the guard and staying lenient). 
The reconciliation is therefore to decide whether to reject `isAdjustedToUTC\u003dtrue` at all and, if so, align both paths so the behavior no longer depends on `enableVectorizedReader`/nesting — then either keep the stricter guard with a documenting test, or relax it to restore the legacy lenient read.\n\n### Why are the changes needed?\nAdding a new data type to Spark currently requires modifying 8+ Parquet files with scattered, type-specific logic. This PR enables the framework to handle all Parquet concerns for new types — a new type implements `ParquetTypeOps` and registers in the companion\u0027s `apply()`, and the Parquet infrastructure files dispatch through it automatically.\n\nThis is the first storage format integration (Phase 3a). `TimeType` serves as the reference implementation validating the paths wired in this PR.\n\n### Does this PR introduce _any_ user-facing change?\nYes — one narrow behavior change on the **row-based** read path. For the schema-conversion, write, and row-based read paths this is a pure refactoring that routes `TimeType` through the framework with otherwise identical behavior. The exception: the framework read-path guard `TimeTypeParquetOps.requireCompatibleParquetType` (used by the row-based converter `ParquetRowConverter`) is stricter than the legacy guard it replaces. For an `INT64 TIME(MICROS, isAdjustedToUTC\u003dtrue)` column read as `TimeType` via an explicit read schema, the row-based path now rejects it (`cannotCreateParquetConverterForDataTypeError` / `FAILED_READ_FILE`) where the legacy path accepted it (returning e.g. `23:59:59.123456`).\n\nThis rejection is **not unconditional** — it only takes effect when the read actually goes row-based. 
Because `TimeTypeParquetOps.isBatchReadSupported \u003d true`, a top-level `TimeType` column is read by the **vectorized** path by default, and that path (`ParquetVectorUpdaterFactory`\u0027s `LongAsNanosUpdater`) never consults `requireCompatibleParquetType` — it keeps the pre-PR lenient behavior (blind `micros → nanos`). The guard therefore fires only for row-based reads: when the vectorized reader is disabled (`spark.sql.parquet.enableVectorizedReader\u003dfalse`), or when the `TimeType` column is nested inside a struct/array/map (always read row-based). So `INT64 TIME(MICROS, isAdjustedToUTC\u003dtrue)` read via an explicit schema succeeds vectorized (the default) but fails row-based. The path is also reachable only via an explicit read schema (schema inference already maps `isAdjustedToUTC\u003dtrue` to `illegalType()`), so the blast radius is small.\n\nThe `spark.sql.types.framework.enabled` flag was removed in [SPARK-57372](https://issues.apache.org/jira/browse/SPARK-57372), so framework dispatch for `TimeType` is unconditional — there is no OFF path that restores the legacy lenient row-based read. 
Whether the framework should reject `isAdjustedToUTC\u003dtrue` at all, and how to make the vectorized and row-based paths agree, is tracked in [SPARK-57416](https://issues.apache.org/jira/browse/SPARK-57416).\n\n### How was this patch tested?\nAll existing Parquet test suites pass with the Types Framework active (now unconditional — the `spark.sql.types.framework.enabled` flag was removed in [SPARK-57372](https://issues.apache.org/jira/browse/SPARK-57372)):\n- `ParquetSchemaSuite`: 131 tests passed\n- `ParquetIOSuite`: 88 tests passed (including \"Read TimeType for the logical TIME type\")\n- `ParquetVectorizedSuite`: 25 tests passed\n- `ParquetV1FilterSuite` + `ParquetV2FilterSuite`: 101 tests passed (including \"SPARK-51687: filter pushdown - time\")\n\nNew unit test `TimeTypeParquetOpsSuite` pins the accept/reject set of `requireCompatibleParquetType`, including the `isAdjustedToUTC\u003dtrue` rejection on the row-based path noted above (cross-referenced to SPARK-57416).\n\nNote: `ParquetVectorizedSuite` and the V1/V2 filter suites exercise the *existing* (non-framework) `TimeType` code paths, since vectorized-read and filter-pushdown integration are deferred to the follow-ups above.\n\nRe-validated after merging the latest `master` (post-SPARK-57372): the affected Parquet suites were re-run on Java 17 — `TimeTypeParquetOpsSuite` + `ParquetSchemaSuite` (139) and `ParquetIOSuite` (94) pass.\n\n### Was this patch authored or co-authored using generative AI tooling?\nGenerated-by: Claude Code (Claude Opus 4.6)\n\nCloses #55326 from davidm-db/davidm-db/types_framework_3a.\n\nLead-authored-by: David Milicevic \u003cdavid.milicevic@databricks.com\u003e\nCo-authored-by: Stevo Mitric \u003cstevo.mitric@databricks.com\u003e\nCo-authored-by: Stevo Mitric \u003cstevomitric2000@gmail.com\u003e\nSigned-off-by: Max Gekk \u003cmax.gekk@gmail.com\u003e\n",
  "tree_diff": [
    {
      "type": "modify",
      "old_id": "cf2626c2d63ec7fda08f0918e848eb38932d8a45",
      "old_mode": 33188,
      "old_path": "sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala",
      "new_id": "733e55a8609eebbb37b419b88274e2593ac7e2db",
      "new_mode": 33188,
      "new_path": "sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala"
    },
    {
      "type": "modify",
      "old_id": "7ee5b4d224b348720a1aaa3bcbdb241ac4452add",
      "old_mode": 33188,
      "old_path": "sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala",
      "new_id": "e92858f6a1e9a95ad392774caa20eaf760b52f54",
      "new_mode": 33188,
      "new_path": "sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala"
    },
    {
      "type": "modify",
      "old_id": "6c9485dc6fc8459e6adf5a83dd02031c650f01f9",
      "old_mode": 33188,
      "old_path": "sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala",
      "new_id": "c3016d929ac9c9c16d5429b8ec2649afc1ce24c4",
      "new_mode": 33188,
      "new_path": "sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala"
    },
    {
      "type": "modify",
      "old_id": "c479c37b89fdf8ea3ac717a2d9751cbbf1924a5c",
      "old_mode": 33188,
      "old_path": "sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala",
      "new_id": "ba526120046497d59bd971e34f884a427a14cc4e",
      "new_mode": 33188,
      "new_path": "sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala"
    },
    {
      "type": "modify",
      "old_id": "c607548139949744313567422531e069345ea7df",
      "old_mode": 33188,
      "old_path": "sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala",
      "new_id": "bafc3a9ea52f7c0fdd10102c345e4557a926761a",
      "new_mode": 33188,
      "new_path": "sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala"
    },
    {
      "type": "modify",
      "old_id": "d7fd5991c75f7e7e67f56ba5312003602a1ab1cc",
      "old_mode": 33188,
      "old_path": "sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala",
      "new_id": "18c0a47facda93e20678b9b54d2212a7f3904f87",
      "new_mode": 33188,
      "new_path": "sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala"
    },
    {
      "type": "add",
      "old_id": "0000000000000000000000000000000000000000",
      "old_mode": 0,
      "old_path": "/dev/null",
      "new_id": "76296500a792b45e7fe38a94633fa0920c56909b",
      "new_mode": 33188,
      "new_path": "sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/types/ops/ParquetTypeOps.scala"
    },
    {
      "type": "add",
      "old_id": "0000000000000000000000000000000000000000",
      "old_mode": 0,
      "old_path": "/dev/null",
      "new_id": "7f05361d8f6c9cc79c955a5d802b1fc1807bb48a",
      "new_mode": 33188,
      "new_path": "sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/types/ops/TimeTypeParquetOps.scala"
    },
    {
      "type": "add",
      "old_id": "0000000000000000000000000000000000000000",
      "old_mode": 0,
      "old_path": "/dev/null",
      "new_id": "9a21b5e3f4bb44662fd3ab46ac48f3e8d67844ad",
      "new_mode": 33188,
      "new_path": "sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/types/ops/TimeTypeParquetOpsSuite.scala"
    }
  ]
}
