Merge branch 'main' into dependabot/maven/checker.framework.version-3.49.5
diff --git a/.env b/.env
index d3e1c1d..a778353 100644
--- a/.env
+++ b/.env
@@ -40,7 +40,7 @@
 
 # Default repository to pull and push images from
 REPO=ghcr.io/apache/arrow-java-dev
-ARROW_REPO=apache/arrow-dev
+ARROW_REPO=ghcr.io/apache/arrow-dev
 
 # The setup attempts to generate coredumps by default, in order to disable the
 # coredump generation set it to 0
@@ -53,5 +53,4 @@
 # Versions for various dependencies used to build artifacts
 # Keep in sync with apache/arrow
 ARROW_REPO_ROOT=./arrow
-PYTHON=3.9
-VCPKG="f7423ee180c4b7f40d43402c2feb3859161ef625" # 2024.06.15 Release
+VCPKG="4334d8b4c8916018600212ab4dd4bbdc343065d1"    # 2025.09.17 Release
diff --git a/.github/workflows/comment_bot.yml b/.github/workflows/comment_bot.yml
index 5fbc858..b4dbc92 100644
--- a/.github/workflows/comment_bot.yml
+++ b/.github/workflows/comment_bot.yml
@@ -30,7 +30,7 @@
     if: github.event.comment.body == 'take'
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/github-script@60a0d83039c74a4aee543508d2ffcb1c3799cdea # v7.0.1
+      - uses: actions/github-script@v8
         with:
           github-token: ${{ secrets.GITHUB_TOKEN }}
           script: |-
diff --git a/.github/workflows/dev.yml b/.github/workflows/dev.yml
index 3732089..12968bb 100644
--- a/.github/workflows/dev.yml
+++ b/.github/workflows/dev.yml
@@ -38,11 +38,11 @@
           fetch-depth: 0
           persist-credentials: false
 
-      - uses: actions/setup-python@v5
+      - uses: actions/setup-python@v6
         with:
           python-version: '3.x'
       - name: pre-commit (cache)
-        uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
+        uses: actions/cache@0400d5f644dc74513175e3cd8d07132dd4860809 # v4.2.4
         with:
           path: ~/.cache/pre-commit
           key: pre-commit-${{ hashFiles('.pre-commit-config.yaml') }}
diff --git a/.github/workflows/dev_pr.yml b/.github/workflows/dev_pr.yml
index 7352137..2f4a48d 100644
--- a/.github/workflows/dev_pr.yml
+++ b/.github/workflows/dev_pr.yml
@@ -35,6 +35,7 @@
 
 permissions:
   contents: read
+  issues: write
   pull-requests: write
 
 jobs:
@@ -49,28 +50,28 @@
 
       - name: Ensure PR title format
         id: title-format
-        uses: actions/github-script@60a0d83039c74a4aee543508d2ffcb1c3799cdea  # v7.0.1
+        uses: actions/github-script@v8
         with:
           script: |
             const scripts = require(`${process.env.GITHUB_WORKSPACE}/.github/workflows/dev_pr.js`);
             return scripts.check_title_format({core, github, context});
 
       - name: Label PR
-        uses: actions/github-script@60a0d83039c74a4aee543508d2ffcb1c3799cdea  # v7.0.1
+        uses: actions/github-script@v8
         with:
           script: |
             const scripts = require(`${process.env.GITHUB_WORKSPACE}/.github/workflows/dev_pr.js`);
             await scripts.apply_labels({core, github, context});
 
       - name: Ensure PR is labeled
-        uses: actions/github-script@60a0d83039c74a4aee543508d2ffcb1c3799cdea  # v7.0.1
+        uses: actions/github-script@v8
         with:
           script: |
             const scripts = require(`${process.env.GITHUB_WORKSPACE}/.github/workflows/dev_pr.js`);
             await scripts.check_labels({core, github, context});
 
       - name: Ensure PR is linked to an issue
-        uses: actions/github-script@60a0d83039c74a4aee543508d2ffcb1c3799cdea  # v7.0.1
+        uses: actions/github-script@v8
         with:
           script: |
             const scripts = require(`${process.env.GITHUB_WORKSPACE}/.github/workflows/dev_pr.js`);
@@ -80,9 +81,5 @@
         if: '! github.event.pull_request.draft'
         env:
           GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-        permissions:
-          contents: read
-          issues: write
-          pull-requests: write
         run: |
           ./.github/workflows/dev_pr_milestone.sh "${GITHUB_REPOSITORY}" ${{ github.event.number }}
diff --git a/.github/workflows/rc.yml b/.github/workflows/rc.yml
index 72456fa..b53812e 100644
--- a/.github/workflows/rc.yml
+++ b/.github/workflows/rc.yml
@@ -101,7 +101,7 @@
       packages: write
     steps:
       - name: Download source archive
-        uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
+        uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
         with:
           name: release-source
       - name: Extract source archive
@@ -133,13 +133,13 @@
         with:
           repository: apache/parquet-testing
           path: arrow/cpp/submodules/parquet-testing
-      - uses: docker/login-action@74a5d142397b4f367a81961eba4e8cd7edddf772 # v3.4.0
+      - uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3.6.0
         with:
           registry: ghcr.io
           username: ${{ github.actor }}
           password: ${{ secrets.GITHUB_TOKEN }}
       - name: Cache
-        uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
+        uses: actions/cache@0400d5f644dc74513175e3cd8d07132dd4860809 # v4.2.4
         with:
           path: .docker
           key: jni-linux-${{ matrix.platform.arch }}-${{ hashFiles('arrow/cpp/**') }}
@@ -168,13 +168,13 @@
       fail-fast: false
       matrix:
         platform:
-          - { runs_on: macos-13, arch: "x86_64"}
+          - { runs_on: macos-15-intel, arch: "x86_64"}
           - { runs_on: macos-14, arch: "aarch_64" }
     env:
       MACOSX_DEPLOYMENT_TARGET: "14.0"
     steps:
       - name: Download source archive
-        uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
+        uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
         with:
           name: release-source
       - name: Extract source archive
@@ -201,7 +201,7 @@
           repository: apache/parquet-testing
           path: arrow/cpp/submodules/parquet-testing
       - name: Set up Python
-        uses: actions/setup-python@0b93645e9fea7318ecaed2b359559ac225c90a2b # v5.3.0
+        uses: actions/setup-python@v6
         with:
           cache: 'pip'
           python-version: 3.12
@@ -222,7 +222,7 @@
           brew uninstall llvm || :
 
           # We can remove this when we drop support for
-          # macos-13. because macos-14 or later uses /opt/homebrew/
+          # macos-15-intel. because macos-14 or later with arm64 uses /opt/homebrew/
           # not /usr/local/.
           #
           # Ensure updating python@XXX with the "--overwrite" option.
@@ -270,7 +270,7 @@
         run: |
           echo "CCACHE_DIR=${PWD}/ccache" >> ${GITHUB_ENV}
       - name: Cache ccache
-        uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
+        uses: actions/cache@0400d5f644dc74513175e3cd8d07132dd4860809 # v4.2.4
         with:
           path: ccache
           key: jni-macos-${{ matrix.platform.arch }}-${{ hashFiles('arrow/cpp/**') }}
@@ -298,30 +298,36 @@
       fail-fast: false
       matrix:
         platform:
-          - runs_on: windows-2019
+          - runs_on: windows-2022
             arch: "x86_64"
     steps:
       - name: Download source archive
-        uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
+        uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
         with:
           name: release-source
       - name: Extract source archive
         shell: bash
         run: |
           tar -xf apache-arrow-java-*.tar.gz --strip-components=1
-      - name: Download the latest Apache Arrow C++
-        if: github.event_name != 'schedule'
-        shell: bash
-        run: |
-          ci/scripts/download_cpp.sh
+      # We always use the main branch for apache/arrow for now.
+      # Because we want to use
+      # https://github.com/apache/arrow/pull/47749 in
+      # apache/arrow-java. We can revert this workaround once Apache
+      # Arrow 22.0.0 that includes the change released.
+      #
+      # - name: Download the latest Apache Arrow C++
+      #   if: github.event_name != 'schedule'
+      #   shell: bash
+      #   run: |
+      #     ci/scripts/download_cpp.sh
       - name: Checkout Apache Arrow C++
-        if: github.event_name == 'schedule'
+        # if: github.event_name == 'schedule'
         uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
         with:
           repository: apache/arrow
           path: arrow
       - name: Set up Java
-        uses: actions/setup-java@7a6d8a8234af8eb26422e24e3006232cccaa061b # v4.6.0
+        uses: actions/setup-java@v5
         with:
           java-version: '11'
           distribution: 'temurin'
@@ -346,7 +352,7 @@
         run: |
           echo "CCACHE_DIR=${PWD}/ccache" >> ${GITHUB_ENV}
       - name: Cache ccache
-        uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
+        uses: actions/cache@0400d5f644dc74513175e3cd8d07132dd4860809 # v4.2.4
         with:
           path: ccache
           key: jni-windows-${{ matrix.platform.arch }}-${{ hashFiles('arrow/cpp/**') }}
@@ -354,7 +360,7 @@
       - name: Build
         shell: cmd
         run: |
-          call "C:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\VC\Auxiliary\Build\vcvarsall.bat" x64
+          call "C:\Program Files\Microsoft Visual Studio\2022\Enterprise\VC\Auxiliary\Build\vcvarsall.bat" x64
           REM For ORC
           set TZDIR=/c/msys64/usr/share/zoneinfo
           bash -c "ci/scripts/jni_windows_build.sh . arrow build jni"
@@ -375,7 +381,7 @@
       - jni-windows
     steps:
       - name: Download artifacts
-        uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
+        uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
         with:
           path: artifacts
       - name: Decompress artifacts
@@ -420,7 +426,7 @@
           repository: apache/arrow-testing
           path: testing
       - name: Cache ~/.m2
-        uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
+        uses: actions/cache@0400d5f644dc74513175e3cd8d07132dd4860809 # v4.2.4
         with:
           path: ~/.m2
           key: binaries-build-${{ hashFiles('**/*.java', '**/pom.xml') }}
@@ -452,15 +458,15 @@
       contents: read
       packages: write
     steps:
-      - uses: actions/setup-python@0b93645e9fea7318ecaed2b359559ac225c90a2b # v5.3.0
+      - uses: actions/setup-python@v6
         with:
           cache: 'pip'
       - name: Download source archive
-        uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
+        uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
         with:
           name: release-source
       - name: Download Javadocs
-        uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
+        uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
         with:
           name: reference
       - name: Extract source archive
@@ -525,7 +531,7 @@
           cp ../.asf.yaml ./
           git add .nojekyll .asf.yaml
       - name: Download
-        uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
+        uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
         with:
           name: release-html
       - name: Extract
@@ -561,7 +567,7 @@
           - ubuntu-latest
     steps:
       - name: Download release artifacts
-        uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
+        uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
         with:
           pattern: release-*
       - name: Verify
@@ -595,7 +601,7 @@
       contents: write
     steps:
       - name: Download release artifacts
-        uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
+        uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
         with:
           pattern: release-*
           path: artifacts
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index f1028ce..65fbc26 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -58,12 +58,12 @@
       MAVEN: ${{ matrix.maven }}
     steps:
       - name: Checkout Arrow
-        uses: actions/checkout@eef61447b9ff4aafe5dcd4e0bbf5d482be7e7871 # v4.2.1
+        uses: actions/checkout@v5
         with:
           fetch-depth: 0
           submodules: recursive
       - name: Cache Docker Volumes
-        uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
+        uses: actions/cache@v4
         with:
           path: .docker
           key: maven-${{ matrix.jdk }}-${{ matrix.maven }}-${{ hashFiles('compose.yaml', '**/pom.xml', '**/*.java') }}
@@ -95,12 +95,12 @@
             macos: latest
     steps:
       - name: Set up Java
-        uses: actions/setup-java@v4
+        uses: actions/setup-java@v5
         with:
           distribution: 'temurin'
           java-version: ${{ matrix.jdk }}
       - name: Checkout Arrow
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5
         with:
           fetch-depth: 0
           submodules: recursive
@@ -126,12 +126,12 @@
         jdk: [11]
     steps:
       - name: Set up Java
-        uses: actions/setup-java@v4
+        uses: actions/setup-java@v5
         with:
           java-version: ${{ matrix.jdk }}
           distribution: 'temurin'
       - name: Checkout Arrow
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5
         with:
           fetch-depth: 0
           submodules: recursive
@@ -152,32 +152,37 @@
     timeout-minutes: 60
     steps:
       - name: Checkout Arrow
-        uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+        uses: actions/checkout@v5
         with:
           fetch-depth: 0
           repository: apache/arrow
           submodules: recursive
       - name: Checkout Arrow Rust
-        uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+        uses: actions/checkout@v5
         with:
           repository: apache/arrow-rs
           path: rust
       - name: Checkout Arrow nanoarrow
-        uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+        uses: actions/checkout@v5
         with:
           repository: apache/arrow-nanoarrow
           path: nanoarrow
+      - name: Checkout Arrow .NET
+        uses: actions/checkout@v5
+        with:
+          repository: apache/arrow-dotnet
+          path: dotnet
       - name: Checkout Arrow Go
-        uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+        uses: actions/checkout@v5
         with:
           repository: apache/arrow-go
           path: go
       - name: Checkout Arrow Java
-        uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+        uses: actions/checkout@v5
         with:
           path: java
       - name: Checkout Arrow JavaScript
-        uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+        uses: actions/checkout@v5
         with:
           repository: apache/arrow-js
           path: js
@@ -185,13 +190,13 @@
         run: |
           ci/scripts/util_free_space.sh
       - name: Cache Docker Volumes
-        uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
+        uses: actions/cache@v4
         with:
           path: .docker
           key: integration-conda-${{ hashFiles('cpp/**') }}
           restore-keys: integration-conda-
       - name: Setup Python
-        uses: actions/setup-python@0b93645e9fea7318ecaed2b359559ac225c90a2b # v5.3.0
+        uses: actions/setup-python@v6
         with:
           python-version: 3.12
       - name: Setup Archery
@@ -202,6 +207,7 @@
           archery docker run \
             -e ARCHERY_DEFAULT_BRANCH=main \
             -e ARCHERY_INTEGRATION_TARGET_IMPLEMENTATIONS=java \
+            -e ARCHERY_INTEGRATION_WITH_DOTNET=1 \
             -e ARCHERY_INTEGRATION_WITH_GO=1 \
             -e ARCHERY_INTEGRATION_WITH_JAVA=1 \
             -e ARCHERY_INTEGRATION_WITH_JS=1 \
diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrowUtils.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrowUtils.java
index aedef77..a6e77e4 100644
--- a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrowUtils.java
+++ b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrowUtils.java
@@ -1071,8 +1071,8 @@
   }
 
   private static String convertAliases(Set<String> aliases) {
-    JsonStringArrayList jsonList = new JsonStringArrayList();
-    aliases.stream().forEach(a -> jsonList.add(a));
+    JsonStringArrayList jsonList = new JsonStringArrayList(aliases.size());
+    jsonList.addAll(aliases);
     return jsonList.toString();
   }
 }
diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrowVectorIterator.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrowVectorIterator.java
index 4123370..e82fdc3 100644
--- a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrowVectorIterator.java
+++ b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrowVectorIterator.java
@@ -17,13 +17,14 @@
 package org.apache.arrow.adapter.avro;
 
 import java.io.EOFException;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.arrow.adapter.avro.consumers.CompositeAvroConsumer;
+import org.apache.arrow.adapter.avro.consumers.Consumer;
 import org.apache.arrow.util.Preconditions;
 import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.util.ValueVectorUtility;
@@ -75,9 +76,11 @@
   private void initialize() {
     // create consumers
     compositeConsumer = AvroToArrowUtils.createCompositeConsumer(schema, config);
-    List<FieldVector> vectors = new ArrayList<>();
-    compositeConsumer.getConsumers().forEach(c -> vectors.add(c.getVector()));
-    List<Field> fields = vectors.stream().map(t -> t.getField()).collect(Collectors.toList());
+    List<FieldVector> vectors =
+        compositeConsumer.getConsumers().stream()
+            .map(Consumer::getVector)
+            .collect(Collectors.toList());
+    List<Field> fields = vectors.stream().map(ValueVector::getField).collect(Collectors.toList());
     VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 0);
     rootSchema = root.getSchema();
 
diff --git a/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/CompositeJdbcConsumer.java b/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/CompositeJdbcConsumer.java
index 2366116..b8389ee 100644
--- a/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/CompositeJdbcConsumer.java
+++ b/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/CompositeJdbcConsumer.java
@@ -24,7 +24,6 @@
 import org.apache.arrow.util.AutoCloseables;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.types.pojo.ArrowType;
 
 /** Composite consumer which hold all consumers. It manages the consume and cleanup process. */
 public class CompositeJdbcConsumer implements JdbcConsumer {
@@ -46,9 +45,9 @@
           BaseConsumer consumer = (BaseConsumer) consumers[i];
           JdbcFieldInfo fieldInfo =
               new JdbcFieldInfo(rs.getMetaData(), consumer.columnIndexInResultSet);
-          ArrowType arrowType = consumer.vector.getMinorType().getType();
+
           throw new JdbcConsumerException(
-              "Exception while consuming JDBC value", e, fieldInfo, arrowType);
+              "Exception while consuming JDBC value", e, fieldInfo, consumer.vector.getField());
         } else {
           throw e;
         }
diff --git a/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/exceptions/JdbcConsumerException.java b/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/exceptions/JdbcConsumerException.java
index 04e26d6..98927f4 100644
--- a/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/exceptions/JdbcConsumerException.java
+++ b/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/exceptions/JdbcConsumerException.java
@@ -17,7 +17,7 @@
 package org.apache.arrow.adapter.jdbc.consumer.exceptions;
 
 import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
-import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
 
 /**
  * Exception while consuming JDBC data. This exception stores the JdbcFieldInfo for the column and
@@ -25,7 +25,7 @@
  */
 public class JdbcConsumerException extends RuntimeException {
   final JdbcFieldInfo fieldInfo;
-  final ArrowType arrowType;
+  final Field field;
 
   /**
    * Construct JdbcConsumerException with all fields.
@@ -33,17 +33,17 @@
    * @param message error message
    * @param cause original exception
    * @param fieldInfo JdbcFieldInfo for the column
-   * @param arrowType ArrowType for the corresponding vector
+   * @param field ArrowType for the corresponding vector
    */
   public JdbcConsumerException(
-      String message, Throwable cause, JdbcFieldInfo fieldInfo, ArrowType arrowType) {
+      String message, Throwable cause, JdbcFieldInfo fieldInfo, Field field) {
     super(message, cause);
     this.fieldInfo = fieldInfo;
-    this.arrowType = arrowType;
+    this.field = field;
   }
 
-  public ArrowType getArrowType() {
-    return this.arrowType;
+  public Field getField() {
+    return this.field;
   }
 
   public JdbcFieldInfo getFieldInfo() {
diff --git a/bom/pom.xml b/bom/pom.xml
index 61b452b..655c33b 100644
--- a/bom/pom.xml
+++ b/bom/pom.xml
@@ -165,7 +165,7 @@
         <version>${project.version}</version>
       </dependency>
       <dependency>
-        <groupId>org.apache.arrow</groupId>
+        <groupId>org.apache.arrow.gandiva</groupId>
         <artifactId>arrow-gandiva</artifactId>
         <version>${project.version}</version>
       </dependency>
diff --git a/c/src/main/cpp/jni_wrapper.cc b/c/src/main/cpp/jni_wrapper.cc
index 436cbdc..3d7a194 100644
--- a/c/src/main/cpp/jni_wrapper.cc
+++ b/c/src/main/cpp/jni_wrapper.cc
@@ -327,19 +327,20 @@
 
 jint JNI_OnLoad(JavaVM* vm, void* reserved) {
   JNIEnv* env;
-  if (vm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION) != JNI_OK) {
-    return JNI_ERR;
+  const int err_code = vm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION);
+  if (err_code != JNI_OK) {
+      return err_code;
   }
   JNI_METHOD_START
-  kObjectClass = CreateGlobalClassReference(env, "Ljava/lang/Object;");
+  kObjectClass = CreateGlobalClassReference(env, "java/lang/Object");
   kRuntimeExceptionClass =
-      CreateGlobalClassReference(env, "Ljava/lang/RuntimeException;");
+      CreateGlobalClassReference(env, "java/lang/RuntimeException");
   kPrivateDataClass =
-      CreateGlobalClassReference(env, "Lorg/apache/arrow/c/jni/PrivateData;");
+      CreateGlobalClassReference(env, "org/apache/arrow/c/jni/PrivateData");
   kCDataExceptionClass =
-      CreateGlobalClassReference(env, "Lorg/apache/arrow/c/jni/CDataJniException;");
+      CreateGlobalClassReference(env, "org/apache/arrow/c/jni/CDataJniException");
   kStreamPrivateDataClass = CreateGlobalClassReference(
-      env, "Lorg/apache/arrow/c/ArrayStreamExporter$ExportedArrayStreamPrivateData;");
+      env, "org/apache/arrow/c/ArrayStreamExporter$ExportedArrayStreamPrivateData");
 
   kPrivateDataLastErrorField =
       GetFieldID(env, kStreamPrivateDataClass, "lastError", "[B");
diff --git a/c/src/main/java/org/apache/arrow/c/jni/JniLoader.java b/c/src/main/java/org/apache/arrow/c/jni/JniLoader.java
index f712b40..46c93f5 100644
--- a/c/src/main/java/org/apache/arrow/c/jni/JniLoader.java
+++ b/c/src/main/java/org/apache/arrow/c/jni/JniLoader.java
@@ -75,8 +75,23 @@
   }
 
   private void load(String name) {
-    final String libraryToLoad =
-        name + "/" + getNormalizedArch() + "/" + System.mapLibraryName(name);
+    String libraryName = System.mapLibraryName(name);
+
+    // If 'arrow.cdata.library.path' is defined, try to load the native library from there
+    String libraryPath = System.getProperty("arrow.cdata.library.path");
+    if (libraryPath != null) {
+      try {
+        File libraryFile = new File(libraryPath, libraryName);
+        if (libraryFile.isFile()) {
+          System.load(libraryFile.getAbsolutePath());
+          return;
+        }
+      } catch (UnsatisfiedLinkError e) {
+        // Ignore this error and fall back to extracting from the JAR file
+      }
+    }
+
+    final String libraryToLoad = name + "/" + getNormalizedArch() + "/" + libraryName;
     try {
       File temp =
           File.createTempFile("jnilib-", ".tmp", new File(System.getProperty("java.io.tmpdir")));
diff --git a/ci/docker/vcpkg-jni.dockerfile b/ci/docker/vcpkg-jni.dockerfile
index 55fa35e..d6bd322 100644
--- a/ci/docker/vcpkg-jni.dockerfile
+++ b/ci/docker/vcpkg-jni.dockerfile
@@ -18,24 +18,10 @@
 ARG base
 FROM ${base}
 
-# Install the libraries required by Gandiva to run
-# Use enable llvm[enable-rtti] in the vcpkg.json to avoid link problems in Gandiva
-RUN vcpkg install \
-        --clean-after-build \
-        --x-install-root=${VCPKG_ROOT}/installed \
-        --x-manifest-root=/arrow/ci/vcpkg \
-        --x-feature=dev \
-        --x-feature=flight \
-        --x-feature=gcs \
-        --x-feature=json \
-        --x-feature=parquet \
-        --x-feature=gandiva \
-        --x-feature=s3
-
 # Install Java
 # We need Java for JNI headers, but we don't invoke Maven in this build.
 ARG java=11
-RUN yum install -y java-$java-openjdk-devel && yum clean all
+RUN dnf install -y java-$java-openjdk-devel && dnf clean all
 
 # For ci/scripts/{cpp,java}_*.sh
 ENV ARROW_HOME=/tmp/local \
diff --git a/ci/scripts/jni_build.sh b/ci/scripts/jni_build.sh
index aec6fc3..c000837 100755
--- a/ci/scripts/jni_build.sh
+++ b/ci/scripts/jni_build.sh
@@ -66,7 +66,7 @@
   -DProtobuf_USE_STATIC_LIBS=ON \
   -GNinja \
   "${EXTRA_CMAKE_OPTIONS[@]}"
-cmake --build "${build_dir}"
+cmake --build "${build_dir}" --verbose
 if [ "${ARROW_JAVA_BUILD_TESTS}" = "ON" ]; then
   ctest \
     --output-on-failure \
diff --git a/ci/scripts/jni_full_build.sh b/ci/scripts/jni_full_build.sh
index e9ad0dd..5d0aee0 100755
--- a/ci/scripts/jni_full_build.sh
+++ b/ci/scripts/jni_full_build.sh
@@ -97,8 +97,10 @@
   -exec echo "{}" ";" \
   -exec cp "{}" "${dist_dir}" ";"
 
-for artifact in "${dist_dir}"/*; do
+pushd "${dist_dir}"
+for artifact in *; do
   sha256sum "${artifact}" >"${artifact}.sha256"
   sha512sum "${artifact}" >"${artifact}.sha512"
 done
+popd
 github_actions_group_end
diff --git a/ci/scripts/jni_macos_build.sh b/ci/scripts/jni_macos_build.sh
index f7543b6..13c0675 100755
--- a/ci/scripts/jni_macos_build.sh
+++ b/ci/scripts/jni_macos_build.sh
@@ -59,72 +59,24 @@
 
 github_actions_group_begin "Building Arrow C++ libraries"
 install_dir="${build_dir}/cpp-install"
-: "${ARROW_ACERO:=ON}"
-export ARROW_ACERO
-: "${ARROW_BUILD_TESTS:=OFF}"
-export ARROW_BUILD_TESTS
-: "${ARROW_DATASET:=ON}"
-export ARROW_DATASET
-: "${ARROW_GANDIVA:=ON}"
-export ARROW_GANDIVA
-: "${ARROW_ORC:=ON}"
-export ARROW_ORC
-: "${ARROW_PARQUET:=ON}"
-: "${ARROW_S3:=ON}"
-: "${CMAKE_BUILD_TYPE:=Release}"
-: "${CMAKE_UNITY_BUILD:=ON}"
 
-export ARROW_TEST_DATA="${arrow_dir}/testing/data"
-export PARQUET_TEST_DATA="${arrow_dir}/cpp/submodules/parquet-testing/data"
+export ARROW_BUILD_TESTS=OFF
+
+export ARROW_DATASET=ON
+export ARROW_GANDIVA=ON
+export ARROW_ORC=ON
+export ARROW_PARQUET=ON
+
 export AWS_EC2_METADATA_DISABLED=TRUE
 
 cmake \
   -S "${arrow_dir}/cpp" \
   -B "${build_dir}/cpp" \
-  -DARROW_ACERO="${ARROW_ACERO}" \
-  -DARROW_BUILD_SHARED=OFF \
-  -DARROW_BUILD_TESTS="${ARROW_BUILD_TESTS}" \
-  -DARROW_CSV="${ARROW_DATASET}" \
-  -DARROW_DATASET="${ARROW_DATASET}" \
-  -DARROW_SUBSTRAIT="${ARROW_DATASET}" \
-  -DARROW_DEPENDENCY_USE_SHARED=OFF \
-  -DARROW_GANDIVA="${ARROW_GANDIVA}" \
-  -DARROW_GANDIVA_STATIC_LIBSTDCPP=ON \
-  -DARROW_JSON="${ARROW_DATASET}" \
-  -DARROW_ORC="${ARROW_ORC}" \
-  -DARROW_PARQUET="${ARROW_PARQUET}" \
-  -DARROW_S3="${ARROW_S3}" \
-  -DARROW_USE_CCACHE="${ARROW_USE_CCACHE}" \
-  -DAWSSDK_SOURCE=BUNDLED \
-  -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" \
-  -DCMAKE_INSTALL_PREFIX="${install_dir}" \
-  -DCMAKE_UNITY_BUILD="${CMAKE_UNITY_BUILD}" \
-  -DGTest_SOURCE=BUNDLED \
-  -DPARQUET_BUILD_EXAMPLES=OFF \
-  -DPARQUET_BUILD_EXECUTABLES=OFF \
-  -DPARQUET_REQUIRE_ENCRYPTION=OFF \
-  -Dre2_SOURCE=BUNDLED \
-  -GNinja
+  --preset=ninja-release-jni-macos \
+  -DCMAKE_INSTALL_PREFIX="${install_dir}"
 cmake --build "${build_dir}/cpp" --target install
 github_actions_group_end
 
-if [ "${ARROW_RUN_TESTS:-}" == "ON" ]; then
-  github_actions_group_begin "Running Arrow C++ libraries tests"
-  # MinIO is required
-  exclude_tests="arrow-s3fs-test"
-  # unstable
-  exclude_tests="${exclude_tests}|arrow-acero-asof-join-node-test"
-  exclude_tests="${exclude_tests}|arrow-acero-hash-join-node-test"
-  ctest \
-    --exclude-regex "${exclude_tests}" \
-    --label-regex unittest \
-    --output-on-failure \
-    --parallel "$(sysctl -n hw.ncpu)" \
-    --test-dir "${build_dir}/cpp" \
-    --timeout 300
-  github_actions_group_end
-fi
-
 export JAVA_JNI_CMAKE_ARGS="-DProtobuf_ROOT=${build_dir}/cpp/protobuf_ep-install"
 "${source_dir}/ci/scripts/jni_build.sh" \
   "${source_dir}" \
@@ -142,6 +94,7 @@
 pushd "${dist_dir}"
 archery linking check-dependencies \
   --allow CoreFoundation \
+  --allow Network \
   --allow Security \
   --allow libSystem \
   --allow libarrow_cdata_jni \
diff --git a/ci/scripts/jni_manylinux_build.sh b/ci/scripts/jni_manylinux_build.sh
index a34ec0f..3577c37 100755
--- a/ci/scripts/jni_manylinux_build.sh
+++ b/ci/scripts/jni_manylinux_build.sh
@@ -53,33 +53,19 @@
 fi
 
 github_actions_group_begin "Building Arrow C++ libraries"
-devtoolset_version="$(rpm -qa "devtoolset-*-gcc" --queryformat '%{VERSION}' | grep -o "^[0-9]*")"
-devtoolset_include_cpp="/opt/rh/devtoolset-${devtoolset_version}/root/usr/include/c++/${devtoolset_version}"
-: "${ARROW_ACERO:=ON}"
-export ARROW_ACERO
-: "${ARROW_BUILD_TESTS:=OFF}"
-export ARROW_BUILD_TESTS
-: "${ARROW_DATASET:=ON}"
-export ARROW_DATASET
-: "${ARROW_GANDIVA:=ON}"
-export ARROW_GANDIVA
-: "${ARROW_GCS:=ON}"
-: "${ARROW_JEMALLOC:=OFF}"
-: "${ARROW_MIMALLOC:=ON}"
-: "${ARROW_RPATH_ORIGIN:=ON}"
-: "${ARROW_ORC:=ON}"
-export ARROW_ORC
-: "${ARROW_PARQUET:=ON}"
-: "${ARROW_S3:=ON}"
-: "${CMAKE_BUILD_TYPE:=release}"
-: "${CMAKE_UNITY_BUILD:=ON}"
+
 : "${VCPKG_ROOT:=/opt/vcpkg}"
 : "${VCPKG_FEATURE_FLAGS:=-manifests}"
-: "${VCPKG_TARGET_TRIPLET:=${VCPKG_DEFAULT_TRIPLET:-x64-linux-static-${CMAKE_BUILD_TYPE}}}"
-: "${GANDIVA_CXX_FLAGS:=-isystem;${devtoolset_include_cpp};-isystem;${devtoolset_include_cpp}/x86_64-redhat-linux;-lpthread}"
+: "${VCPKG_TARGET_TRIPLET:=${VCPKG_DEFAULT_TRIPLET:-x64-linux-static-release}}"
+export VCPKG_TARGET_TRIPLET
 
-export ARROW_TEST_DATA="${arrow_dir}/testing/data"
-export PARQUET_TEST_DATA="${arrow_dir}/cpp/submodules/parquet-testing/data"
+export ARROW_BUILD_TESTS=OFF
+
+export ARROW_DATASET=ON
+export ARROW_GANDIVA=ON
+export ARROW_ORC=ON
+export ARROW_PARQUET=ON
+
 export AWS_EC2_METADATA_DISABLED=TRUE
 
 install_dir="${build_dir}/cpp-install"
@@ -87,71 +73,12 @@
 cmake \
   -S "${arrow_dir}/cpp" \
   -B "${build_dir}/cpp" \
-  -DARROW_ACERO="${ARROW_ACERO}" \
-  -DARROW_BUILD_SHARED=OFF \
-  -DARROW_BUILD_TESTS="${ARROW_BUILD_TESTS}" \
-  -DARROW_CSV="${ARROW_DATASET}" \
-  -DARROW_DATASET="${ARROW_DATASET}" \
-  -DARROW_SUBSTRAIT="${ARROW_DATASET}" \
-  -DARROW_DEPENDENCY_SOURCE="VCPKG" \
-  -DARROW_DEPENDENCY_USE_SHARED=OFF \
-  -DARROW_GANDIVA_PC_CXX_FLAGS="${GANDIVA_CXX_FLAGS}" \
-  -DARROW_GANDIVA="${ARROW_GANDIVA}" \
-  -DARROW_GCS="${ARROW_GCS}" \
-  -DARROW_JEMALLOC="${ARROW_JEMALLOC}" \
-  -DARROW_JSON="${ARROW_DATASET}" \
-  -DARROW_MIMALLOC="${ARROW_MIMALLOC}" \
-  -DARROW_ORC="${ARROW_ORC}" \
-  -DARROW_PARQUET="${ARROW_PARQUET}" \
-  -DARROW_RPATH_ORIGIN="${ARROW_RPATH_ORIGIN}" \
-  -DARROW_S3="${ARROW_S3}" \
-  -DARROW_USE_CCACHE="${ARROW_USE_CCACHE}" \
-  -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" \
-  -DCMAKE_INSTALL_PREFIX="${install_dir}" \
-  -DCMAKE_UNITY_BUILD="${CMAKE_UNITY_BUILD}" \
-  -DGTest_SOURCE=BUNDLED \
-  -DORC_SOURCE=BUNDLED \
-  -DORC_PROTOBUF_EXECUTABLE="${VCPKG_ROOT}/installed/${VCPKG_TARGET_TRIPLET}/tools/protobuf/protoc" \
-  -DPARQUET_BUILD_EXAMPLES=OFF \
-  -DPARQUET_BUILD_EXECUTABLES=OFF \
-  -DPARQUET_REQUIRE_ENCRYPTION=OFF \
-  -DVCPKG_MANIFEST_MODE=OFF \
-  -DVCPKG_TARGET_TRIPLET="${VCPKG_TARGET_TRIPLET}" \
-  -GNinja
+  --preset=ninja-release-jni-linux \
+  -DCMAKE_INSTALL_PREFIX="${install_dir}"
 cmake --build "${build_dir}/cpp"
 cmake --install "${build_dir}/cpp"
 github_actions_group_end
 
-if [ "${ARROW_RUN_TESTS:-OFF}" = "ON" ]; then
-  github_actions_group_begin "Running Arrow C++ libraries tests"
-  # MinIO is required
-  exclude_tests="arrow-s3fs-test"
-  case $(arch) in
-  aarch64)
-    # GCS testbench is crashed on aarch64:
-    # ImportError: ../grpc/_cython/cygrpc.cpython-38-aarch64-linux-gnu.so:
-    # undefined symbol: vtable for std::__cxx11::basic_ostringstream<
-    #   char, std::char_traits<char>, std::allocator<char> >
-    exclude_tests="${exclude_tests}|arrow-gcsfs-test"
-    ;;
-  esac
-  # unstable
-  exclude_tests="${exclude_tests}|arrow-acero-asof-join-node-test"
-  exclude_tests="${exclude_tests}|arrow-acero-hash-join-node-test"
-  # external dependency
-  exclude_tests="${exclude_tests}|arrow-gcsfs-test"
-  # strptime
-  exclude_tests="${exclude_tests}|arrow-utility-test"
-  ctest \
-    --exclude-regex "${exclude_tests}" \
-    --label-regex unittest \
-    --output-on-failure \
-    --parallel "$(nproc)" \
-    --test-dir "${build_dir}/cpp" \
-    --timeout 300
-  github_actions_group_end
-fi
-
 JAVA_JNI_CMAKE_ARGS="-DCMAKE_TOOLCHAIN_FILE=${VCPKG_ROOT}/scripts/buildsystems/vcpkg.cmake"
 JAVA_JNI_CMAKE_ARGS="${JAVA_JNI_CMAKE_ARGS} -DVCPKG_TARGET_TRIPLET=${VCPKG_TARGET_TRIPLET}"
 export JAVA_JNI_CMAKE_ARGS
diff --git a/ci/scripts/jni_windows_build.sh b/ci/scripts/jni_windows_build.sh
index d01ef45..6503ac6 100755
--- a/ci/scripts/jni_windows_build.sh
+++ b/ci/scripts/jni_windows_build.sh
@@ -68,7 +68,7 @@
   -B "${build_dir}/cpp" \
   -DARROW_ACERO="${ARROW_ACERO}" \
   -DARROW_BUILD_SHARED=OFF \
-  -DARROW_BUILD_TESTS=ON \
+  -DARROW_BUILD_TESTS="${ARROW_BUILD_TESTS}" \
   -DARROW_CSV="${ARROW_DATASET}" \
   -DARROW_DATASET="${ARROW_DATASET}" \
   -DARROW_SUBSTRAIT="${ARROW_DATASET}" \
diff --git a/compose.yaml b/compose.yaml
index b125c3c..f5082a2 100644
--- a/compose.yaml
+++ b/compose.yaml
@@ -99,7 +99,7 @@
       cache_from:
         - ${REPO}:${ARCH}-vcpkg-jni-${VCPKG}
       args:
-        base: ${ARROW_REPO}:${ARCH}-python-${PYTHON}-wheel-manylinux-2014-vcpkg-${VCPKG}
+        base: ${ARROW_REPO}:${ARCH}-cpp-jni-${VCPKG}
     volumes:
       - .:/arrow-java:delegated
       - ${ARROW_REPO_ROOT}:/arrow:delegated
diff --git a/compression/pom.xml b/compression/pom.xml
index 6f60eb7..ba13156 100644
--- a/compression/pom.xml
+++ b/compression/pom.xml
@@ -55,7 +55,7 @@
     <dependency>
       <groupId>com.github.luben</groupId>
       <artifactId>zstd-jni</artifactId>
-      <version>1.5.7-2</version>
+      <version>1.5.7-6</version>
     </dependency>
   </dependencies>
 </project>
diff --git a/dataset/src/main/cpp/jni_wrapper.cc b/dataset/src/main/cpp/jni_wrapper.cc
index 49cc852..e808764 100644
--- a/dataset/src/main/cpp/jni_wrapper.cc
+++ b/dataset/src/main/cpp/jni_wrapper.cc
@@ -23,6 +23,7 @@
 #include "arrow/array/concatenate.h"
 #include "arrow/c/bridge.h"
 #include "arrow/c/helpers.h"
+#include "arrow/compute/initialize.h"
 #include "arrow/dataset/api.h"
 #include "arrow/dataset/file_base.h"
 #ifdef ARROW_CSV
@@ -807,6 +808,13 @@
   JNI_METHOD_END()
 }
 
+JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_initialize(
+    JNIEnv* env, jobject) {
+  JNI_METHOD_START
+  JniAssertOkOrThrow(arrow::compute::Initialize());
+  JNI_METHOD_END()
+}
+
 /*
  * Class:     org_apache_arrow_dataset_file_JniWrapper
  * Method:    makeFileSystemDatasetFactory
diff --git a/dataset/src/main/java/org/apache/arrow/dataset/jni/JniLoader.java b/dataset/src/main/java/org/apache/arrow/dataset/jni/JniLoader.java
index 631b8b1..5fb4816 100644
--- a/dataset/src/main/java/org/apache/arrow/dataset/jni/JniLoader.java
+++ b/dataset/src/main/java/org/apache/arrow/dataset/jni/JniLoader.java
@@ -56,6 +56,7 @@
     }
     loadRemaining();
     ensureS3FinalizedOnShutdown();
+    JniWrapper.get().initialize();
   }
 
   private synchronized void loadRemaining() {
diff --git a/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java b/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
index 6637c11..cfef098 100644
--- a/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
+++ b/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
@@ -124,4 +124,7 @@
    * uninitialized, then this is a noop.
    */
   public native void ensureS3Finalized();
+
+  /** Initialize Arrow Compute. */
+  public native void initialize();
 }
diff --git a/docs/source/substrait.rst b/docs/source/substrait.rst
index b3678ac..5ec07f1 100644
--- a/docs/source/substrait.rst
+++ b/docs/source/substrait.rst
@@ -19,7 +19,7 @@
 Substrait
 =========
 
-The ``arrow-dataset`` module can execute Substrait_ plans via the :external+arrow:doc:`Acero <cpp/streaming_execution>`
+The ``arrow-dataset`` module can execute Substrait_ plans via the :external+arrow:doc:`Acero <cpp/acero>`
 query engine.
 
 Executing Queries Using Substrait Plans
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java
index 747287e..f6f1777 100644
--- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java
@@ -32,6 +32,7 @@
 import org.apache.arrow.util.Preconditions;
 import org.apache.calcite.avatica.AvaticaConnection;
 import org.apache.calcite.avatica.AvaticaFactory;
+import org.apache.calcite.avatica.DriverVersion;
 
 /** Connection to the Arrow Flight server. */
 public final class ArrowFlightConnection extends AvaticaConnection {
@@ -86,13 +87,16 @@
       throws SQLException {
     url = replaceSemiColons(url);
     final ArrowFlightConnectionConfigImpl config = new ArrowFlightConnectionConfigImpl(properties);
-    final ArrowFlightSqlClientHandler clientHandler = createNewClientHandler(config, allocator);
+    final ArrowFlightSqlClientHandler clientHandler =
+        createNewClientHandler(config, allocator, driver.getDriverVersion());
     return new ArrowFlightConnection(
         driver, factory, url, properties, config, allocator, clientHandler);
   }
 
   private static ArrowFlightSqlClientHandler createNewClientHandler(
-      final ArrowFlightConnectionConfigImpl config, final BufferAllocator allocator)
+      final ArrowFlightConnectionConfigImpl config,
+      final BufferAllocator allocator,
+      final DriverVersion driverVersion)
       throws SQLException {
     try {
       return new ArrowFlightSqlClientHandler.Builder()
@@ -116,6 +120,7 @@
           .withCatalog(config.getCatalog())
           .withClientCache(config.useClientCache() ? new FlightClientCache() : null)
           .withConnectTimeout(config.getConnectTimeout())
+          .withDriverVersion(driverVersion)
           .build();
     } catch (final SQLException e) {
       try {
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcVectorSchemaRootResultSet.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcVectorSchemaRootResultSet.java
index 0dc2b07..622e5fe 100644
--- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcVectorSchemaRootResultSet.java
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcVectorSchemaRootResultSet.java
@@ -19,6 +19,7 @@
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.sql.Types;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
@@ -28,14 +29,17 @@
 import org.apache.arrow.util.AutoCloseables;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.calcite.avatica.AvaticaConnection;
 import org.apache.calcite.avatica.AvaticaResultSet;
 import org.apache.calcite.avatica.AvaticaResultSetMetaData;
+import org.apache.calcite.avatica.AvaticaSite;
 import org.apache.calcite.avatica.AvaticaStatement;
 import org.apache.calcite.avatica.ColumnMetaData;
 import org.apache.calcite.avatica.Meta;
 import org.apache.calcite.avatica.Meta.Frame;
 import org.apache.calcite.avatica.Meta.Signature;
 import org.apache.calcite.avatica.QueryState;
+import org.apache.calcite.avatica.util.Cursor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,6 +106,33 @@
     execute2(new ArrowFlightJdbcCursor(vectorSchemaRoot), this.signature.columns);
   }
 
+  /**
+   * The default method in AvaticaResultSet does not properly handle TIMESTASMP_WITH_TIMEZONE, so we
+   * override here to add support.
+   *
+   * @param columnIndex the first column is 1, the second is 2, ...
+   * @return Object
+   * @throws SQLException if there is an underlying exception
+   */
+  @Override
+  public Object getObject(int columnIndex) throws SQLException {
+    this.checkOpen();
+
+    Cursor.Accessor accessor;
+    try {
+      accessor = accessorList.get(columnIndex - 1);
+    } catch (IndexOutOfBoundsException e) {
+      throw AvaticaConnection.HELPER.createException("invalid column ordinal: " + columnIndex);
+    }
+
+    ColumnMetaData metaData = columnMetaDataList.get(columnIndex - 1);
+    if (metaData.type.id == Types.TIMESTAMP_WITH_TIMEZONE) {
+      return accessor.getTimestamp(localCalendar);
+    } else {
+      return AvaticaSite.get(accessor, metaData.type.id, localCalendar);
+    }
+  }
+
   @Override
   protected void cancel() {
     signature.columns.clear();
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java
index 9c7112f..21cc3e4 100644
--- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java
@@ -62,14 +62,17 @@
         parameterSchema == null
             ? new ArrayList<>()
             : ConvertUtils.convertArrowFieldsToAvaticaParameters(parameterSchema.getFields());
-
+    StatementType statementType =
+        resultSetSchema == null || resultSetSchema.getFields().isEmpty()
+            ? StatementType.IS_DML
+            : StatementType.SELECT;
     return new Signature(
         columnMetaData,
         sql,
         parameters,
         Collections.emptyMap(),
         null, // unnecessary, as SQL requests use ArrowFlightJdbcCursor
-        StatementType.SELECT);
+        statementType);
   }
 
   @Override
@@ -105,7 +108,8 @@
             preparedStatement, ((ArrowFlightConnection) connection).getBufferAllocator())
         .bind(typedValues);
 
-    if (statementHandle.signature == null) {
+    if (statementHandle.signature == null
+        || statementHandle.signature.statementType == StatementType.IS_DML) {
       // Update query
       long updatedCount = preparedStatement.executeUpdate();
       return new ExecuteResult(
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
index 17c2c16..a3f6900 100644
--- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java
@@ -66,6 +66,7 @@
 import org.apache.arrow.util.VisibleForTesting;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.calcite.avatica.DriverVersion;
 import org.apache.calcite.avatica.Meta.StatementType;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
@@ -548,6 +549,9 @@
 
   /** Builder for {@link ArrowFlightSqlClientHandler}. */
   public static final class Builder {
+    static final String USER_AGENT_TEMPLATE = "JDBC Flight SQL Driver %s";
+    static final String DEFAULT_VERSION = "(unknown or development build)";
+
     private final Set<FlightClientMiddleware.Factory> middlewareFactories = new HashSet<>();
     private final Set<CallOption> options = new HashSet<>();
     private String host;
@@ -597,6 +601,8 @@
     @VisibleForTesting
     ClientCookieMiddleware.Factory cookieFactory = new ClientCookieMiddleware.Factory();
 
+    DriverVersion driverVersion;
+
     public Builder() {}
 
     /**
@@ -631,6 +637,8 @@
       if (original.retainAuth) {
         this.authFactory = original.authFactory;
       }
+
+      this.driverVersion = original.driverVersion;
     }
 
     /**
@@ -879,6 +887,17 @@
       return this;
     }
 
+    /**
+     * Sets the driver version for this handler.
+     *
+     * @param driverVersion the driver version to set
+     * @return this builder instance
+     */
+    public Builder withDriverVersion(DriverVersion driverVersion) {
+      this.driverVersion = driverVersion;
+      return this;
+    }
+
     public String getCacheKey() {
       return getLocation().toString();
     }
@@ -914,6 +933,11 @@
         final NettyClientBuilder clientBuilder = new NettyClientBuilder();
         clientBuilder.allocator(allocator);
 
+        String userAgent = String.format(USER_AGENT_TEMPLATE, DEFAULT_VERSION);
+        if (driverVersion != null && driverVersion.versionString != null) {
+          userAgent = String.format(USER_AGENT_TEMPLATE, driverVersion.versionString);
+        }
+
         buildTimeMiddlewareFactories.add(new ClientCookieMiddleware.Factory());
         buildTimeMiddlewareFactories.forEach(clientBuilder::intercept);
         if (useEncryption) {
@@ -948,6 +972,9 @@
         }
 
         NettyChannelBuilder channelBuilder = clientBuilder.build();
+
+        channelBuilder.userAgent(userAgent);
+
         if (connectTimeout != null) {
           channelBuilder.withOption(
               ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) connectTimeout.toMillis());
diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/AvaticaParameterBinder.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/AvaticaParameterBinder.java
index 4c2a9b8..0fd99de 100644
--- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/AvaticaParameterBinder.java
+++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/AvaticaParameterBinder.java
@@ -44,6 +44,7 @@
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.calcite.avatica.remote.TypedValue;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * Convert Avatica PreparedStatement parameters from a list of TypedValue to Arrow and bind them to
@@ -108,9 +109,9 @@
    * @param typedValue TypedValue to bind to the vector.
    * @param index Vector index to bind the value at.
    */
-  private void bind(FieldVector vector, TypedValue typedValue, int index) {
+  private void bind(FieldVector vector, @Nullable TypedValue typedValue, int index) {
     try {
-      if (typedValue.value == null) {
+      if (typedValue == null || typedValue.value == null) {
         if (vector.getField().isNullable()) {
           vector.setNull(index);
         } else {
@@ -127,7 +128,7 @@
       throw new UnsupportedOperationException(
           String.format(
               "Binding value of type %s is not yet supported for expected Arrow type %s",
-              typedValue.type, vector.getField().getType()));
+              typedValue == null ? "null" : typedValue.type, vector.getField().getType()));
     }
   }
 
diff --git a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcConnectionCookieTest.java b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcConnectionCookieTest.java
index 1977b61..7127c7f 100644
--- a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcConnectionCookieTest.java
+++ b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcConnectionCookieTest.java
@@ -39,11 +39,11 @@
         Statement statement = connection.createStatement()) {
 
       // Expect client didn't receive cookies before any operation
-      assertNull(FLIGHT_SERVER_TEST_EXTENSION.getMiddlewareCookieFactory().getCookie());
+      assertNull(FLIGHT_SERVER_TEST_EXTENSION.getInterceptorFactory().getCookie());
 
       // Run another action for check if the cookies was sent by the server.
       statement.execute(CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD);
-      assertEquals("k=v", FLIGHT_SERVER_TEST_EXTENSION.getMiddlewareCookieFactory().getCookie());
+      assertEquals("k=v", FLIGHT_SERVER_TEST_EXTENSION.getInterceptorFactory().getCookie());
     }
   }
 }
diff --git a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatementTest.java b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatementTest.java
index 774ad00..0369c3a 100644
--- a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatementTest.java
+++ b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatementTest.java
@@ -20,6 +20,8 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertAll;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
@@ -84,6 +86,19 @@
   }
 
   @Test
+  public void testSimpleQueryNoParameterBindingWithExecute() throws SQLException {
+    final String query = CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD;
+    try (final PreparedStatement preparedStatement = connection.prepareStatement(query)) {
+      boolean isResultSet = preparedStatement.execute();
+      assertTrue(isResultSet);
+      final ResultSet resultSet = preparedStatement.getResultSet();
+      CoreMockedSqlProducers.assertLegacyRegularSqlResultSet(resultSet);
+      assertFalse(preparedStatement.getMoreResults());
+      assertEquals(-1, preparedStatement.getUpdateCount());
+    }
+  }
+
+  @Test
   public void testQueryWithParameterBinding() throws SQLException {
     final String query = "Fake query with parameters";
     final Schema schema =
@@ -175,6 +190,20 @@
   }
 
   @Test
+  public void testUpdateQueryWithExecute() throws SQLException {
+    String query = "Fake update with execute";
+    PRODUCER.addUpdateQuery(query, /*updatedRows*/ 42);
+    try (final PreparedStatement stmt = connection.prepareStatement(query)) {
+      boolean isResultSet = stmt.execute();
+      assertFalse(isResultSet);
+      int updated = stmt.getUpdateCount();
+      assertEquals(42, updated);
+      assertFalse(stmt.getMoreResults());
+      assertEquals(-1, stmt.getUpdateCount());
+    }
+  }
+
+  @Test
   public void testUpdateQueryWithParameters() throws SQLException {
     String query = "Fake update with parameters";
     PRODUCER.addUpdateQuery(query, /*updatedRows*/ 42);
diff --git a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ConnectionTest.java b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ConnectionTest.java
index 8e872a1..72e4b22 100644
--- a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ConnectionTest.java
+++ b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ConnectionTest.java
@@ -31,6 +31,7 @@
 import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler;
 import org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty;
 import org.apache.arrow.driver.jdbc.utils.MockFlightSqlProducer;
+import org.apache.arrow.flight.FlightMethod;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.util.AutoCloseables;
@@ -576,4 +577,49 @@
       assertTrue(connection.isValid(0));
     }
   }
+
+  /**
+   * Test that the JDBC driver properly integrates driver version into client handler.
+   *
+   * @throws Exception on error.
+   */
+  @Test
+  public void testJdbcDriverVersionIntegration() throws Exception {
+    final Properties properties = new Properties();
+    properties.put(
+        ArrowFlightConnectionProperty.HOST.camelName(), FLIGHT_SERVER_TEST_EXTENSION.getHost());
+    properties.put(
+        ArrowFlightConnectionProperty.PORT.camelName(), FLIGHT_SERVER_TEST_EXTENSION.getPort());
+    properties.put(ArrowFlightConnectionProperty.USER.camelName(), userTest);
+    properties.put(ArrowFlightConnectionProperty.PASSWORD.camelName(), passTest);
+    properties.put(ArrowFlightConnectionProperty.USE_ENCRYPTION.camelName(), false);
+
+    // Create a driver instance and connect
+    ArrowFlightJdbcDriver driverVersion = new ArrowFlightJdbcDriver();
+
+    try (Connection connection =
+        ArrowFlightConnection.createNewConnection(
+            driverVersion,
+            new ArrowFlightJdbcFactory(),
+            "jdbc:arrow-flight-sql://localhost:" + FLIGHT_SERVER_TEST_EXTENSION.getPort(),
+            properties,
+            allocator)) {
+
+      assertTrue(connection.isValid(0));
+
+      var actualUserAgent =
+          FLIGHT_SERVER_TEST_EXTENSION
+              .getInterceptorFactory()
+              .getHeader(FlightMethod.HANDSHAKE, "user-agent");
+
+      var expectedUserAgent =
+          "JDBC Flight SQL Driver " + driverVersion.getDriverVersion().versionString;
+      // Driver appends version to grpc user-agent header. Assert the header starts with the
+      // expected
+      // value and ignored grpc version.
+      assertTrue(
+          actualUserAgent.startsWith(expectedUserAgent),
+          "Expected: " + expectedUserAgent + " but found: " + actualUserAgent);
+    }
+  }
 }
diff --git a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/FlightServerTestExtension.java b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/FlightServerTestExtension.java
index aa58665..f71114e 100644
--- a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/FlightServerTestExtension.java
+++ b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/FlightServerTestExtension.java
@@ -25,6 +25,8 @@
 import java.sql.SQLException;
 import java.util.ArrayDeque;
 import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import org.apache.arrow.driver.jdbc.authentication.Authentication;
 import org.apache.arrow.driver.jdbc.authentication.TokenAuthentication;
@@ -33,6 +35,7 @@
 import org.apache.arrow.flight.CallHeaders;
 import org.apache.arrow.flight.CallInfo;
 import org.apache.arrow.flight.CallStatus;
+import org.apache.arrow.flight.FlightMethod;
 import org.apache.arrow.flight.FlightServer;
 import org.apache.arrow.flight.FlightServerMiddleware;
 import org.apache.arrow.flight.Location;
@@ -67,7 +70,8 @@
   private final CertKeyPair certKeyPair;
   private final File mTlsCACert;
 
-  private final MiddlewareCookie.Factory middlewareCookieFactory = new MiddlewareCookie.Factory();
+  private final InterceptorMiddleware.Factory interceptorFactory =
+      new InterceptorMiddleware.Factory();
 
   private FlightServerTestExtension(
       final Properties properties,
@@ -126,12 +130,18 @@
     return this.createDataSource().getConnection();
   }
 
+  public Connection getConnection(String timezone) throws SQLException {
+    setUseEncryption(false);
+    properties.put("timezone", timezone);
+    return this.createDataSource().getConnection();
+  }
+
   private void setUseEncryption(boolean useEncryption) {
     properties.put("useEncryption", useEncryption);
   }
 
-  public MiddlewareCookie.Factory getMiddlewareCookieFactory() {
-    return middlewareCookieFactory;
+  public InterceptorMiddleware.Factory getInterceptorFactory() {
+    return interceptorFactory;
   }
 
   @FunctionalInterface
@@ -143,7 +153,7 @@
     FlightServer.Builder builder =
         FlightServer.builder(allocator, location, producer)
             .headerAuthenticator(authentication.authenticate())
-            .middleware(FlightServerMiddleware.Key.of("KEY"), middlewareCookieFactory);
+            .middleware(FlightServerMiddleware.Key.of("KEY"), interceptorFactory);
     if (certKeyPair != null) {
       builder.useTls(certKeyPair.cert, certKeyPair.key);
     }
@@ -301,11 +311,11 @@
    * A middleware to handle with the cookies in the server. It is used to test if cookies are being
    * sent properly.
    */
-  static class MiddlewareCookie implements FlightServerMiddleware {
+  static class InterceptorMiddleware implements FlightServerMiddleware {
 
     private final Factory factory;
 
-    public MiddlewareCookie(Factory factory) {
+    public InterceptorMiddleware(Factory factory) {
       this.factory = factory;
     }
 
@@ -323,22 +333,33 @@
     public void onCallErrored(Throwable throwable) {}
 
     /** A factory for the MiddlewareCookie. */
-    static class Factory implements FlightServerMiddleware.Factory<MiddlewareCookie> {
+    static class Factory implements FlightServerMiddleware.Factory<InterceptorMiddleware> {
 
+      private final Map<FlightMethod, CallHeaders> receivedCallHeaders = new HashMap<>();
       private boolean receivedCookieHeader = false;
       private String cookie;
 
       @Override
-      public MiddlewareCookie onCallStarted(
+      public InterceptorMiddleware onCallStarted(
           CallInfo callInfo, CallHeaders callHeaders, RequestContext requestContext) {
         cookie = callHeaders.get("Cookie");
         receivedCookieHeader = null != cookie;
-        return new MiddlewareCookie(this);
+
+        receivedCallHeaders.put(callInfo.method(), callHeaders);
+        return new InterceptorMiddleware(this);
       }
 
       public String getCookie() {
         return cookie;
       }
+
+      public String getHeader(FlightMethod method, String key) {
+        CallHeaders headers = receivedCallHeaders.get(method);
+        if (headers == null) {
+          return null;
+        }
+        return headers.get(key);
+      }
     }
   }
 }
diff --git a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/TimestampResultSetTest.java b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/TimestampResultSetTest.java
new file mode 100644
index 0000000..0921ae2
--- /dev/null
+++ b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/TimestampResultSetTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.driver.jdbc;
+
+import com.google.common.collect.ImmutableList;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.TimeZone;
+import org.apache.arrow.driver.jdbc.utils.MockFlightSqlProducer;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+/**
+ * Timestamps have a lot of nuances in JDBC. This class is here to test that timestamp behavior is
+ * correct for different types of Timestamp vectors as well as different methods of retrieving the
+ * timestamps in JDBC.
+ */
+public class TimestampResultSetTest {
+  private static final MockFlightSqlProducer FLIGHT_SQL_PRODUCER = new MockFlightSqlProducer();
+
+  @RegisterExtension public static FlightServerTestExtension FLIGHT_SERVER_TEST_EXTENSION;
+
+  static {
+    FLIGHT_SERVER_TEST_EXTENSION =
+        FlightServerTestExtension.createStandardTestExtension(FLIGHT_SQL_PRODUCER);
+  }
+
+  private static final String QUERY_STRING = "SELECT * FROM TIMESTAMPS";
+  private static final Schema QUERY_SCHEMA =
+      new Schema(
+          ImmutableList.of(
+              Field.nullable("no_tz", new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)),
+              Field.nullable("utc", new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")),
+              Field.nullable("utc+1", new ArrowType.Timestamp(TimeUnit.MILLISECOND, "GMT+1")),
+              Field.nullable("utc-1", new ArrowType.Timestamp(TimeUnit.MILLISECOND, "GMT-1"))));
+
+  @BeforeAll
+  public static void setup() throws SQLException {
+    Instant firstDay2025 = OffsetDateTime.of(2025, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC).toInstant();
+
+    FLIGHT_SQL_PRODUCER.addSelectQuery(
+        QUERY_STRING,
+        QUERY_SCHEMA,
+        Collections.singletonList(
+            listener -> {
+              try (final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+                  final VectorSchemaRoot root = VectorSchemaRoot.create(QUERY_SCHEMA, allocator)) {
+                listener.start(root);
+                root.getFieldVectors()
+                    .forEach(v -> ((TimeStampVector) v).setSafe(0, firstDay2025.toEpochMilli()));
+                root.setRowCount(1);
+                listener.putNext();
+              } catch (final Throwable throwable) {
+                listener.error(throwable);
+              } finally {
+                listener.completed();
+              }
+            }));
+  }
+
+  /**
+   * This test doesn't yet test anything other than ensuring all ResultSet methods to retrieve a
+   * timestamp succeed.
+   *
+   * <p>This is a good starting point to add more tests to ensure the values are correct when we
+   * change the "local calendar" either through changing the JVM default or through the connection
+   * property.
+   */
+  @Test
+  public void test() {
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+    try (Connection connection = FLIGHT_SERVER_TEST_EXTENSION.getConnection("UTC")) {
+      try (PreparedStatement s = connection.prepareStatement(QUERY_STRING)) {
+        try (ResultSet rs = s.executeQuery()) {
+          int numCols = rs.getMetaData().getColumnCount();
+          try {
+            rs.next();
+            for (int i = 1; i <= numCols; i++) {
+              int type = rs.getMetaData().getColumnType(i);
+              String name = rs.getMetaData().getColumnName(i);
+              System.out.println(name);
+              System.out.print("- getDate:\t\t\t\t\t\t\t");
+              System.out.print(rs.getDate(i));
+              System.out.println();
+              System.out.print("- getTimestamp:\t\t\t\t\t\t");
+              System.out.print(rs.getTimestamp(i));
+              System.out.println();
+              System.out.print("- getString:\t\t\t\t\t\t");
+              System.out.print(rs.getString(i));
+              System.out.println();
+              System.out.print("- getObject:\t\t\t\t\t\t");
+              System.out.print(rs.getObject(i));
+              System.out.println();
+              System.out.print("- getObject(Timestamp.class):\t\t");
+              System.out.print(rs.getObject(i, Timestamp.class));
+              System.out.println();
+              System.out.print("- getTimestamp(default Calendar):\t");
+              System.out.print(rs.getTimestamp(i, Calendar.getInstance()));
+              System.out.println();
+              System.out.print("- getTimestamp(UTC Calendar):\t\t");
+              System.out.print(
+                  rs.getTimestamp(i, Calendar.getInstance(TimeZone.getTimeZone("UTC"))));
+              System.out.println();
+              System.out.print("- getObject(LocalDateTime.class):\t");
+              System.out.print(rs.getObject(i, LocalDateTime.class));
+              System.out.println();
+              if (type == Types.TIMESTAMP_WITH_TIMEZONE) {
+                System.out.print("- getObject(Instant.class):\t\t\t");
+                System.out.print(rs.getObject(i, Instant.class));
+                System.out.println();
+                System.out.print("- getObject(OffsetDateTime.class):\t");
+                System.out.print(rs.getObject(i, OffsetDateTime.class));
+                System.out.println();
+                System.out.print("- getObject(ZonedDateTime.class):\t");
+                System.out.print(rs.getObject(i, ZonedDateTime.class));
+                System.out.println();
+              }
+              System.out.println();
+            }
+            System.out.println();
+          } catch (SQLException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandlerBuilderTest.java b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandlerBuilderTest.java
index 6524eaf..a60a71f 100644
--- a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandlerBuilderTest.java
+++ b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandlerBuilderTest.java
@@ -149,6 +149,7 @@
     assertEquals(Optional.empty(), builder.catalog);
     assertNull(builder.flightClientCache);
     assertNull(builder.connectTimeout);
+    assertNull(builder.driverVersion);
   }
 
   @Test
diff --git a/pom.xml b/pom.xml
index 9e83644..36d9d59 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <groupId>org.apache</groupId>
     <artifactId>apache</artifactId>
-    <version>34</version>
+    <version>35</version>
   </parent>
 
   <groupId>org.apache.arrow</groupId>
@@ -91,12 +91,13 @@
   </issueManagement>
 
   <properties>
+    <project.build.outputTimestamp>1695310533</project.build.outputTimestamp>
     <target.gen.source.path>${project.build.directory}/generated-sources</target.gen.source.path>
     <dep.junit.platform.version>1.9.0</dep.junit.platform.version>
     <dep.junit.jupiter.version>5.12.2</dep.junit.jupiter.version>
     <dep.slf4j.version>2.0.17</dep.slf4j.version>
     <dep.guava-bom.version>33.4.8-jre</dep.guava-bom.version>
-    <dep.netty-bom.version>4.1.119.Final</dep.netty-bom.version>
+    <dep.netty-bom.version>4.1.127.Final</dep.netty-bom.version>
     <dep.grpc-bom.version>1.73.0</dep.grpc-bom.version>
     <dep.protobuf-bom.version>4.30.2</dep.protobuf-bom.version>
     <dep.jackson-bom.version>2.18.3</dep.jackson-bom.version>
@@ -110,7 +111,7 @@
     <checkstyle.failOnViolation>true</checkstyle.failOnViolation>
     <error_prone_core.version>2.37.0</error_prone_core.version>
     <checker.framework.version>3.49.5</checker.framework.version>
-    <logback.version>1.5.18</logback.version>
+    <logback.version>1.5.21</logback.version>
     <doclint>none</doclint>
     <additionalparam>-Xdoclint:none</additionalparam>
     <!-- List of add-opens arg line arguments for tests -->
@@ -123,6 +124,8 @@
     <!--
       Downgrade maven-jar-plugin until https://github.com/codehaus-plexus/plexus-archiver/issues/332
       is addressed
+      maven-jar-plugin 4.0.0-beta-2-SNAPSHOT upgraded to plexus-archive 4.10.2 fixing the issue.
+      We have to wait new maven-jar-plugin release, and a new Apache POM release providing it
     -->
     <version.maven-jar-plugin>3.2.2</version.maven-jar-plugin>
   </properties>
diff --git a/vector/src/main/codegen/includes/vv_imports.ftl b/vector/src/main/codegen/includes/vv_imports.ftl
index 7f216a7..2bbcecc 100644
--- a/vector/src/main/codegen/includes/vv_imports.ftl
+++ b/vector/src/main/codegen/includes/vv_imports.ftl
@@ -34,6 +34,7 @@
 import org.apache.arrow.vector.complex.reader.*;
 import org.apache.arrow.vector.complex.impl.*;
 import org.apache.arrow.vector.complex.writer.*;
+import org.apache.arrow.vector.complex.writer.BaseWriter.ExtensionWriter;
 import org.apache.arrow.vector.complex.writer.BaseWriter.StructWriter;
 import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter;
 import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
diff --git a/vector/src/main/codegen/templates/AbstractFieldReader.java b/vector/src/main/codegen/templates/AbstractFieldReader.java
index 25b071f..c7c5b4d 100644
--- a/vector/src/main/codegen/templates/AbstractFieldReader.java
+++ b/vector/src/main/codegen/templates/AbstractFieldReader.java
@@ -108,6 +108,27 @@
   }
 
   </#list></#list>
+
+  public void copyAsValue(StructWriter writer, ExtensionTypeWriterFactory writerFactory) {
+    fail("CopyAsValue StructWriter");
+  }
+
+  public void read(ExtensionHolder holder) {
+    fail("Extension");
+  }
+
+  public void read(int arrayIndex, ExtensionHolder holder) {
+    fail("RepeatedExtension");
+  }
+
+  public void copyAsValue(AbstractExtensionTypeWriter writer) {
+    fail("CopyAsValueExtension");
+  }
+
+  public void copyAsField(String name, AbstractExtensionTypeWriter writer) {
+    fail("CopyAsFieldExtension");
+  }
+
   public FieldReader reader(String name) {
     fail("reader(String name)");
     return null;
diff --git a/vector/src/main/codegen/templates/AbstractPromotableFieldWriter.java b/vector/src/main/codegen/templates/AbstractPromotableFieldWriter.java
index 951edd5..2e7792f 100644
--- a/vector/src/main/codegen/templates/AbstractPromotableFieldWriter.java
+++ b/vector/src/main/codegen/templates/AbstractPromotableFieldWriter.java
@@ -295,7 +295,7 @@
 
   @Override
   public ExtensionWriter extension(ArrowType arrowType) {
-    return getWriter(MinorType.EXTENSIONTYPE).extension(arrowType);
+    return getWriter(MinorType.LIST).extension(arrowType);
   }
 
   @Override
@@ -325,7 +325,7 @@
 
   @Override
   public ExtensionWriter extension(String name, ArrowType arrowType) {
-    return getWriter(MinorType.EXTENSIONTYPE).extension(name, arrowType);
+    return getWriter(MinorType.STRUCT).extension(name, arrowType);
   }
 
   <#list vv.types as type><#list type.minor as minor>
diff --git a/vector/src/main/codegen/templates/BaseReader.java b/vector/src/main/codegen/templates/BaseReader.java
index e75e8a2..4c6f49a 100644
--- a/vector/src/main/codegen/templates/BaseReader.java
+++ b/vector/src/main/codegen/templates/BaseReader.java
@@ -49,6 +49,7 @@
     boolean next();
     int size();
     void copyAsValue(StructWriter writer);
+    void copyAsValue(StructWriter writer, ExtensionTypeWriterFactory writerFactory);
   }
 
   public interface ListReader extends BaseReader{
@@ -59,6 +60,7 @@
     boolean next();
     int size();
     void copyAsValue(ListWriter writer);
+    void copyAsValue(ListWriter writer, ExtensionTypeWriterFactory writerFactory);
   }
 
   public interface MapReader extends BaseReader{
@@ -69,11 +71,12 @@
     boolean next();
     int size();
     void copyAsValue(MapWriter writer);
+    void copyAsValue(MapWriter writer, ExtensionTypeWriterFactory writerFactory);
   }
 
   public interface ScalarReader extends
   <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first /> ${name}Reader, </#list></#list>
-  BaseReader {}
+  ExtensionReader, BaseReader {}
 
   interface ComplexReader{
     StructReader rootAsStruct();
diff --git a/vector/src/main/codegen/templates/ComplexCopier.java b/vector/src/main/codegen/templates/ComplexCopier.java
index 4fff705..4df5478 100644
--- a/vector/src/main/codegen/templates/ComplexCopier.java
+++ b/vector/src/main/codegen/templates/ComplexCopier.java
@@ -42,10 +42,14 @@
    * @param output field to write to
    */
   public static void copy(FieldReader input, FieldWriter output) {
-    writeValue(input, output);
+    writeValue(input, output, null);
   }
 
-  private static void writeValue(FieldReader reader, FieldWriter writer) {
+  public static void copy(FieldReader input, FieldWriter output, ExtensionTypeWriterFactory extensionTypeWriterFactory) {
+    writeValue(input, output, extensionTypeWriterFactory);
+  }
+
+  private static void writeValue(FieldReader reader, FieldWriter writer, ExtensionTypeWriterFactory extensionTypeWriterFactory) {
     final MinorType mt = reader.getMinorType();
 
       switch (mt) {
@@ -61,7 +65,7 @@
             FieldReader childReader = reader.reader();
             FieldWriter childWriter = getListWriterForReader(childReader, writer);
             if (childReader.isSet()) {
-              writeValue(childReader, childWriter);
+              writeValue(childReader, childWriter, extensionTypeWriterFactory);
             } else {
               childWriter.writeNull();
             }
@@ -79,8 +83,8 @@
             FieldReader structReader = reader.reader();
             if (structReader.isSet()) {
               writer.startEntry();
-              writeValue(mapReader.key(), getMapWriterForReader(mapReader.key(), writer.key()));
-              writeValue(mapReader.value(), getMapWriterForReader(mapReader.value(), writer.value()));
+              writeValue(mapReader.key(), getMapWriterForReader(mapReader.key(), writer.key()), extensionTypeWriterFactory);
+              writeValue(mapReader.value(), getMapWriterForReader(mapReader.value(), writer.value()), extensionTypeWriterFactory);
               writer.endEntry();
             } else {
               writer.writeNull();
@@ -99,7 +103,7 @@
             if (childReader.getMinorType() != Types.MinorType.NULL) {
               FieldWriter childWriter = getStructWriterForReader(childReader, writer, name);
               if (childReader.isSet()) {
-                writeValue(childReader, childWriter);
+                writeValue(childReader, childWriter, extensionTypeWriterFactory);
               } else {
                 childWriter.writeNull();
               }
@@ -110,6 +114,20 @@
           writer.writeNull();
         }
         break;
+      case EXTENSIONTYPE:
+        if (extensionTypeWriterFactory == null) {
+          throw new IllegalArgumentException("Must provide ExtensionTypeWriterFactory");
+        }
+        if (reader.isSet()) {
+          Object value = reader.readObject();
+          if (value != null) {
+            writer.addExtensionTypeWriterFactory(extensionTypeWriterFactory);
+            writer.writeExtension(value);
+          }
+        } else {
+          writer.writeNull();
+        }
+        break;
   <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
   <#assign fields = minor.fields!type.fields />
   <#assign uncappedName = name?uncap_first/>
@@ -162,6 +180,9 @@
       return (FieldWriter) writer.map(name);
     case LISTVIEW:
       return (FieldWriter) writer.listView(name);
+    case EXTENSIONTYPE:
+      ExtensionWriter extensionWriter = writer.extension(name, reader.getField().getType());
+      return (FieldWriter) extensionWriter;
     default:
       throw new UnsupportedOperationException(reader.getMinorType().toString());
     }
@@ -186,6 +207,9 @@
       return (FieldWriter) writer.list();
     case LISTVIEW:
       return (FieldWriter) writer.listView();
+    case EXTENSIONTYPE:
+      ExtensionWriter extensionWriter = writer.extension(reader.getField().getType());
+      return (FieldWriter) extensionWriter;
     default:
       throw new UnsupportedOperationException(reader.getMinorType().toString());
     }
@@ -211,6 +235,9 @@
         return (FieldWriter) writer.listView();
       case MAP:
         return (FieldWriter) writer.map(false);
+      case EXTENSIONTYPE:
+        ExtensionWriter extensionWriter = writer.extension(reader.getField().getType());
+        return (FieldWriter) extensionWriter;
       default:
         throw new UnsupportedOperationException(reader.getMinorType().toString());
     }
diff --git a/vector/src/main/codegen/templates/NullReader.java b/vector/src/main/codegen/templates/NullReader.java
index 1d77248..0529633 100644
--- a/vector/src/main/codegen/templates/NullReader.java
+++ b/vector/src/main/codegen/templates/NullReader.java
@@ -86,6 +86,11 @@
   }
   </#list></#list>
 
+  public void copyAsValue(StructWriter writer, ExtensionTypeWriterFactory writerFactory){}
+  public void read(ExtensionHolder holder) {
+    holder.isSet = 0;
+  }
+
   public int size(){
     return 0;
   }
diff --git a/vector/src/main/codegen/templates/PromotableWriter.java b/vector/src/main/codegen/templates/PromotableWriter.java
index 8d7d57b..d22eb00 100644
--- a/vector/src/main/codegen/templates/PromotableWriter.java
+++ b/vector/src/main/codegen/templates/PromotableWriter.java
@@ -550,6 +550,10 @@
     getWriter(MinorType.EXTENSIONTYPE).addExtensionTypeWriterFactory(factory);
   }
 
+  public void addExtensionTypeWriterFactory(ExtensionTypeWriterFactory factory, ArrowType arrowType) {
+    getWriter(MinorType.EXTENSIONTYPE, arrowType).addExtensionTypeWriterFactory(factory);
+  }
+
   @Override
   public void allocate() {
     getWriter().allocate();
diff --git a/vector/src/main/codegen/templates/UnionListWriter.java b/vector/src/main/codegen/templates/UnionListWriter.java
index 9424533..3c41ac7 100644
--- a/vector/src/main/codegen/templates/UnionListWriter.java
+++ b/vector/src/main/codegen/templates/UnionListWriter.java
@@ -53,6 +53,7 @@
   private boolean inStruct = false;
   private boolean listStarted = false;
   private String structName;
+  private ArrowType extensionType;
   <#if listName == "LargeList" || listName == "LargeListView">
   private static final long OFFSET_WIDTH = 8;
   <#else>
@@ -203,8 +204,8 @@
 
   @Override
   public ExtensionWriter extension(ArrowType arrowType) {
-    writer.extension(arrowType);
-    return writer;
+    this.extensionType = arrowType;
+    return this;
   }
   @Override
   public ExtensionWriter extension(String name, ArrowType arrowType) {
@@ -337,13 +338,17 @@
   @Override
   public void writeExtension(Object value) {
     writer.writeExtension(value);
+    writer.setPosition(writer.idx() + 1);
   }
+
   @Override
   public void addExtensionTypeWriterFactory(ExtensionTypeWriterFactory var1) {
-    writer.addExtensionTypeWriterFactory(var1);
+    writer.addExtensionTypeWriterFactory(var1, extensionType);
   }
+
   public void write(ExtensionHolder var1) {
     writer.write(var1);
+    writer.setPosition(writer.idx() + 1);
   }
 
   <#list vv.types as type>
diff --git a/vector/src/main/java/org/apache/arrow/vector/BaseValueVector.java b/vector/src/main/java/org/apache/arrow/vector/BaseValueVector.java
index 37dfa20..cc57cde 100644
--- a/vector/src/main/java/org/apache/arrow/vector/BaseValueVector.java
+++ b/vector/src/main/java/org/apache/arrow/vector/BaseValueVector.java
@@ -22,6 +22,7 @@
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.ReferenceManager;
 import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
 import org.apache.arrow.vector.complex.reader.FieldReader;
 import org.apache.arrow.vector.util.DataSizeRoundingUtil;
 import org.apache.arrow.vector.util.TransferPair;
@@ -260,6 +261,18 @@
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public void copyFrom(
+      int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void copyFromSafe(
+      int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * Transfer the validity buffer from `validityBuffer` to the target vector's `validityBuffer`.
    * Start at `startIndex` and copy `length` number of elements. If the starting index is 8 byte
diff --git a/vector/src/main/java/org/apache/arrow/vector/NullVector.java b/vector/src/main/java/org/apache/arrow/vector/NullVector.java
index 6bfe540..0d6dab2 100644
--- a/vector/src/main/java/org/apache/arrow/vector/NullVector.java
+++ b/vector/src/main/java/org/apache/arrow/vector/NullVector.java
@@ -27,6 +27,7 @@
 import org.apache.arrow.memory.util.hash.ArrowBufHasher;
 import org.apache.arrow.util.Preconditions;
 import org.apache.arrow.vector.compare.VectorVisitor;
+import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
 import org.apache.arrow.vector.complex.impl.NullReader;
 import org.apache.arrow.vector.complex.reader.FieldReader;
 import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
@@ -330,6 +331,18 @@
   }
 
   @Override
+  public void copyFrom(
+      int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void copyFromSafe(
+      int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public String getName() {
     return this.getField().getName();
   }
diff --git a/vector/src/main/java/org/apache/arrow/vector/ValueVector.java b/vector/src/main/java/org/apache/arrow/vector/ValueVector.java
index 3a50582..e0628c2 100644
--- a/vector/src/main/java/org/apache/arrow/vector/ValueVector.java
+++ b/vector/src/main/java/org/apache/arrow/vector/ValueVector.java
@@ -22,6 +22,7 @@
 import org.apache.arrow.memory.OutOfMemoryException;
 import org.apache.arrow.memory.util.hash.ArrowBufHasher;
 import org.apache.arrow.vector.compare.VectorVisitor;
+import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
 import org.apache.arrow.vector.complex.reader.FieldReader;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.Field;
@@ -310,6 +311,30 @@
   void copyFromSafe(int fromIndex, int thisIndex, ValueVector from);
 
   /**
+   * Copy a cell value from a particular index in source vector to a particular position in this
+   * vector.
+   *
+   * @param fromIndex position to copy from in source vector
+   * @param thisIndex position to copy to in this vector
+   * @param from source vector
+   * @param writerFactory the extension type writer factory to use for copying extension type values
+   */
+  void copyFrom(
+      int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory);
+
+  /**
+   * Same as {@link #copyFrom(int, int, ValueVector)} except that it handles the case when the
+   * capacity of the vector needs to be expanded before copy.
+   *
+   * @param fromIndex position to copy from in source vector
+   * @param thisIndex position to copy to in this vector
+   * @param from source vector
+   * @param writerFactory the extension type writer factory to use for copying extension type values
+   */
+  void copyFromSafe(
+      int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory);
+
+  /**
    * Accept a generic {@link VectorVisitor} and return the result.
    *
    * @param <OUT> the output result type.
diff --git a/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java b/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
index a6a71cf..429f988 100644
--- a/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
+++ b/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
@@ -21,6 +21,7 @@
 import org.apache.arrow.vector.DensityAwareVector;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.ArrowType.FixedSizeList;
@@ -152,6 +153,18 @@
   }
 
   @Override
+  public void copyFrom(
+      int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void copyFromSafe(
+      int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public String getName() {
     return name;
   }
diff --git a/vector/src/main/java/org/apache/arrow/vector/complex/AbstractStructVector.java b/vector/src/main/java/org/apache/arrow/vector/complex/AbstractStructVector.java
index 2921e43..a57fbe4 100644
--- a/vector/src/main/java/org/apache/arrow/vector/complex/AbstractStructVector.java
+++ b/vector/src/main/java/org/apache/arrow/vector/complex/AbstractStructVector.java
@@ -46,11 +46,13 @@
   private ConflictPolicy conflictPolicy;
 
   static {
-    String conflictPolicyStr =
-        System.getProperty(STRUCT_CONFLICT_POLICY_JVM, ConflictPolicy.CONFLICT_REPLACE.toString());
+    String conflictPolicyStr = System.getProperty(STRUCT_CONFLICT_POLICY_JVM);
     if (conflictPolicyStr == null) {
       conflictPolicyStr = System.getenv(STRUCT_CONFLICT_POLICY_ENV);
     }
+    if (conflictPolicyStr == null) {
+      conflictPolicyStr = ConflictPolicy.CONFLICT_REPLACE.toString();
+    }
     ConflictPolicy conflictPolicy;
     try {
       conflictPolicy = ConflictPolicy.valueOf(conflictPolicyStr.toUpperCase(Locale.ROOT));
@@ -62,11 +64,11 @@
 
   /** Policy to determine how to react when duplicate columns are encountered. */
   public enum ConflictPolicy {
-    // Ignore the conflict and append the field. This is the default behaviour
+    // Ignore the conflict and append the field.
     CONFLICT_APPEND,
     // Keep the existing field and ignore the newer one.
     CONFLICT_IGNORE,
-    // Replace the existing field with the newer one.
+    // Replace the existing field with the newer one. This is the default behaviour
     CONFLICT_REPLACE,
     // Refuse the new field and error out.
     CONFLICT_ERROR
diff --git a/vector/src/main/java/org/apache/arrow/vector/complex/LargeListVector.java b/vector/src/main/java/org/apache/arrow/vector/complex/LargeListVector.java
index 835d346..48c8127 100644
--- a/vector/src/main/java/org/apache/arrow/vector/complex/LargeListVector.java
+++ b/vector/src/main/java/org/apache/arrow/vector/complex/LargeListVector.java
@@ -31,6 +31,7 @@
 import org.apache.arrow.memory.util.ArrowBufPointer;
 import org.apache.arrow.memory.util.ByteFunctionHelpers;
 import org.apache.arrow.memory.util.CommonUtil;
+import org.apache.arrow.memory.util.LargeMemoryUtil;
 import org.apache.arrow.memory.util.hash.ArrowBufHasher;
 import org.apache.arrow.util.Preconditions;
 import org.apache.arrow.vector.AddOrGetResult;
@@ -48,6 +49,7 @@
 import org.apache.arrow.vector.ZeroVector;
 import org.apache.arrow.vector.compare.VectorVisitor;
 import org.apache.arrow.vector.complex.impl.ComplexCopier;
+import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
 import org.apache.arrow.vector.complex.impl.UnionLargeListReader;
 import org.apache.arrow.vector.complex.impl.UnionLargeListWriter;
 import org.apache.arrow.vector.complex.reader.FieldReader;
@@ -481,12 +483,42 @@
    */
   @Override
   public void copyFrom(int inIndex, int outIndex, ValueVector from) {
+    copyFrom(inIndex, outIndex, from, null);
+  }
+
+  /**
+   * Copy a cell value from a particular index in source vector to a particular position in this
+   * vector.
+   *
+   * @param inIndex position to copy from in source vector
+   * @param outIndex position to copy to in this vector
+   * @param from source vector
+   * @param writerFactory the extension type writer factory to use for copying extension type values
+   */
+  @Override
+  public void copyFrom(
+      int inIndex, int outIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
     Preconditions.checkArgument(this.getMinorType() == from.getMinorType());
     FieldReader in = from.getReader();
     in.setPosition(inIndex);
     UnionLargeListWriter out = getWriter();
     out.setPosition(outIndex);
-    ComplexCopier.copy(in, out);
+    ComplexCopier.copy(in, out, writerFactory);
+  }
+
+  /**
+   * Same as {@link #copyFrom(int, int, ValueVector)} except that it handles the case when the
+   * capacity of the vector needs to be expanded before copy.
+   *
+   * @param inIndex position to copy from in source vector
+   * @param outIndex position to copy to in this vector
+   * @param from source vector
+   * @param writerFactory the extension type writer factory to use for copying extension type values
+   */
+  @Override
+  public void copyFromSafe(
+      int inIndex, int outIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
+    copyFrom(inIndex, outIndex, from, writerFactory);
   }
 
   /**
@@ -861,10 +893,11 @@
     if (isSet(index) == 0) {
       return null;
     }
-    final List<Object> vals = new JsonStringArrayList<>();
     final long start = offsetBuffer.getLong((long) index * OFFSET_WIDTH);
     final long end = offsetBuffer.getLong(((long) index + 1L) * OFFSET_WIDTH);
     final ValueVector vv = getDataVector();
+    final List<Object> vals =
+        new JsonStringArrayList<>(LargeMemoryUtil.checkedCastToInt(end - start));
     for (long i = start; i < end; i++) {
       vals.add(vv.getObject(checkedCastToInt(i)));
     }
diff --git a/vector/src/main/java/org/apache/arrow/vector/complex/LargeListViewVector.java b/vector/src/main/java/org/apache/arrow/vector/complex/LargeListViewVector.java
index 394c3c6..992a664 100644
--- a/vector/src/main/java/org/apache/arrow/vector/complex/LargeListViewVector.java
+++ b/vector/src/main/java/org/apache/arrow/vector/complex/LargeListViewVector.java
@@ -41,6 +41,7 @@
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.ZeroVector;
 import org.apache.arrow.vector.compare.VectorVisitor;
+import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
 import org.apache.arrow.vector.complex.impl.UnionLargeListViewReader;
 import org.apache.arrow.vector.complex.impl.UnionLargeListViewWriter;
 import org.apache.arrow.vector.complex.impl.UnionListReader;
@@ -347,6 +348,20 @@
   }
 
   @Override
+  public void copyFromSafe(
+      int inIndex, int outIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
+    throw new UnsupportedOperationException(
+        "LargeListViewVector does not support copyFromSafe operation yet.");
+  }
+
+  @Override
+  public void copyFrom(
+      int inIndex, int outIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
+    throw new UnsupportedOperationException(
+        "LargeListViewVector does not support copyFrom operation yet.");
+  }
+
+  @Override
   public FieldVector getDataVector() {
     return vector;
   }
@@ -672,10 +687,10 @@
     if (isSet(index) == 0) {
       return null;
     }
-    final List<Object> vals = new JsonStringArrayList<>();
     final int start = offsetBuffer.getInt(index * OFFSET_WIDTH);
     final int end = start + sizeBuffer.getInt((index) * SIZE_WIDTH);
     final ValueVector vv = getDataVector();
+    final List<Object> vals = new JsonStringArrayList<>(end - start);
     for (int i = start; i < end; i++) {
       vals.add(vv.getObject(checkedCastToInt(i)));
     }
diff --git a/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
index 2b28175..8954925 100644
--- a/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
+++ b/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
@@ -42,6 +42,7 @@
 import org.apache.arrow.vector.ZeroVector;
 import org.apache.arrow.vector.compare.VectorVisitor;
 import org.apache.arrow.vector.complex.impl.ComplexCopier;
+import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
 import org.apache.arrow.vector.complex.impl.UnionListReader;
 import org.apache.arrow.vector.complex.impl.UnionListWriter;
 import org.apache.arrow.vector.complex.reader.FieldReader;
@@ -400,12 +401,42 @@
    */
   @Override
   public void copyFrom(int inIndex, int outIndex, ValueVector from) {
+    copyFrom(inIndex, outIndex, from, null);
+  }
+
+  /**
+   * Same as {@link #copyFrom(int, int, ValueVector)} except that it handles the case when the
+   * capacity of the vector needs to be expanded before copy.
+   *
+   * @param inIndex position to copy from in source vector
+   * @param outIndex position to copy to in this vector
+   * @param from source vector
+   * @param writerFactory the extension type writer factory to use for copying extension type values
+   */
+  @Override
+  public void copyFromSafe(
+      int inIndex, int outIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
+    copyFrom(inIndex, outIndex, from, writerFactory);
+  }
+
+  /**
+   * Copy a cell value from a particular index in source vector to a particular position in this
+   * vector.
+   *
+   * @param inIndex position to copy from in source vector
+   * @param outIndex position to copy to in this vector
+   * @param from source vector
+   * @param writerFactory the extension type writer factory to use for copying extension type values
+   */
+  @Override
+  public void copyFrom(
+      int inIndex, int outIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
     Preconditions.checkArgument(this.getMinorType() == from.getMinorType());
     FieldReader in = from.getReader();
     in.setPosition(inIndex);
     FieldWriter out = getWriter();
     out.setPosition(outIndex);
-    ComplexCopier.copy(in, out);
+    ComplexCopier.copy(in, out, writerFactory);
   }
 
   /**
@@ -719,10 +750,10 @@
     if (isSet(index) == 0) {
       return null;
     }
-    final List<Object> vals = new JsonStringArrayList<>();
     final int start = offsetBuffer.getInt(index * OFFSET_WIDTH);
     final int end = offsetBuffer.getInt((index + 1) * OFFSET_WIDTH);
     final ValueVector vv = getDataVector();
+    final List<Object> vals = new JsonStringArrayList<>(end - start);
     for (int i = start; i < end; i++) {
       vals.add(vv.getObject(i));
     }
diff --git a/vector/src/main/java/org/apache/arrow/vector/complex/ListViewVector.java b/vector/src/main/java/org/apache/arrow/vector/complex/ListViewVector.java
index 2b80101..2784240 100644
--- a/vector/src/main/java/org/apache/arrow/vector/complex/ListViewVector.java
+++ b/vector/src/main/java/org/apache/arrow/vector/complex/ListViewVector.java
@@ -42,6 +42,7 @@
 import org.apache.arrow.vector.ZeroVector;
 import org.apache.arrow.vector.compare.VectorVisitor;
 import org.apache.arrow.vector.complex.impl.ComplexCopier;
+import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
 import org.apache.arrow.vector.complex.impl.UnionListViewReader;
 import org.apache.arrow.vector.complex.impl.UnionListViewWriter;
 import org.apache.arrow.vector.complex.reader.FieldReader;
@@ -339,18 +340,30 @@
   }
 
   @Override
+  public void copyFromSafe(
+      int inIndex, int outIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
+    copyFrom(inIndex, outIndex, from, writerFactory);
+  }
+
+  @Override
   public <OUT, IN> OUT accept(VectorVisitor<OUT, IN> visitor, IN value) {
     return visitor.visit(this, value);
   }
 
   @Override
   public void copyFrom(int inIndex, int outIndex, ValueVector from) {
+    copyFrom(inIndex, outIndex, from, null);
+  }
+
+  @Override
+  public void copyFrom(
+      int inIndex, int outIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
     Preconditions.checkArgument(this.getMinorType() == from.getMinorType());
     FieldReader in = from.getReader();
     in.setPosition(inIndex);
     FieldWriter out = getWriter();
     out.setPosition(outIndex);
-    ComplexCopier.copy(in, out);
+    ComplexCopier.copy(in, out, writerFactory);
   }
 
   @Override
@@ -678,10 +691,10 @@
     if (isSet(index) == 0) {
       return null;
     }
-    final List<Object> vals = new JsonStringArrayList<>();
     final int start = offsetBuffer.getInt(index * OFFSET_WIDTH);
     final int end = start + sizeBuffer.getInt((index) * SIZE_WIDTH);
     final ValueVector vv = getDataVector();
+    final List<Object> vals = new JsonStringArrayList<>(end - start);
     for (int i = start; i < end; i++) {
       vals.add(vv.getObject(i));
     }
diff --git a/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseReader.java b/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseReader.java
index b2e9566..bf074ec 100644
--- a/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseReader.java
+++ b/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseReader.java
@@ -115,4 +115,14 @@
   public void copyAsValue(MapWriter writer) {
     ComplexCopier.copy(this, (FieldWriter) writer);
   }
+
+  @Override
+  public void copyAsValue(ListWriter writer, ExtensionTypeWriterFactory writerFactory) {
+    ComplexCopier.copy(this, (FieldWriter) writer, writerFactory);
+  }
+
+  @Override
+  public void copyAsValue(MapWriter writer, ExtensionTypeWriterFactory writerFactory) {
+    ComplexCopier.copy(this, (FieldWriter) writer, writerFactory);
+  }
 }
diff --git a/vector/src/main/java/org/apache/arrow/vector/complex/impl/UnionExtensionWriter.java b/vector/src/main/java/org/apache/arrow/vector/complex/impl/UnionExtensionWriter.java
index d341384..4219069 100644
--- a/vector/src/main/java/org/apache/arrow/vector/complex/impl/UnionExtensionWriter.java
+++ b/vector/src/main/java/org/apache/arrow/vector/complex/impl/UnionExtensionWriter.java
@@ -76,4 +76,9 @@
       this.writer.setPosition(index);
     }
   }
+
+  @Override
+  public void writeNull() {
+    this.writer.writeNull();
+  }
 }
diff --git a/vector/src/main/java/org/apache/arrow/vector/complex/impl/UnionLargeListReader.java b/vector/src/main/java/org/apache/arrow/vector/complex/impl/UnionLargeListReader.java
index be236c3..a9104cb 100644
--- a/vector/src/main/java/org/apache/arrow/vector/complex/impl/UnionLargeListReader.java
+++ b/vector/src/main/java/org/apache/arrow/vector/complex/impl/UnionLargeListReader.java
@@ -105,4 +105,8 @@
   public void copyAsValue(UnionLargeListWriter writer) {
     ComplexCopier.copy(this, (FieldWriter) writer);
   }
+
+  public void copyAsValue(UnionLargeListWriter writer, ExtensionTypeWriterFactory writerFactory) {
+    ComplexCopier.copy(this, (FieldWriter) writer, writerFactory);
+  }
 }
diff --git a/vector/src/main/java/org/apache/arrow/vector/complex/reader/ExtensionReader.java b/vector/src/main/java/org/apache/arrow/vector/complex/reader/ExtensionReader.java
new file mode 100644
index 0000000..1ba7b27
--- /dev/null
+++ b/vector/src/main/java/org/apache/arrow/vector/complex/reader/ExtensionReader.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.complex.reader;
+
+import org.apache.arrow.vector.holders.ExtensionHolder;
+
+/** Interface for reading extension types. Extends the functionality of {@link BaseReader}. */
+public interface ExtensionReader extends BaseReader {
+
+  /**
+   * Reads to the given extension holder.
+   *
+   * @param holder the {@link ExtensionHolder} to read
+   */
+  void read(ExtensionHolder holder);
+
+  /**
+   * Reads and returns an object representation of the extension type.
+   *
+   * @return the object representation of the extension type
+   */
+  Object readObject();
+
+  /**
+   * Checks if the current value is set.
+   *
+   * @return true if the value is set, false otherwise
+   */
+  boolean isSet();
+}
diff --git a/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java b/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java
index 0dc96a4..e7c0d11 100644
--- a/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java
+++ b/vector/src/main/java/org/apache/arrow/vector/util/VectorAppender.java
@@ -24,13 +24,17 @@
 import org.apache.arrow.memory.util.MemoryUtil;
 import org.apache.arrow.util.Preconditions;
 import org.apache.arrow.vector.BaseFixedWidthVector;
+import org.apache.arrow.vector.BaseIntVector;
 import org.apache.arrow.vector.BaseLargeVariableWidthVector;
 import org.apache.arrow.vector.BaseVariableWidthVector;
 import org.apache.arrow.vector.BaseVariableWidthViewVector;
+import org.apache.arrow.vector.BigIntVector;
 import org.apache.arrow.vector.BitVector;
 import org.apache.arrow.vector.BitVectorHelper;
 import org.apache.arrow.vector.ExtensionTypeVector;
+import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.NullVector;
+import org.apache.arrow.vector.SmallIntVector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.compare.TypeEqualsVisitor;
 import org.apache.arrow.vector.compare.VectorVisitor;
@@ -39,6 +43,7 @@
 import org.apache.arrow.vector.complex.LargeListVector;
 import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.complex.NonNullableStructVector;
+import org.apache.arrow.vector.complex.RunEndEncodedVector;
 import org.apache.arrow.vector.complex.UnionVector;
 
 /** Utility to append two vectors together. */
@@ -698,4 +703,98 @@
     deltaVector.getUnderlyingVector().accept(underlyingAppender, null);
     return targetVector;
   }
+
+  @Override
+  public ValueVector visit(RunEndEncodedVector deltaVector, Void value) {
+    Preconditions.checkArgument(
+        typeVisitor.equals(deltaVector),
+        "The deltaVector to append must have the same type as the targetVector");
+
+    if (deltaVector.getValueCount() == 0) {
+      return targetVector; // optimization, nothing to append, return
+    }
+
+    RunEndEncodedVector targetEncodedVector = (RunEndEncodedVector) targetVector;
+
+    final int targetLogicalValueCount = targetEncodedVector.getValueCount();
+
+    // Append the values vector first.
+    VectorAppender valueAppender = new VectorAppender(targetEncodedVector.getValuesVector());
+    deltaVector.getValuesVector().accept(valueAppender, null);
+
+    // Then append the run-ends vector.
+    BaseIntVector targetRunEndsVector = (BaseIntVector) targetEncodedVector.getRunEndsVector();
+    BaseIntVector deltaRunEndsVector = (BaseIntVector) deltaVector.getRunEndsVector();
+    appendRunEndsVector(targetRunEndsVector, deltaRunEndsVector, targetLogicalValueCount);
+
+    targetEncodedVector.setValueCount(targetLogicalValueCount + deltaVector.getValueCount());
+    return targetVector;
+  }
+
+  private void appendRunEndsVector(
+      BaseIntVector targetRunEndsVector,
+      BaseIntVector deltaRunEndsVector,
+      int targetLogicalValueCount) {
+    int targetPhysicalValueCount = targetRunEndsVector.getValueCount();
+    int newPhysicalValueCount = targetPhysicalValueCount + deltaRunEndsVector.getValueCount();
+
+    // make sure there is enough capacity
+    while (targetVector.getValueCapacity() < newPhysicalValueCount) {
+      targetVector.reAlloc();
+    }
+
+    // append validity buffer
+    BitVectorHelper.concatBits(
+        targetRunEndsVector.getValidityBuffer(),
+        targetRunEndsVector.getValueCount(),
+        deltaRunEndsVector.getValidityBuffer(),
+        deltaRunEndsVector.getValueCount(),
+        targetRunEndsVector.getValidityBuffer());
+
+    // shift and append data buffer
+    shiftAndAppendRunEndsDataBuffer(
+        targetRunEndsVector,
+        targetPhysicalValueCount,
+        deltaRunEndsVector.getDataBuffer(),
+        targetLogicalValueCount,
+        deltaRunEndsVector.getValueCount());
+
+    targetRunEndsVector.setValueCount(newPhysicalValueCount);
+  }
+
+  private void shiftAndAppendRunEndsDataBuffer(
+      BaseIntVector toRunEndVector,
+      int toIndex,
+      ArrowBuf fromRunEndBuffer,
+      int offset,
+      int physicalLength) {
+    ArrowBuf toRunEndBuffer = toRunEndVector.getDataBuffer();
+    if (toRunEndVector instanceof SmallIntVector) {
+      byte typeWidth = SmallIntVector.TYPE_WIDTH;
+      for (int i = 0; i < physicalLength; i++) {
+        toRunEndBuffer.setShort(
+            (long) (i + toIndex) * typeWidth,
+            fromRunEndBuffer.getShort((long) (i) * typeWidth) + offset);
+      }
+
+    } else if (toRunEndVector instanceof IntVector) {
+      byte typeWidth = IntVector.TYPE_WIDTH;
+      for (int i = 0; i < physicalLength; i++) {
+        toRunEndBuffer.setInt(
+            (long) (i + toIndex) * typeWidth,
+            fromRunEndBuffer.getInt((long) (i) * typeWidth) + offset);
+      }
+
+    } else if (toRunEndVector instanceof BigIntVector) {
+      byte typeWidth = BigIntVector.TYPE_WIDTH;
+      for (int i = 0; i < physicalLength; i++) {
+        toRunEndBuffer.setLong(
+            (long) (i + toIndex) * typeWidth,
+            fromRunEndBuffer.getLong((long) (i) * typeWidth) + offset);
+      }
+    } else {
+      throw new IllegalArgumentException(
+          "Run-end vector and must be of type int with size 16, 32, or 64 bits.");
+    }
+  }
 }
diff --git a/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java b/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java
index 5c72154..5cfe64b 100644
--- a/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java
+++ b/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorBufferVisitor.java
@@ -52,14 +52,22 @@
 
     if (vector instanceof FieldVector) {
       FieldVector fieldVector = (FieldVector) vector;
-      // TODO: https://github.com/apache/arrow/issues/41734
       int typeBufferCount = TypeLayout.getTypeBufferCount(arrowType);
-      validateOrThrow(
-          fieldVector.getFieldBuffers().size() == typeBufferCount,
-          "Expected %s buffers in vector of type %s, got %s.",
-          typeBufferCount,
-          vector.getField().getType().toString(),
-          fieldVector.getFieldBuffers().size());
+      if (TypeLayout.getTypeLayout(arrowType).isFixedBufferCount()) {
+        validateOrThrow(
+            fieldVector.getFieldBuffers().size() == typeBufferCount,
+            "Expected %s buffers in vector of type %s, got %s.",
+            typeBufferCount,
+            vector.getField().getType().toString(),
+            fieldVector.getFieldBuffers().size());
+      } else {
+        validateOrThrow(
+            fieldVector.getFieldBuffers().size() >= typeBufferCount,
+            "Expected at least %s buffers in vector of type %s, got %s.",
+            typeBufferCount,
+            vector.getField().getType().toString(),
+            fieldVector.getFieldBuffers().size());
+      }
     }
   }
 
@@ -158,7 +166,12 @@
 
   @Override
   public Void visit(BaseVariableWidthViewVector vector, Void value) {
-    throw new UnsupportedOperationException("View vectors are not supported.");
+    final int valueCount = vector.getValueCount();
+    validateVectorCommon(vector);
+    validateOrThrow(vector.getFieldBuffers().size() >= 2, "Expected at least 2 buffers.");
+    validateValidityBuffer(vector, valueCount);
+    validateDataBuffer(vector, (long) valueCount * BaseVariableWidthViewVector.ELEMENT_SIZE);
+    return null;
   }
 
   @Override
diff --git a/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorDataVisitor.java b/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorDataVisitor.java
index c62bff7..9da8cc8 100644
--- a/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorDataVisitor.java
+++ b/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorDataVisitor.java
@@ -121,7 +121,8 @@
 
   @Override
   public Void visit(BaseVariableWidthViewVector vector, Void value) {
-    throw new UnsupportedOperationException("View vectors are not supported.");
+    vector.validateScalars();
+    return null;
   }
 
   @Override
diff --git a/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorTypeVisitor.java b/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorTypeVisitor.java
index daad41d..395852e 100644
--- a/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorTypeVisitor.java
+++ b/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorTypeVisitor.java
@@ -61,6 +61,8 @@
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.ViewVarBinaryVector;
+import org.apache.arrow.vector.ViewVarCharVector;
 import org.apache.arrow.vector.compare.VectorVisitor;
 import org.apache.arrow.vector.complex.DenseUnionVector;
 import org.apache.arrow.vector.complex.FixedSizeListVector;
@@ -380,7 +382,12 @@
 
   @Override
   public Void visit(BaseVariableWidthViewVector vector, Void value) {
-    throw new UnsupportedOperationException("View vectors are not supported.");
+    if (vector instanceof ViewVarCharVector) {
+      validateVectorCommon(vector, ArrowType.Utf8View.class);
+    } else if (vector instanceof ViewVarBinaryVector) {
+      validateVectorCommon(vector, ArrowType.BinaryView.class);
+    }
+    return null;
   }
 
   @Override
diff --git a/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorVisitor.java b/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorVisitor.java
index 5004ba4..2111410 100644
--- a/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorVisitor.java
+++ b/vector/src/main/java/org/apache/arrow/vector/validate/ValidateVectorVisitor.java
@@ -107,8 +107,13 @@
   }
 
   @Override
-  public Void visit(BaseVariableWidthViewVector left, Void value) {
-    throw new UnsupportedOperationException("View vectors are not supported.");
+  public Void visit(BaseVariableWidthViewVector vector, Void value) {
+    if (vector.getValueCount() > 0) {
+      if (vector.getDataBuffer() == null || vector.getDataBuffer().capacity() == 0) {
+        throw new IllegalArgumentException("valueBuffer is null or capacity is 0");
+      }
+    }
+    return null;
   }
 
   @Override
diff --git a/vector/src/test/java/org/apache/arrow/vector/TestListVector.java b/vector/src/test/java/org/apache/arrow/vector/TestListVector.java
index 5b2043a..c6c7c5c 100644
--- a/vector/src/test/java/org/apache/arrow/vector/TestListVector.java
+++ b/vector/src/test/java/org/apache/arrow/vector/TestListVector.java
@@ -24,16 +24,22 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.UUID;
 import org.apache.arrow.memory.ArrowBuf;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.util.AutoCloseables;
 import org.apache.arrow.vector.complex.BaseRepeatedValueVector;
 import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.impl.UnionListReader;
 import org.apache.arrow.vector.complex.impl.UnionListWriter;
+import org.apache.arrow.vector.complex.impl.UuidWriterFactory;
 import org.apache.arrow.vector.complex.reader.FieldReader;
+import org.apache.arrow.vector.complex.writer.BaseWriter.ExtensionWriter;
+import org.apache.arrow.vector.holder.UuidHolder;
 import org.apache.arrow.vector.holders.DurationHolder;
 import org.apache.arrow.vector.holders.FixedSizeBinaryHolder;
 import org.apache.arrow.vector.holders.TimeStampMilliTZHolder;
@@ -42,6 +48,7 @@
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.UuidType;
 import org.apache.arrow.vector.util.TransferPair;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -1199,6 +1206,114 @@
     }
   }
 
+  @Test
+  public void testListVectorWithExtensionType() throws Exception {
+    final FieldType type = FieldType.nullable(new UuidType());
+    try (final ListVector inVector = new ListVector("list", allocator, type, null)) {
+      UnionListWriter writer = inVector.getWriter();
+      writer.allocate();
+      writer.setPosition(0);
+      UUID u1 = UUID.randomUUID();
+      UUID u2 = UUID.randomUUID();
+      writer.startList();
+      ExtensionWriter extensionWriter = writer.extension(new UuidType());
+      extensionWriter.addExtensionTypeWriterFactory(new UuidWriterFactory());
+      extensionWriter.writeExtension(u1);
+      extensionWriter.writeExtension(u2);
+      writer.endList();
+
+      writer.setValueCount(1);
+
+      FieldReader reader = inVector.getReader();
+      assertTrue(reader.isSet(), "shouldn't be null");
+      Object result = inVector.getObject(0);
+      ArrayList<UUID> resultSet = (ArrayList<UUID>) result;
+      assertEquals(2, resultSet.size());
+      assertEquals(u1, resultSet.get(0));
+      assertEquals(u2, resultSet.get(1));
+    }
+  }
+
+  @Test
+  public void testListVectorReaderForExtensionType() throws Exception {
+    final FieldType type = FieldType.nullable(new UuidType());
+    try (final ListVector inVector = new ListVector("list", allocator, type, null)) {
+      UnionListWriter writer = inVector.getWriter();
+      writer.allocate();
+      writer.setPosition(0);
+      UUID u1 = UUID.randomUUID();
+      UUID u2 = UUID.randomUUID();
+      writer.startList();
+      ExtensionWriter extensionWriter = writer.extension(new UuidType());
+      extensionWriter.addExtensionTypeWriterFactory(new UuidWriterFactory());
+      extensionWriter.writeExtension(u1);
+      extensionWriter.writeExtension(u2);
+      writer.endList();
+
+      writer.setValueCount(1);
+
+      UnionListReader reader = inVector.getReader();
+      assertTrue(reader.isSet(), "shouldn't be null");
+      reader.setPosition(0);
+      reader.next();
+      FieldReader uuidReader = reader.reader();
+      UuidHolder holder = new UuidHolder();
+      uuidReader.read(holder);
+      ByteBuffer bb = ByteBuffer.wrap(holder.value);
+      UUID actualUuid = new UUID(bb.getLong(), bb.getLong());
+      assertEquals(u1, actualUuid);
+      reader.next();
+      uuidReader = reader.reader();
+      uuidReader.read(holder);
+      bb = ByteBuffer.wrap(holder.value);
+      actualUuid = new UUID(bb.getLong(), bb.getLong());
+      assertEquals(u2, actualUuid);
+    }
+  }
+
+  @Test
+  public void testCopyFromForExtensionType() throws Exception {
+    try (ListVector inVector = ListVector.empty("input", allocator);
+        ListVector outVector = ListVector.empty("output", allocator)) {
+      UnionListWriter writer = inVector.getWriter();
+      writer.allocate();
+      writer.setPosition(0);
+      UUID u1 = UUID.randomUUID();
+      UUID u2 = UUID.randomUUID();
+      writer.startList();
+      ExtensionWriter extensionWriter = writer.extension(new UuidType());
+      extensionWriter.addExtensionTypeWriterFactory(new UuidWriterFactory());
+      extensionWriter.writeExtension(u1);
+      extensionWriter.writeExtension(u2);
+      extensionWriter.writeNull();
+      writer.endList();
+
+      writer.setValueCount(1);
+
+      // copy values from input to output
+      outVector.allocateNew();
+      outVector.copyFrom(0, 0, inVector, new UuidWriterFactory());
+      outVector.setValueCount(1);
+
+      UnionListReader reader = outVector.getReader();
+      assertTrue(reader.isSet(), "shouldn't be null");
+      reader.setPosition(0);
+      reader.next();
+      FieldReader uuidReader = reader.reader();
+      UuidHolder holder = new UuidHolder();
+      uuidReader.read(holder);
+      ByteBuffer bb = ByteBuffer.wrap(holder.value);
+      UUID actualUuid = new UUID(bb.getLong(), bb.getLong());
+      assertEquals(u1, actualUuid);
+      reader.next();
+      uuidReader = reader.reader();
+      uuidReader.read(holder);
+      bb = ByteBuffer.wrap(holder.value);
+      actualUuid = new UUID(bb.getLong(), bb.getLong());
+      assertEquals(u2, actualUuid);
+    }
+  }
+
   private void writeIntValues(UnionListWriter writer, int[] values) {
     writer.startList();
     for (int v : values) {
diff --git a/vector/src/test/java/org/apache/arrow/vector/TestMapVector.java b/vector/src/test/java/org/apache/arrow/vector/TestMapVector.java
index 313d83e..1a1810d 100644
--- a/vector/src/test/java/org/apache/arrow/vector/TestMapVector.java
+++ b/vector/src/test/java/org/apache/arrow/vector/TestMapVector.java
@@ -22,24 +22,30 @@
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.arrow.memory.ArrowBuf;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.complex.MapVector;
 import org.apache.arrow.vector.complex.StructVector;
 import org.apache.arrow.vector.complex.impl.UnionMapReader;
 import org.apache.arrow.vector.complex.impl.UnionMapWriter;
+import org.apache.arrow.vector.complex.impl.UuidWriterFactory;
 import org.apache.arrow.vector.complex.reader.FieldReader;
+import org.apache.arrow.vector.complex.writer.BaseWriter.ExtensionWriter;
 import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter;
 import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
 import org.apache.arrow.vector.complex.writer.FieldWriter;
+import org.apache.arrow.vector.holder.UuidHolder;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.UuidType;
 import org.apache.arrow.vector.util.JsonStringArrayList;
 import org.apache.arrow.vector.util.TransferPair;
 import org.junit.jupiter.api.AfterEach;
@@ -1263,4 +1269,94 @@
       assertEquals(11, getResultValue(resultStruct));
     }
   }
+
+  @Test
+  public void testMapVectorWithExtensionType() throws Exception {
+    try (final MapVector inVector = MapVector.empty("map", allocator, false)) {
+      inVector.allocateNew();
+      UnionMapWriter writer = inVector.getWriter();
+      writer.setPosition(0);
+      UUID u1 = UUID.randomUUID();
+      UUID u2 = UUID.randomUUID();
+      writer.startMap();
+      writer.startEntry();
+      writer.key().bigInt().writeBigInt(0);
+      ExtensionWriter extensionWriter = writer.value().extension(new UuidType());
+      extensionWriter.addExtensionTypeWriterFactory(new UuidWriterFactory());
+      extensionWriter.writeExtension(u1);
+      writer.endEntry();
+      writer.startEntry();
+      writer.key().bigInt().writeBigInt(1);
+      extensionWriter = writer.value().extension(new UuidType());
+      extensionWriter.addExtensionTypeWriterFactory(new UuidWriterFactory());
+      extensionWriter.writeExtension(u2);
+      writer.endEntry();
+      writer.endMap();
+
+      writer.setValueCount(1);
+
+      UnionMapReader mapReader = inVector.getReader();
+      mapReader.setPosition(0);
+      mapReader.next();
+      FieldReader uuidReader = mapReader.value();
+      UuidHolder holder = new UuidHolder();
+      uuidReader.read(holder);
+      ByteBuffer bb = ByteBuffer.wrap(holder.value);
+      UUID actualUuid = new UUID(bb.getLong(), bb.getLong());
+      assertEquals(u1, actualUuid);
+      mapReader.next();
+      uuidReader = mapReader.value();
+      uuidReader.read(holder);
+      bb = ByteBuffer.wrap(holder.value);
+      actualUuid = new UUID(bb.getLong(), bb.getLong());
+      assertEquals(u2, actualUuid);
+    }
+  }
+
+  @Test
+  public void testCopyFromForExtensionType() throws Exception {
+    try (final MapVector inVector = MapVector.empty("in", allocator, false);
+        final MapVector outVector = MapVector.empty("out", allocator, false)) {
+      inVector.allocateNew();
+      UnionMapWriter writer = inVector.getWriter();
+      writer.setPosition(0);
+      UUID u1 = UUID.randomUUID();
+      UUID u2 = UUID.randomUUID();
+      writer.startMap();
+      writer.startEntry();
+      writer.key().bigInt().writeBigInt(0);
+      ExtensionWriter extensionWriter = writer.value().extension(new UuidType());
+      extensionWriter.addExtensionTypeWriterFactory(new UuidWriterFactory());
+      extensionWriter.writeExtension(u1);
+      writer.endEntry();
+      writer.startEntry();
+      writer.key().bigInt().writeBigInt(1);
+      extensionWriter = writer.value().extension(new UuidType());
+      extensionWriter.addExtensionTypeWriterFactory(new UuidWriterFactory());
+      extensionWriter.writeExtension(u2);
+      writer.endEntry();
+      writer.endMap();
+
+      writer.setValueCount(1);
+      outVector.allocateNew();
+      outVector.copyFrom(0, 0, inVector, new UuidWriterFactory());
+      outVector.setValueCount(1);
+
+      UnionMapReader mapReader = outVector.getReader();
+      mapReader.setPosition(0);
+      mapReader.next();
+      FieldReader uuidReader = mapReader.value();
+      UuidHolder holder = new UuidHolder();
+      uuidReader.read(holder);
+      ByteBuffer bb = ByteBuffer.wrap(holder.value);
+      UUID actualUuid = new UUID(bb.getLong(), bb.getLong());
+      assertEquals(u1, actualUuid);
+      mapReader.next();
+      uuidReader = mapReader.value();
+      uuidReader.read(holder);
+      bb = ByteBuffer.wrap(holder.value);
+      actualUuid = new UUID(bb.getLong(), bb.getLong());
+      assertEquals(u2, actualUuid);
+    }
+  }
 }
diff --git a/vector/src/test/java/org/apache/arrow/vector/TestVariableWidthViewVector.java b/vector/src/test/java/org/apache/arrow/vector/TestVariableWidthViewVector.java
index f7c66a0..baf5e67 100644
--- a/vector/src/test/java/org/apache/arrow/vector/TestVariableWidthViewVector.java
+++ b/vector/src/test/java/org/apache/arrow/vector/TestVariableWidthViewVector.java
@@ -61,6 +61,7 @@
 import org.apache.arrow.vector.util.ReusableByteArray;
 import org.apache.arrow.vector.util.Text;
 import org.apache.arrow.vector.util.TransferPair;
+import org.apache.arrow.vector.validate.ValidateUtil;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -2445,7 +2446,7 @@
         final ViewVarBinaryVector sourceVector =
             newViewVarBinaryVector(EMPTY_SCHEMA_PATH, allocator)) {
       testSplitAndTransferOnValiditySplitHelper(
-          targetVector, sourceVector, startIndex, length, data);
+          targetVector, sourceVector, startIndex, length, binaryData);
     }
   }
 
@@ -2852,4 +2853,18 @@
       }
     }
   }
+
+  @Test
+  public void testValidate() {
+    try (final ViewVarCharVector vector = new ViewVarCharVector("v", allocator)) {
+      vector.validateFull();
+      setVector(vector, STR1, STR2, STR3);
+      vector.validateFull();
+
+      vector.getDataBuffer().capacity(0);
+      ValidateUtil.ValidateException e =
+          assertThrows(ValidateUtil.ValidateException.class, () -> vector.validate());
+      assertTrue(e.getMessage().contains("Not enough capacity for data buffer"));
+    }
+  }
 }
diff --git a/vector/src/test/java/org/apache/arrow/vector/UuidVector.java b/vector/src/test/java/org/apache/arrow/vector/UuidVector.java
index 5c90d45..72ba4aa 100644
--- a/vector/src/test/java/org/apache/arrow/vector/UuidVector.java
+++ b/vector/src/test/java/org/apache/arrow/vector/UuidVector.java
@@ -20,6 +20,9 @@
 import java.util.UUID;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.util.hash.ArrowBufHasher;
+import org.apache.arrow.vector.complex.impl.UuidReaderImpl;
+import org.apache.arrow.vector.complex.reader.FieldReader;
+import org.apache.arrow.vector.holder.UuidHolder;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.arrow.vector.types.pojo.UuidType;
@@ -79,11 +82,21 @@
     return new TransferImpl((UuidVector) to);
   }
 
+  @Override
+  protected FieldReader getReaderImpl() {
+    return new UuidReaderImpl(this);
+  }
+
   public void setSafe(int index, byte[] value) {
     getUnderlyingVector().setIndexDefined(index);
     getUnderlyingVector().setSafe(index, value);
   }
 
+  public void get(int index, UuidHolder holder) {
+    holder.value = getUnderlyingVector().get(index);
+    holder.isSet = 1;
+  }
+
   public class TransferImpl implements TransferPair {
     UuidVector to;
     ValueVector targetUnderlyingVector;
diff --git a/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestComplexCopier.java b/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestComplexCopier.java
index 3bc02c6..738e890 100644
--- a/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestComplexCopier.java
+++ b/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestComplexCopier.java
@@ -20,6 +20,7 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.math.BigDecimal;
+import java.util.UUID;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.DecimalVector;
@@ -30,12 +31,14 @@
 import org.apache.arrow.vector.complex.StructVector;
 import org.apache.arrow.vector.complex.reader.FieldReader;
 import org.apache.arrow.vector.complex.writer.BaseWriter;
+import org.apache.arrow.vector.complex.writer.BaseWriter.ExtensionWriter;
 import org.apache.arrow.vector.complex.writer.BaseWriter.StructWriter;
 import org.apache.arrow.vector.complex.writer.FieldWriter;
 import org.apache.arrow.vector.holders.DecimalHolder;
 import org.apache.arrow.vector.types.Types;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.UuidType;
 import org.apache.arrow.vector.util.DecimalUtility;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -845,4 +848,115 @@
       assertTrue(VectorEqualsVisitor.vectorEquals(from, to));
     }
   }
+
+  @Test
+  public void testCopyListVectorWithExtensionType() {
+    try (ListVector from = ListVector.empty("v", allocator);
+        ListVector to = ListVector.empty("v", allocator)) {
+
+      UnionListWriter listWriter = from.getWriter();
+      listWriter.allocate();
+
+      for (int i = 0; i < COUNT; i++) {
+        listWriter.setPosition(i);
+        listWriter.startList();
+        ExtensionWriter extensionWriter = listWriter.extension(new UuidType());
+        extensionWriter.addExtensionTypeWriterFactory(new UuidWriterFactory());
+        extensionWriter.writeExtension(UUID.randomUUID());
+        extensionWriter.writeExtension(UUID.randomUUID());
+        listWriter.endList();
+      }
+      from.setValueCount(COUNT);
+
+      // copy values
+      FieldReader in = from.getReader();
+      FieldWriter out = to.getWriter();
+      for (int i = 0; i < COUNT; i++) {
+        in.setPosition(i);
+        out.setPosition(i);
+        ComplexCopier.copy(in, out, new UuidWriterFactory());
+      }
+
+      to.setValueCount(COUNT);
+
+      // validate equals
+      assertTrue(VectorEqualsVisitor.vectorEquals(from, to));
+    }
+  }
+
+  @Test
+  public void testCopyMapVectorWithExtensionType() {
+    try (final MapVector from = MapVector.empty("v", allocator, false);
+        final MapVector to = MapVector.empty("v", allocator, false)) {
+
+      from.allocateNew();
+
+      UnionMapWriter mapWriter = from.getWriter();
+      for (int i = 0; i < COUNT; i++) {
+        mapWriter.setPosition(i);
+        mapWriter.startMap();
+        mapWriter.startEntry();
+        ExtensionWriter extensionKeyWriter = mapWriter.key().extension(new UuidType());
+        extensionKeyWriter.addExtensionTypeWriterFactory(new UuidWriterFactory());
+        extensionKeyWriter.writeExtension(UUID.randomUUID());
+        ExtensionWriter extensionValueWriter = mapWriter.value().extension(new UuidType());
+        extensionValueWriter.addExtensionTypeWriterFactory(new UuidWriterFactory());
+        extensionValueWriter.writeExtension(UUID.randomUUID());
+        mapWriter.endEntry();
+        mapWriter.endMap();
+      }
+
+      from.setValueCount(COUNT);
+
+      // copy values
+      FieldReader in = from.getReader();
+      FieldWriter out = to.getWriter();
+      for (int i = 0; i < COUNT; i++) {
+        in.setPosition(i);
+        out.setPosition(i);
+        ComplexCopier.copy(in, out, new UuidWriterFactory());
+      }
+      to.setValueCount(COUNT);
+
+      // validate equals
+      assertTrue(VectorEqualsVisitor.vectorEquals(from, to));
+    }
+  }
+
+  @Test
+  public void testCopyStructVectorWithExtensionType() {
+    try (final StructVector from = StructVector.empty("v", allocator);
+        final StructVector to = StructVector.empty("v", allocator)) {
+
+      from.allocateNewSafe();
+
+      NullableStructWriter structWriter = from.getWriter();
+      for (int i = 0; i < COUNT; i++) {
+        structWriter.setPosition(i);
+        structWriter.start();
+        ExtensionWriter extensionWriter1 = structWriter.extension("timestamp1", new UuidType());
+        extensionWriter1.addExtensionTypeWriterFactory(new UuidWriterFactory());
+        extensionWriter1.writeExtension(UUID.randomUUID());
+        ExtensionWriter extensionWriter2 = structWriter.extension("timestamp2", new UuidType());
+        extensionWriter2.addExtensionTypeWriterFactory(new UuidWriterFactory());
+        extensionWriter2.writeExtension(UUID.randomUUID());
+        structWriter.end();
+      }
+
+      from.setValueCount(COUNT);
+
+      // copy values
+      FieldReader in = from.getReader();
+      FieldWriter out = to.getWriter();
+      for (int i = 0; i < COUNT; i++) {
+        in.setPosition(i);
+        out.setPosition(i);
+        ComplexCopier.copy(in, out, new UuidWriterFactory());
+      }
+      to.setValueCount(COUNT);
+
+      // validate equals
+      assertTrue(VectorEqualsVisitor.vectorEquals(from, to));
+    }
+  }
 }
diff --git a/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java b/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java
index 1556852..7b8b1f9 100644
--- a/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java
+++ b/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java
@@ -805,4 +805,29 @@
       assertEquals(u2, uuidVector.getObject(1));
     }
   }
+
+  @Test
+  public void testExtensionTypeForList() throws Exception {
+    try (final ListVector container = ListVector.empty(EMPTY_SCHEMA_PATH, allocator);
+        final UuidVector v =
+            (UuidVector) container.addOrGetVector(FieldType.nullable(new UuidType())).getVector();
+        final PromotableWriter writer = new PromotableWriter(v, container)) {
+      UUID u1 = UUID.randomUUID();
+      UUID u2 = UUID.randomUUID();
+      container.allocateNew();
+      container.setValueCount(1);
+      writer.addExtensionTypeWriterFactory(new UuidWriterFactory());
+
+      writer.setPosition(0);
+      writer.writeExtension(u1);
+      writer.setPosition(1);
+      writer.writeExtension(u2);
+
+      container.setValueCount(2);
+
+      UuidVector uuidVector = (UuidVector) container.getDataVector();
+      assertEquals(u1, uuidVector.getObject(0));
+      assertEquals(u2, uuidVector.getObject(1));
+    }
+  }
 }
diff --git a/vector/src/test/java/org/apache/arrow/vector/complex/impl/UuidReaderImpl.java b/vector/src/test/java/org/apache/arrow/vector/complex/impl/UuidReaderImpl.java
new file mode 100644
index 0000000..6b98d3b
--- /dev/null
+++ b/vector/src/test/java/org/apache/arrow/vector/complex/impl/UuidReaderImpl.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.complex.impl;
+
+import org.apache.arrow.vector.UuidVector;
+import org.apache.arrow.vector.holder.UuidHolder;
+import org.apache.arrow.vector.holders.ExtensionHolder;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.Field;
+
+public class UuidReaderImpl extends AbstractFieldReader {
+
+  private final UuidVector vector;
+
+  public UuidReaderImpl(UuidVector vector) {
+    super();
+    this.vector = vector;
+  }
+
+  @Override
+  public MinorType getMinorType() {
+    return vector.getMinorType();
+  }
+
+  @Override
+  public Field getField() {
+    return vector.getField();
+  }
+
+  @Override
+  public boolean isSet() {
+    return !vector.isNull(idx());
+  }
+
+  @Override
+  public void read(ExtensionHolder holder) {
+    vector.get(idx(), (UuidHolder) holder);
+  }
+
+  @Override
+  public void read(int arrayIndex, ExtensionHolder holder) {
+    vector.get(arrayIndex, (UuidHolder) holder);
+  }
+
+  @Override
+  public void copyAsValue(AbstractExtensionTypeWriter writer) {
+    UuidWriterImpl impl = (UuidWriterImpl) writer;
+    impl.vector.copyFromSafe(idx(), impl.idx(), vector);
+  }
+
+  @Override
+  public Object readObject() {
+    return vector.getObject(idx());
+  }
+}
diff --git a/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java b/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java
index 2745386..f374eb4 100644
--- a/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java
+++ b/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java
@@ -19,6 +19,7 @@
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -31,6 +32,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import org.apache.arrow.memory.ArrowBuf;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
@@ -64,6 +66,7 @@
 import org.apache.arrow.vector.complex.impl.UnionMapReader;
 import org.apache.arrow.vector.complex.impl.UnionReader;
 import org.apache.arrow.vector.complex.impl.UnionWriter;
+import org.apache.arrow.vector.complex.impl.UuidWriterFactory;
 import org.apache.arrow.vector.complex.reader.BaseReader.StructReader;
 import org.apache.arrow.vector.complex.reader.BigIntReader;
 import org.apache.arrow.vector.complex.reader.FieldReader;
@@ -71,9 +74,11 @@
 import org.apache.arrow.vector.complex.reader.Float8Reader;
 import org.apache.arrow.vector.complex.reader.IntReader;
 import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
+import org.apache.arrow.vector.complex.writer.BaseWriter.ExtensionWriter;
 import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter;
 import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
 import org.apache.arrow.vector.complex.writer.BaseWriter.StructWriter;
+import org.apache.arrow.vector.holder.UuidHolder;
 import org.apache.arrow.vector.holders.DecimalHolder;
 import org.apache.arrow.vector.holders.DurationHolder;
 import org.apache.arrow.vector.holders.FixedSizeBinaryHolder;
@@ -84,6 +89,7 @@
 import org.apache.arrow.vector.holders.NullableTimeStampNanoTZHolder;
 import org.apache.arrow.vector.holders.TimeStampMilliTZHolder;
 import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID;
 import org.apache.arrow.vector.types.pojo.ArrowType.Int;
@@ -93,6 +99,7 @@
 import org.apache.arrow.vector.types.pojo.ArrowType.Utf8;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.UuidType;
 import org.apache.arrow.vector.util.CallBack;
 import org.apache.arrow.vector.util.DecimalUtility;
 import org.apache.arrow.vector.util.JsonStringArrayList;
@@ -2489,4 +2496,38 @@
           "row12", new String(vector.getLargeVarBinaryVector().get(11), StandardCharsets.UTF_8));
     }
   }
+
+  @Test
+  public void extensionWriterReader() throws Exception {
+    // test values
+    UUID u1 = UUID.randomUUID();
+
+    try (NonNullableStructVector parent = NonNullableStructVector.empty("parent", allocator)) {
+      // write
+
+      ComplexWriter writer = new ComplexWriterImpl("root", parent);
+      StructWriter rootWriter = writer.rootAsStruct();
+
+      {
+        ExtensionWriter extensionWriter = rootWriter.extension("uuid1", new UuidType());
+        extensionWriter.setPosition(0);
+        extensionWriter.addExtensionTypeWriterFactory(new UuidWriterFactory());
+        extensionWriter.writeExtension(u1);
+      }
+      // read
+      StructReader rootReader = new SingleStructReaderImpl(parent).reader("root");
+      {
+        FieldReader uuidReader = rootReader.reader("uuid1");
+        uuidReader.setPosition(0);
+        UuidHolder uuidHolder = new UuidHolder();
+        uuidReader.read(uuidHolder);
+        final ByteBuffer bb = ByteBuffer.wrap(uuidHolder.value);
+        UUID actualUuid = new UUID(bb.getLong(), bb.getLong());
+        assertEquals(u1, actualUuid);
+        assertTrue(uuidReader.isSet());
+        assertEquals(uuidReader.getMinorType(), MinorType.EXTENSIONTYPE);
+        assertInstanceOf(UuidType.class, uuidReader.getField().getFieldType().getType());
+      }
+    }
+  }
 }
diff --git a/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestSimpleWriter.java b/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestSimpleWriter.java
index bf1b9b0..269cff0 100644
--- a/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestSimpleWriter.java
+++ b/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestSimpleWriter.java
@@ -30,6 +30,7 @@
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.complex.impl.LargeVarBinaryWriterImpl;
 import org.apache.arrow.vector.complex.impl.LargeVarCharWriterImpl;
+import org.apache.arrow.vector.complex.impl.UuidReaderImpl;
 import org.apache.arrow.vector.complex.impl.UuidWriterImpl;
 import org.apache.arrow.vector.complex.impl.VarBinaryWriterImpl;
 import org.apache.arrow.vector.complex.impl.VarCharWriterImpl;
@@ -204,4 +205,23 @@
       assertEquals(uuid, result);
     }
   }
+
+  @Test
+  public void testReaderCopyAsValueExtensionVector() throws Exception {
+    try (UuidVector vector = new UuidVector("test", allocator);
+        UuidVector vectorForRead = new UuidVector("test2", allocator);
+        UuidWriterImpl writer = new UuidWriterImpl(vector)) {
+      UUID uuid = UUID.randomUUID();
+      vectorForRead.setValueCount(1);
+      vectorForRead.set(0, uuid);
+      UuidReaderImpl reader = (UuidReaderImpl) vectorForRead.getReader();
+      reader.copyAsValue(writer);
+      UuidReaderImpl reader2 = (UuidReaderImpl) vector.getReader();
+      UuidHolder holder = new UuidHolder();
+      reader2.read(0, holder);
+      final ByteBuffer bb = ByteBuffer.wrap(holder.value);
+      UUID actualUuid = new UUID(bb.getLong(), bb.getLong());
+      assertEquals(uuid, actualUuid);
+    }
+  }
 }
diff --git a/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java b/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java
index 4ee9630..df5521a 100644
--- a/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java
+++ b/vector/src/test/java/org/apache/arrow/vector/util/TestVectorAppender.java
@@ -47,6 +47,7 @@
 import org.apache.arrow.vector.complex.FixedSizeListVector;
 import org.apache.arrow.vector.complex.LargeListVector;
 import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.RunEndEncodedVector;
 import org.apache.arrow.vector.complex.StructVector;
 import org.apache.arrow.vector.complex.UnionVector;
 import org.apache.arrow.vector.holders.NullableBigIntHolder;
@@ -1026,6 +1027,72 @@
   }
 
   @Test
+  public void testAppendRunEndEncodedVector() {
+    final FieldType reeFieldType = FieldType.notNullable(ArrowType.RunEndEncoded.INSTANCE);
+    final Field runEndsField =
+        new Field("runEnds", FieldType.notNullable(Types.MinorType.INT.getType()), null);
+    final Field valuesField = Field.nullable("values", Types.MinorType.INT.getType());
+    final List<Field> children = Arrays.asList(runEndsField, valuesField);
+
+    final Field targetField = new Field("target", reeFieldType, children);
+    final Field deltaField = new Field("delta", reeFieldType, children);
+    try (RunEndEncodedVector target = new RunEndEncodedVector(targetField, allocator, null);
+        RunEndEncodedVector delta = new RunEndEncodedVector(deltaField, allocator, null)) {
+
+      // populate target
+      target.allocateNew();
+      // data: [1, 1, 2, null, 3, 3, 3] (7 values)
+      // values: [1, 2, null, 3]
+      // runEnds: [2, 3, 4, 7]
+      ValueVectorDataPopulator.setVector((IntVector) target.getValuesVector(), 1, 2, null, 3);
+      ValueVectorDataPopulator.setVector((IntVector) target.getRunEndsVector(), 2, 3, 4, 7);
+      target.setValueCount(7);
+
+      // populate delta
+      delta.allocateNew();
+      // data: [3, 4, 4, 5, null, null] (6 values)
+      // values: [3, 4, 5, null]
+      // runEnds: [1, 3, 4, 6]
+      ValueVectorDataPopulator.setVector((IntVector) delta.getValuesVector(), 3, 4, 5, null);
+      ValueVectorDataPopulator.setVector((IntVector) delta.getRunEndsVector(), 1, 3, 4, 6);
+      delta.setValueCount(6);
+
+      VectorAppender appender = new VectorAppender(target);
+      delta.accept(appender, null);
+
+      assertEquals(13, target.getValueCount());
+
+      final Field expectedField = new Field("expected", reeFieldType, children);
+      try (RunEndEncodedVector expected = new RunEndEncodedVector(expectedField, allocator, null)) {
+        expected.allocateNew();
+        // expected data: [1, 1, 2, null, 3, 3, 3, 3, 4, 4, 5, null, null] (13 values)
+        // expected values: [1, 2, null, 3, 3, 4, 5, null]
+        // expected runEnds: [2, 3, 4, 7, 8, 10, 11, 13]
+        ValueVectorDataPopulator.setVector(
+            (IntVector) expected.getValuesVector(), 1, 2, null, 3, 3, 4, 5, null);
+        ValueVectorDataPopulator.setVector(
+            (IntVector) expected.getRunEndsVector(), 2, 3, 4, 7, 8, 10, 11, 13);
+        expected.setValueCount(13);
+
+        assertVectorsEqual(expected, target);
+      }
+
+      // Check that delta is unchanged.
+      final Field expectedDeltaField = new Field("expectedDelta", reeFieldType, children);
+      try (RunEndEncodedVector expectedDelta =
+          new RunEndEncodedVector(expectedDeltaField, allocator, null)) {
+        expectedDelta.allocateNew();
+        ValueVectorDataPopulator.setVector(
+            (IntVector) expectedDelta.getValuesVector(), 3, 4, 5, null);
+        ValueVectorDataPopulator.setVector(
+            (IntVector) expectedDelta.getRunEndsVector(), 1, 3, 4, 6);
+        expectedDelta.setValueCount(6);
+        assertVectorsEqual(expectedDelta, delta);
+      }
+    }
+  }
+
+  @Test
   public void testAppendVectorNegative() {
     final int vectorLength = 10;
     try (IntVector target = new IntVector("", allocator);