[CALCITE-2040] Create adapter for Apache Arrow

Co-authored-by: Alessandro Solimando <alessandro.solimando@gmail.com>
Co-authored-by: Jonathan Swenson <jonathan@exploreomni.com>
Co-authored-by: Julian Hyde <jhyde@apache.org>
Co-authored-by: Karshit Shah <shahkarshit@yahoo.co.in>
Co-authored-by: Michael Mior <mmior@cs.rit.edu>
diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 4c8a548..059a2b3 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -69,7 +69,9 @@
       with:
         job-id: jdk${{ matrix.jdk }}
         remote-build-cache-proxy-enabled: false
-        arguments: --scan --no-parallel --no-daemon build javadoc
+        # Arrow build is excluded because it is not supported on Windows
+        # See https://arrow.apache.org/docs/java/install.html#system-compatibility
+        arguments: --scan --no-parallel --no-daemon build javadoc --exclude-task :arrow:build
     - name: 'sqlline and sqllsh'
       shell: cmd
       run: |
@@ -103,7 +105,9 @@
         with:
           job-id: jdk${{ matrix.jdk }}
           remote-build-cache-proxy-enabled: false
-          arguments: --scan --no-parallel --no-daemon build
+          # Arrow build is excluded because it is not supported on Windows
+          # See https://arrow.apache.org/docs/java/install.html#system-compatibility
+          arguments: --scan --no-parallel --no-daemon build --exclude-task :arrow:build
       - name: 'sqlline and sqllsh'
         shell: cmd
         run: |
@@ -215,6 +219,9 @@
           S3_BUILD_CACHE_ACCESS_KEY_ID: ${{ secrets.S3_BUILD_CACHE_ACCESS_KEY_ID }}
           S3_BUILD_CACHE_SECRET_KEY: ${{ secrets.S3_BUILD_CACHE_SECRET_KEY }}
           GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+          # The following option `--add-opens=java.base/java.nio=ALL-UNNAMED` is required jdk17+
+          # to avoid error. See https://arrow.apache.org/docs/java/install.html#java-compatibility
+          _JAVA_OPTIONS: ${{ env._JAVA_OPTIONS }} --add-opens=java.base/java.nio=ALL-UNNAMED
         with:
           job-id: jdk${{ matrix.jdk }}
           remote-build-cache-proxy-enabled: false
@@ -241,6 +248,9 @@
           S3_BUILD_CACHE_ACCESS_KEY_ID: ${{ secrets.S3_BUILD_CACHE_ACCESS_KEY_ID }}
           S3_BUILD_CACHE_SECRET_KEY: ${{ secrets.S3_BUILD_CACHE_SECRET_KEY }}
           GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+          # The following option `--add-opens=java.base/java.nio=ALL-UNNAMED` is required jdk17+
+          # to avoid error. See https://arrow.apache.org/docs/java/install.html#java-compatibility
+          _JAVA_OPTIONS: ${{ env._JAVA_OPTIONS }} --add-opens=java.base/java.nio=ALL-UNNAMED
         with:
           job-id: jdk${{ matrix.jdk }}
           remote-build-cache-proxy-enabled: false
@@ -310,6 +320,9 @@
           S3_BUILD_CACHE_ACCESS_KEY_ID: ${{ secrets.S3_BUILD_CACHE_ACCESS_KEY_ID }}
           S3_BUILD_CACHE_SECRET_KEY: ${{ secrets.S3_BUILD_CACHE_SECRET_KEY }}
           GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+          # The following option `--add-opens=java.base/java.nio=ALL-UNNAMED` is required jdk17+
+          # to avoid error. See https://arrow.apache.org/docs/java/install.html#java-compatibility
+          _JAVA_OPTIONS: ${{ env._JAVA_OPTIONS }} --add-opens=java.base/java.nio=ALL-UNNAMED
         with:
           job-id: jdk19
           remote-build-cache-proxy-enabled: false
diff --git a/Jenkinsfile b/Jenkinsfile
index 53cc1e6..bd4216c 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -41,7 +41,9 @@
   }
   stage('Code Quality') {
     timeout(time: 1, unit: 'HOURS') {
-      withEnv(["Path+JDK=$JAVA_JDK_17/bin","JAVA_HOME=$JAVA_JDK_17"]) {
+      // The following option `--add-opens=java.base/java.nio=ALL-UNNAMED` is required jdk17+
+      // to avoid error. See https://arrow.apache.org/docs/java/install.html#java-compatibility
+      withEnv(["Path+JDK=$JAVA_JDK_17/bin","JAVA_HOME=$JAVA_JDK_17","_JAVA_OPTIONS=--add-opens=java.base/java.nio=ALL-UNNAMED"]) {
         withCredentials([string(credentialsId: 'SONARCLOUD_TOKEN', variable: 'SONAR_TOKEN')]) {
           if ( env.BRANCH_NAME.startsWith("PR-") ) {
             sh './gradlew --no-parallel --no-daemon jacocoAggregateTestReport sonar -PenableJacoco -Dsonar.pullrequest.branch=${CHANGE_BRANCH} -Dsonar.pullrequest.base=${CHANGE_TARGET} -Dsonar.pullrequest.key=${CHANGE_ID} -Dsonar.login=${SONAR_TOKEN}'
diff --git a/arrow/build.gradle.kts b/arrow/build.gradle.kts
new file mode 100644
index 0000000..0ed73e8
--- /dev/null
+++ b/arrow/build.gradle.kts
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+dependencies {
+    api(project(":core"))
+
+    implementation("com.google.guava:guava")
+    implementation("org.apache.arrow:arrow-memory-netty")
+    implementation("org.apache.arrow:arrow-vector")
+    implementation("org.apache.arrow.gandiva:arrow-gandiva")
+    annotationProcessor("org.immutables:value")
+    compileOnly("org.immutables:value-annotations")
+
+    testImplementation("org.apache.arrow:arrow-jdbc")
+    testImplementation("net.hydromatic:scott-data-hsqldb")
+    testImplementation("org.apache.commons:commons-lang3")
+    testImplementation(project(":core"))
+    testImplementation(project(":testkit"))
+}
diff --git a/arrow/gradle.properties b/arrow/gradle.properties
new file mode 100644
index 0000000..4d2cfdd
--- /dev/null
+++ b/arrow/gradle.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+description=Arrow adapter for Calcite
+artifact.name=Calcite Arrow
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/AbstractArrowEnumerator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/AbstractArrowEnumerator.java
new file mode 100644
index 0000000..6351dca
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/AbstractArrowEnumerator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Util;
+
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Enumerator that reads from a collection of Arrow value-vectors.
+ */
+abstract class AbstractArrowEnumerator implements Enumerator<Object> {
+  protected final ArrowFileReader arrowFileReader;
+  protected final List<Integer> fields;
+  protected final List<ValueVector> valueVectors;
+  protected int currRowIndex;
+  protected int rowCount;
+
+  AbstractArrowEnumerator(ArrowFileReader arrowFileReader, ImmutableIntList fields) {
+    this.arrowFileReader = arrowFileReader;
+    this.fields = fields;
+    this.valueVectors = new ArrayList<>(fields.size());
+    this.currRowIndex = -1;
+  }
+
+  abstract void evaluateOperator(ArrowRecordBatch arrowRecordBatch);
+
+  protected void loadNextArrowBatch() {
+    try {
+      final VectorSchemaRoot vsr = arrowFileReader.getVectorSchemaRoot();
+      for (int i : fields) {
+        this.valueVectors.add(vsr.getVector(i));
+      }
+      this.rowCount = vsr.getRowCount();
+      VectorUnloader vectorUnloader = new VectorUnloader(vsr);
+      ArrowRecordBatch arrowRecordBatch = vectorUnloader.getRecordBatch();
+      evaluateOperator(arrowRecordBatch);
+    } catch (IOException e) {
+      throw Util.toUnchecked(e);
+    }
+  }
+
+  @Override public Object current() {
+    if (fields.size() == 1) {
+      return this.valueVectors.get(0).getObject(currRowIndex);
+    }
+    Object[] current = new Object[valueVectors.size()];
+    for (int i = 0; i < valueVectors.size(); i++) {
+      ValueVector vector = this.valueVectors.get(i);
+      current[i] = vector.getObject(currRowIndex);
+    }
+    return current;
+  }
+
+  @Override public void reset() {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java
new file mode 100644
index 0000000..142f18c
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Util;
+
+import org.apache.arrow.gandiva.evaluator.Filter;
+import org.apache.arrow.gandiva.evaluator.Projector;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Enumerable that reads from Arrow value-vectors.
+ */
+class ArrowEnumerable extends AbstractEnumerable<Object> {
+  private final ArrowFileReader arrowFileReader;
+  private final ImmutableIntList fields;
+  private final @Nullable Projector projector;
+  private final @Nullable Filter filter;
+
+
+  ArrowEnumerable(ArrowFileReader arrowFileReader, ImmutableIntList fields,
+      @Nullable Projector projector, @Nullable Filter filter) {
+    this.arrowFileReader = arrowFileReader;
+    this.projector = projector;
+    this.filter = filter;
+    this.fields = fields;
+  }
+
+  @Override public Enumerator<Object> enumerator() {
+    try {
+      if (projector != null) {
+        return new ArrowProjectEnumerator(arrowFileReader, fields, projector);
+      } else if (filter != null) {
+        return new ArrowFilterEnumerator(arrowFileReader, fields, filter);
+      }
+      throw new IllegalArgumentException(
+          "The arrow enumerator must have either a filter or a projection");
+    } catch (Exception e) {
+      throw Util.toUnchecked(e);
+    }
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFieldType.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFieldType.java
new file mode 100644
index 0000000..4cb6c38
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFieldType.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.tree.Primitive;
+import org.apache.calcite.rel.type.RelDataType;
+
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Arrow field type.
+ */
+enum ArrowFieldType {
+  INT(Primitive.INT),
+  BOOLEAN(Primitive.BOOLEAN),
+  STRING(String.class),
+  FLOAT(Primitive.FLOAT),
+  DOUBLE(Primitive.DOUBLE),
+  DATE(Date.class),
+  LIST(List.class),
+  DECIMAL(BigDecimal.class),
+  LONG(Primitive.LONG),
+  BYTE(Primitive.BYTE),
+  SHORT(Primitive.SHORT);
+
+  private final Class<?> clazz;
+
+  ArrowFieldType(Primitive primitive) {
+    this(requireNonNull(primitive.boxClass, "boxClass"));
+  }
+
+  ArrowFieldType(Class<?> clazz) {
+    this.clazz = clazz;
+  }
+
+  public RelDataType toType(JavaTypeFactory typeFactory) {
+    RelDataType javaType = typeFactory.createJavaType(clazz);
+    RelDataType sqlType = typeFactory.createSqlType(javaType.getSqlTypeName());
+    return typeFactory.createTypeWithNullability(sqlType, true);
+  }
+
+  public static ArrowFieldType of(ArrowType arrowType) {
+    switch (arrowType.getTypeID()) {
+    case Int:
+      int bitWidth = ((ArrowType.Int) arrowType).getBitWidth();
+      switch (bitWidth) {
+      case 64:
+        return LONG;
+      case 32:
+        return INT;
+      case 16:
+        return SHORT;
+      case 8:
+        return BYTE;
+      default:
+        throw new IllegalArgumentException("Unsupported Int bit width: " + bitWidth);
+      }
+    case Bool:
+      return BOOLEAN;
+    case Utf8:
+      return STRING;
+    case FloatingPoint:
+      FloatingPointPrecision precision = ((ArrowType.FloatingPoint) arrowType).getPrecision();
+      switch (precision) {
+      case SINGLE:
+        return FLOAT;
+      case DOUBLE:
+        return DOUBLE;
+      default:
+        throw new IllegalArgumentException("Unsupported Floating point precision: " + precision);
+      }
+    case Date:
+      return DATE;
+    case Decimal:
+      return DECIMAL;
+    default:
+      throw new IllegalArgumentException("Unsupported type: " + arrowType);
+    }
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilter.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilter.java
new file mode 100644
index 0000000..9774318
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Implementation of a {@link org.apache.calcite.rel.core.Filter}
+ * relational expression in Arrow.
+ */
+class ArrowFilter extends Filter implements ArrowRel {
+  private final List<String> match;
+
+  ArrowFilter(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RexNode condition) {
+    super(cluster, traitSet, input, condition);
+    final ArrowTranslator translator =
+        ArrowTranslator.create(cluster.getRexBuilder(), input.getRowType());
+    this.match = translator.translateMatch(condition);
+
+    assert getConvention() == ArrowRel.CONVENTION;
+    assert getConvention() == input.getConvention();
+  }
+
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    final RelOptCost cost = super.computeSelfCost(planner, mq);
+    return requireNonNull(cost, "cost").multiplyBy(0.1);
+  }
+
+  @Override public ArrowFilter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+    return new ArrowFilter(getCluster(), traitSet, input, condition);
+  }
+
+  @Override public void implement(Implementor implementor) {
+    implementor.visitInput(0, getInput());
+    implementor.addFilters(match);
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilterEnumerator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilterEnumerator.java
new file mode 100644
index 0000000..c52a542
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilterEnumerator.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Util;
+
+import org.apache.arrow.gandiva.evaluator.Filter;
+import org.apache.arrow.gandiva.evaluator.SelectionVector;
+import org.apache.arrow.gandiva.evaluator.SelectionVectorInt16;
+import org.apache.arrow.gandiva.exceptions.GandivaException;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+
+import java.io.IOException;
+
+/**
+ * Enumerator that reads from a filtered collection of Arrow value-vectors.
+ */
+class ArrowFilterEnumerator extends AbstractArrowEnumerator {
+  private final BufferAllocator allocator;
+  private final Filter filter;
+  private ArrowBuf buf;
+  private SelectionVector selectionVector;
+  private int selectionVectorIndex;
+
+  ArrowFilterEnumerator(ArrowFileReader arrowFileReader, ImmutableIntList fields, Filter filter) {
+    super(arrowFileReader, fields);
+    this.allocator = new RootAllocator(Long.MAX_VALUE);
+    this.filter = filter;
+  }
+
+  @Override void evaluateOperator(ArrowRecordBatch arrowRecordBatch) {
+    try {
+      this.buf = this.allocator.buffer((long) rowCount * 2);
+      this.selectionVector = new SelectionVectorInt16(buf);
+      filter.evaluate(arrowRecordBatch, selectionVector);
+    } catch (GandivaException e) {
+      throw Util.toUnchecked(e);
+    }
+  }
+
+  @Override public boolean moveNext() {
+    if (selectionVector == null
+        || selectionVectorIndex >= selectionVector.getRecordCount()) {
+      boolean hasNextBatch;
+      while (true) {
+        try {
+          hasNextBatch = arrowFileReader.loadNextBatch();
+        } catch (IOException e) {
+          throw Util.toUnchecked(e);
+        }
+        if (hasNextBatch) {
+          selectionVectorIndex = 0;
+          this.valueVectors.clear();
+          loadNextArrowBatch();
+          assert selectionVector != null;
+          if (selectionVectorIndex >= selectionVector.getRecordCount()) {
+            // the "filtered" batch is empty, but there may be more batches to fetch
+            continue;
+          }
+          currRowIndex = selectionVector.getIndex(selectionVectorIndex++);
+        }
+        return hasNextBatch;
+      }
+    } else {
+      currRowIndex = selectionVector.getIndex(selectionVectorIndex++);
+      return true;
+    }
+  }
+
+  @Override public void close() {
+    try {
+      if (buf != null) {
+        buf.close();
+      }
+      filter.close();
+    } catch (GandivaException e) {
+      throw Util.toUnchecked(e);
+    }
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowMethod.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowMethod.java
new file mode 100644
index 0000000..66b078d
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowMethod.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.tree.Types;
+import org.apache.calcite.util.ImmutableIntList;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Built-in methods in the Arrow adapter.
+ *
+ * @see org.apache.calcite.util.BuiltInMethod
+ */
+@SuppressWarnings("ImmutableEnumChecker")
+enum ArrowMethod {
+  ARROW_QUERY(ArrowTable.class, "query", DataContext.class,
+      ImmutableIntList.class, List.class);
+
+  final Method method;
+
+  static final ImmutableMap<Method, ArrowMethod> MAP;
+
+  static {
+    final ImmutableMap.Builder<Method, ArrowMethod> builder =
+        ImmutableMap.builder();
+    for (ArrowMethod value : ArrowMethod.values()) {
+      builder.put(value.method, value);
+    }
+    MAP = builder.build();
+  }
+
+  /** Defines a method. */
+  ArrowMethod(Class<?> clazz, String methodName, Class<?>... argumentTypes) {
+    this.method = Types.lookupMethod(clazz, methodName, argumentTypes);
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProject.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProject.java
new file mode 100644
index 0000000..8fee390
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProject.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Implementation of {@link org.apache.calcite.rel.core.Project}
+ * relational expression in Arrow.
+ */
+class ArrowProject extends Project implements ArrowRel {
+
+  /** Creates an ArrowProject. */
+  ArrowProject(RelOptCluster cluster, RelTraitSet traitSet,
+      RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
+    super(cluster, traitSet, ImmutableList.of(), input, projects, rowType, ImmutableSet.of());
+    assert getConvention() == ArrowRel.CONVENTION;
+    assert getConvention() == input.getConvention();
+  }
+
+  @Override public Project copy(RelTraitSet traitSet, RelNode input,
+      List<RexNode> projects, RelDataType rowType) {
+    return new ArrowProject(getCluster(), traitSet, input, projects,
+        rowType);
+  }
+
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
+      RelMetadataQuery mq) {
+    final RelOptCost cost = super.computeSelfCost(planner, mq);
+    return requireNonNull(cost, "cost").multiplyBy(0.1);
+  }
+
+  @Override public void implement(Implementor implementor) {
+    implementor.visitInput(0, getInput());
+    List<Integer> projectedFields = getProjectFields(getProjects());
+    if (projectedFields != null) {
+      implementor.addProjectFields(projectedFields);
+    }
+  }
+
+  static @Nullable List<Integer> getProjectFields(List<RexNode> exps) {
+    final List<Integer> fields = new ArrayList<>();
+    for (final RexNode exp : exps) {
+      if (exp instanceof RexInputRef) {
+        fields.add(((RexInputRef) exp).getIndex());
+      } else {
+        return null; // not a simple projection
+      }
+    }
+    return fields;
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProjectEnumerator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProjectEnumerator.java
new file mode 100644
index 0000000..2426810
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProjectEnumerator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Util;
+
+import org.apache.arrow.gandiva.evaluator.Projector;
+import org.apache.arrow.gandiva.exceptions.GandivaException;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+
+import java.io.IOException;
+
+/**
+ * Enumerator that reads from a projected collection of Arrow value-vectors.
+ */
+class ArrowProjectEnumerator extends AbstractArrowEnumerator {
+  private final Projector projector;
+
+  ArrowProjectEnumerator(ArrowFileReader arrowFileReader, ImmutableIntList fields,
+      Projector projector) {
+    super(arrowFileReader, fields);
+    this.projector = projector;
+  }
+
+  @Override protected void evaluateOperator(ArrowRecordBatch arrowRecordBatch) {
+    try {
+      projector.evaluate(arrowRecordBatch, valueVectors);
+    } catch (GandivaException e) {
+      throw Util.toUnchecked(e);
+    }
+  }
+
+  @Override public boolean moveNext() {
+    if (currRowIndex >= rowCount - 1) {
+      final boolean hasNextBatch;
+      try {
+        hasNextBatch = arrowFileReader.loadNextBatch();
+      } catch (IOException e) {
+        throw Util.toUnchecked(e);
+      }
+      if (hasNextBatch) {
+        currRowIndex = 0;
+        this.valueVectors.clear();
+        loadNextArrowBatch();
+      }
+      return hasNextBatch;
+    } else {
+      currRowIndex++;
+      return true;
+    }
+  }
+
+  @Override public void close() {
+    try {
+      projector.close();
+    } catch (GandivaException e) {
+      throw Util.toUnchecked(e);
+    }
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRel.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRel.java
new file mode 100644
index 0000000..5b002bd
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRel.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.util.ImmutableIntList;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Relational expression that uses the Arrow calling convention.
+ */
+public interface ArrowRel extends RelNode {
+  void implement(Implementor implementor);
+
+  /** Calling convention for relational operations that occur in Arrow. */
+  Convention CONVENTION = new Convention.Impl("ARROW", ArrowRel.class);
+
+  /** Callback for the implementation process that converts a tree of
+   * {@link ArrowRel} nodes into a SQL query. */
+  class Implementor {
+    @Nullable List<Integer> selectFields;
+    final List<String> whereClause = new ArrayList<>();
+    @Nullable RelOptTable table;
+    @Nullable ArrowTable arrowTable;
+
+    /** Adds new predicates.
+     *
+     * @param predicates Predicates
+     */
+    void addFilters(List<String> predicates) {
+      whereClause.addAll(predicates);
+    }
+
+    /** Adds newly projected fields.
+     *
+     * @param fields New fields to be projected from a query
+     */
+    void addProjectFields(List<Integer> fields) {
+      selectFields = ImmutableIntList.copyOf(fields);
+    }
+
+    public void visitInput(int ordinal, RelNode input) {
+      checkArgument(ordinal == 0);
+      ((ArrowRel) input).implement(this);
+    }
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRules.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRules.java
new file mode 100644
index 0000000..42e47e8
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRules.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+
+import com.google.common.collect.ImmutableList;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.immutables.value.Value;
+
+import java.util.List;
+
+/** Planner rules relating to the Arrow adapter. */
+public class ArrowRules {
+  private ArrowRules() {}
+
+  /** Rule that matches a {@link org.apache.calcite.rel.core.Project} on
+   * an {@link ArrowTableScan} and pushes down projects if possible. */
+  public static final ArrowProjectRule PROJECT_SCAN =
+      ArrowProjectRule.DEFAULT_CONFIG.toRule(ArrowProjectRule.class);
+
+  public static final ArrowFilterRule FILTER_SCAN =
+      ArrowFilterRule.Config.DEFAULT.toRule();
+
+  public static final ConverterRule TO_ENUMERABLE =
+      ArrowToEnumerableConverterRule.DEFAULT_CONFIG
+          .toRule(ArrowToEnumerableConverterRule.class);
+
+  public static final List<RelOptRule> RULES = ImmutableList.of(PROJECT_SCAN, FILTER_SCAN);
+
+  static List<String> arrowFieldNames(final RelDataType rowType) {
+    return SqlValidatorUtil.uniquify(rowType.getFieldNames(),
+        SqlValidatorUtil.EXPR_SUGGESTER, true);
+  }
+
+  /** Base class for planner rules that convert a relational expression to
+   * the Arrow calling convention. */
+  abstract static class ArrowConverterRule extends ConverterRule {
+    ArrowConverterRule(Config config) {
+      super(config);
+    }
+  }
+
+  /**
+   * Rule to convert a {@link org.apache.calcite.rel.core.Filter} to an
+   * {@link ArrowFilter}.
+   */
+  public static class ArrowFilterRule extends RelRule<ArrowFilterRule.Config> {
+
+    /** Creates an ArrowFilterRule. */
+    protected ArrowFilterRule(Config config) {
+      super(config);
+    }
+
+    @Override public void onMatch(RelOptRuleCall call) {
+      final Filter filter = call.rel(0);
+
+      if (filter.getTraitSet().contains(Convention.NONE)) {
+        final RelNode converted = convert(filter);
+        call.transformTo(converted);
+      }
+    }
+
+    RelNode convert(Filter filter) {
+      final RelTraitSet traitSet =
+          filter.getTraitSet().replace(ArrowRel.CONVENTION);
+      return new ArrowFilter(filter.getCluster(), traitSet,
+          convert(filter.getInput(), ArrowRel.CONVENTION),
+          filter.getCondition());
+    }
+
+    /** Rule configuration. */
+    @Value.Immutable
+    public interface Config extends RelRule.Config {
+      Config DEFAULT = ImmutableConfig.builder()
+          .withOperandSupplier(b0 ->
+              b0.operand(LogicalFilter.class).oneInput(b1 ->
+                  b1.operand(ArrowTableScan.class).noInputs()))
+          .build();
+
+      @Override default ArrowFilterRule toRule() {
+        return new ArrowFilterRule(this);
+      }
+    }
+  }
+
+  /**
+   * Planner rule that projects from an {@link ArrowTableScan} just the columns
+   * needed to satisfy a projection. If the projection's expressions are
+   * trivial, the projection is removed.
+   *
+   * @see ArrowRules#PROJECT_SCAN
+   */
+  public static class ArrowProjectRule extends ArrowConverterRule {
+
+    /** Default configuration. */
+    protected static final Config DEFAULT_CONFIG = Config.INSTANCE
+        .withConversion(LogicalProject.class, Convention.NONE,
+            ArrowRel.CONVENTION, "ArrowProjectRule")
+        .withRuleFactory(ArrowProjectRule::new);
+
+    /** Creates an ArrowProjectRule. */
+    protected ArrowProjectRule(Config config) {
+      super(config);
+    }
+
+    @Override public @Nullable RelNode convert(RelNode rel) {
+      final Project project = (Project) rel;
+      @Nullable List<Integer> fields =
+          ArrowProject.getProjectFields(project.getProjects());
+      if (fields == null) {
+        // Project contains expressions more complex than just field references.
+        return null;
+      }
+      final RelTraitSet traitSet =
+          project.getTraitSet().replace(ArrowRel.CONVENTION);
+      return new ArrowProject(project.getCluster(), traitSet,
+          convert(project.getInput(), ArrowRel.CONVENTION),
+          project.getProjects(), project.getRowType());
+    }
+  }
+
+  /**
+   * Rule to convert a relational expression from
+   * {@link ArrowRel#CONVENTION} to {@link EnumerableConvention}.
+   */
+  static class ArrowToEnumerableConverterRule extends ConverterRule {
+
+    /** Default configuration. */
+    public static final Config DEFAULT_CONFIG = Config.INSTANCE
+        .withConversion(RelNode.class, ArrowRel.CONVENTION,
+            EnumerableConvention.INSTANCE, "ArrowToEnumerableConverterRule")
+        .withRuleFactory(ArrowToEnumerableConverterRule::new);
+
+    /** Creates an ArrowToEnumerableConverterRule. */
+    protected ArrowToEnumerableConverterRule(Config config) {
+      super(config);
+    }
+
+    @Override public RelNode convert(RelNode rel) {
+      RelTraitSet newTraitSet = rel.getTraitSet().replace(getOutConvention());
+      return new ArrowToEnumerableConverter(rel.getCluster(), newTraitSet, rel);
+    }
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchema.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchema.java
new file mode 100644
index 0000000..c40cf44
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchema.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.util.Sources;
+import org.apache.calcite.util.Util;
+
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.SeekableReadChannel;
+
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableMap;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Schema mapped onto a set of Arrow files.
+ */
+class ArrowSchema extends AbstractSchema {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ArrowSchema.class);
+  private final Supplier<Map<String, Table>> tableMapSupplier;
+
+  /**
+   * Creates an Arrow schema.
+   *
+   * @param baseDirectory Base directory to look for relative files
+   */
+  ArrowSchema(File baseDirectory) {
+    requireNonNull(baseDirectory, "baseDirectory");
+    this.tableMapSupplier =
+        Suppliers.memoize(() -> deduceTableMap(baseDirectory));
+  }
+
+  /**
+   * Looks for a suffix on a string and returns
+   * either the string with the suffix removed
+   * or the original string.
+   */
+  private static String trim(String s, String suffix) {
+    String trimmed = trimOrNull(s, suffix);
+    return trimmed != null ? trimmed : s;
+  }
+
+  /**
+   * Looks for a suffix on a string and returns
+   * either the string with the suffix removed
+   * or null.
+   */
+  private static @Nullable String trimOrNull(String s, String suffix) {
+    return s.endsWith(suffix)
+        ? s.substring(0, s.length() - suffix.length())
+        : null;
+  }
+
+  @Override protected Map<String, Table> getTableMap() {
+    return tableMapSupplier.get();
+  }
+
+  private static Map<String, Table> deduceTableMap(File baseDirectory) {
+    File[] files = baseDirectory.listFiles((dir, name) -> name.endsWith(".arrow"));
+    if (files == null) {
+      LOGGER.info("directory " + baseDirectory + " not found");
+      return ImmutableMap.of();
+    }
+
+    final Map<String, Table> tables = new HashMap<>();
+    for (File file : files) {
+      final File arrowFile = new File(Sources.of(file).path());
+      final FileInputStream fileInputStream;
+      try {
+        fileInputStream = new FileInputStream(arrowFile);
+      } catch (FileNotFoundException e) {
+        throw Util.toUnchecked(e);
+      }
+      final SeekableReadChannel seekableReadChannel =
+          new SeekableReadChannel(fileInputStream.getChannel());
+      final RootAllocator allocator = new RootAllocator();
+      final ArrowFileReader arrowFileReader =
+          new ArrowFileReader(seekableReadChannel, allocator);
+      final String tableName =
+          trim(file.getName(), ".arrow").toUpperCase(Locale.ROOT);
+      final ArrowTable table =
+          new ArrowTable(null, arrowFileReader);
+      tables.put(tableName, table);
+    }
+
+    return ImmutableMap.copyOf(tables);
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchemaFactory.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchemaFactory.java
new file mode 100644
index 0000000..07810c9
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchemaFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.model.ModelHandler;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaFactory;
+import org.apache.calcite.schema.SchemaPlus;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * Factory that creates an {@link ArrowSchema}.
+ */
+public class ArrowSchemaFactory implements SchemaFactory {
+
+  @Override public Schema create(SchemaPlus parentSchema, String name,
+      Map<String, Object> operand) {
+    final File baseDirectory =
+        (File) operand.get(ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName);
+    final String directory = (String) operand.get("directory");
+    File directoryFile = null;
+    if (directory != null) {
+      directoryFile = new File(directory);
+    }
+    if (baseDirectory != null) {
+      if (directoryFile == null) {
+        directoryFile = baseDirectory;
+      } else if (!directoryFile.isAbsolute()) {
+        directoryFile = new File(baseDirectory, directoryFile.getPath());
+      }
+    }
+    if (directoryFile == null) {
+      throw new RuntimeException("no directory");
+    }
+    return new ArrowSchema(directoryFile);
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java
new file mode 100644
index 0000000..f868756
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.QueryableTable;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Util;
+
+import org.apache.arrow.gandiva.evaluator.Filter;
+import org.apache.arrow.gandiva.evaluator.Projector;
+import org.apache.arrow.gandiva.exceptions.GandivaException;
+import org.apache.arrow.gandiva.expression.Condition;
+import org.apache.arrow.gandiva.expression.ExpressionTree;
+import org.apache.arrow.gandiva.expression.TreeBuilder;
+import org.apache.arrow.gandiva.expression.TreeNode;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Arrow Table.
+ */
+public class ArrowTable extends AbstractTable
+    implements TranslatableTable, QueryableTable {
+  private final @Nullable RelProtoDataType protoRowType;
+  /** Arrow schema. (In Calcite terminology, more like a row type than a Schema.) */
+  private final Schema schema;
+  private final ArrowFileReader arrowFileReader;
+
+  ArrowTable(@Nullable RelProtoDataType protoRowType, ArrowFileReader arrowFileReader) {
+    try {
+      this.schema = arrowFileReader.getVectorSchemaRoot().getSchema();
+    } catch (IOException e) {
+      throw Util.toUnchecked(e);
+    }
+    this.protoRowType = protoRowType;
+    this.arrowFileReader = arrowFileReader;
+  }
+
+  @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    if (this.protoRowType != null) {
+      return this.protoRowType.apply(typeFactory);
+    }
+    return deduceRowType(this.schema, (JavaTypeFactory) typeFactory);
+  }
+
+  @Override public Expression getExpression(SchemaPlus schema, String tableName,
+      Class clazz) {
+    return Schemas.tableExpression(schema, getElementType(), tableName, clazz);
+  }
+
+  /** Called via code generation; see uses of
+   * {@link org.apache.calcite.adapter.arrow.ArrowMethod#ARROW_QUERY}. */
+  @SuppressWarnings("unused")
+  public Enumerable<Object> query(DataContext root, ImmutableIntList fields,
+      List<String> conditions) {
+    requireNonNull(fields, "fields");
+    final Projector projector;
+    final Filter filter;
+
+    if (conditions.isEmpty()) {
+      filter = null;
+
+      final List<ExpressionTree> expressionTrees = new ArrayList<>();
+      for (int fieldOrdinal : fields) {
+        Field field = schema.getFields().get(fieldOrdinal);
+        TreeNode node = TreeBuilder.makeField(field);
+        expressionTrees.add(TreeBuilder.makeExpression(node, field));
+      }
+      try {
+        projector = Projector.make(schema, expressionTrees);
+      } catch (GandivaException e) {
+        throw Util.toUnchecked(e);
+      }
+    } else {
+      projector = null;
+
+      final List<TreeNode> conditionNodes = new ArrayList<>(conditions.size());
+      for (String condition : conditions) {
+        String[] data = condition.split(" ");
+        List<TreeNode> treeNodes = new ArrayList<>(2);
+        treeNodes.add(
+            TreeBuilder.makeField(schema.getFields()
+                .get(schema.getFields().indexOf(schema.findField(data[0])))));
+        treeNodes.add(makeLiteralNode(data[2], data[3]));
+        String equality = data[1];
+        conditionNodes.add(
+            TreeBuilder.makeFunction(equality, treeNodes, new ArrowType.Bool()));
+      }
+      final Condition filterCondition;
+      if (conditionNodes.size() == 1) {
+        filterCondition = TreeBuilder.makeCondition(conditionNodes.get(0));
+      } else {
+        TreeNode treeNode = TreeBuilder.makeAnd(conditionNodes);
+        filterCondition = TreeBuilder.makeCondition(treeNode);
+      }
+
+      try {
+        filter = Filter.make(schema, filterCondition);
+      } catch (GandivaException e) {
+        throw Util.toUnchecked(e);
+      }
+    }
+
+    return new ArrowEnumerable(arrowFileReader, fields, projector, filter);
+  }
+
+  @Override public <T> Queryable<T> asQueryable(QueryProvider queryProvider,
+      SchemaPlus schema, String tableName) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override public Type getElementType() {
+    return Object[].class;
+  }
+
+  @Override public RelNode toRel(RelOptTable.ToRelContext context,
+      RelOptTable relOptTable) {
+    final int fieldCount = relOptTable.getRowType().getFieldCount();
+    final ImmutableIntList fields =
+        ImmutableIntList.copyOf(Util.range(fieldCount));
+    final RelOptCluster cluster = context.getCluster();
+    return new ArrowTableScan(cluster, cluster.traitSetOf(ArrowRel.CONVENTION),
+        relOptTable, this, fields);
+  }
+
+  private static RelDataType deduceRowType(Schema schema,
+      JavaTypeFactory typeFactory) {
+    final RelDataTypeFactory.Builder builder = typeFactory.builder();
+    for (Field field : schema.getFields()) {
+      builder.add(field.getName(),
+          ArrowFieldType.of(field.getType()).toType(typeFactory));
+    }
+    return builder.build();
+  }
+
+  private static TreeNode makeLiteralNode(String literal, String type) {
+    switch (type) {
+    case "integer":
+      return TreeBuilder.makeLiteral(Integer.parseInt(literal));
+    case "long":
+      return TreeBuilder.makeLiteral(Long.parseLong(literal));
+    case "float":
+      return TreeBuilder.makeLiteral(Float.parseFloat(literal));
+    case "double":
+      return TreeBuilder.makeLiteral(Double.parseDouble(literal));
+    case "string":
+      return TreeBuilder.makeStringLiteral(literal.substring(1, literal.length() - 1));
+    default:
+      throw new IllegalArgumentException("Invalid literal " + literal
+          + ", type " + type);
+    }
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTableScan.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTableScan.java
new file mode 100644
index 0000000..651a26a
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTableScan.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.util.ImmutableIntList;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Relational expression representing a scan of an Arrow collection.
+ */
+class ArrowTableScan extends TableScan implements ArrowRel {
+  private final ArrowTable arrowTable;
+  private final ImmutableIntList fields;
+
+  ArrowTableScan(RelOptCluster cluster, RelTraitSet traitSet,
+      RelOptTable relOptTable, ArrowTable arrowTable, ImmutableIntList fields) {
+    super(cluster, traitSet, ImmutableList.of(), relOptTable);
+    this.arrowTable = arrowTable;
+    this.fields = fields;
+
+    assert getConvention() == ArrowRel.CONVENTION;
+  }
+
+  @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    checkArgument(inputs.isEmpty());
+    return this;
+  }
+
+  @Override public RelWriter explainTerms(RelWriter pw) {
+    return super.explainTerms(pw).item("fields", fields);
+  }
+
+  @Override public RelDataType deriveRowType() {
+    final List<RelDataTypeField> fieldList = table.getRowType().getFieldList();
+    final RelDataTypeFactory.Builder builder =
+        getCluster().getTypeFactory().builder();
+    for (int field : fields) {
+      builder.add(fieldList.get(field));
+    }
+    return builder.build();
+  }
+
+  @Override public void register(RelOptPlanner planner) {
+    planner.addRule(ArrowRules.TO_ENUMERABLE);
+    for (RelOptRule rule : ArrowRules.RULES) {
+      planner.addRule(rule);
+    }
+  }
+
+  @Override public void implement(ArrowRel.Implementor implementor) {
+    implementor.arrowTable = arrowTable;
+    implementor.table = table;
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowToEnumerableConverter.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowToEnumerableConverter.java
new file mode 100644
index 0000000..3b90dfd
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowToEnumerableConverter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.linq4j.tree.Blocks;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterImpl;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.BuiltInMethod;
+
+import com.google.common.primitives.Ints;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Relational expression representing a scan of a table in an Arrow data source.
+ */
+class ArrowToEnumerableConverter
+    extends ConverterImpl implements EnumerableRel {
+
+  protected ArrowToEnumerableConverter(RelOptCluster cluster,
+      RelTraitSet traitSet, RelNode input) {
+    super(cluster, ConventionTraitDef.INSTANCE, traitSet, input);
+  }
+
+  @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new ArrowToEnumerableConverter(getCluster(), traitSet, sole(inputs));
+  }
+
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
+      RelMetadataQuery mq) {
+    RelOptCost cost = super.computeSelfCost(planner, mq);
+    return requireNonNull(cost, "cost").multiplyBy(0.1);
+  }
+
+  @Override public Result implement(EnumerableRelImplementor implementor,
+      Prefer pref) {
+    final ArrowRel.Implementor arrowImplementor = new ArrowRel.Implementor();
+    arrowImplementor.visitInput(0, getInput());
+    PhysType physType =
+        PhysTypeImpl.of(
+            implementor.getTypeFactory(),
+            getRowType(),
+            pref.preferArray());
+
+    final RelOptTable table = requireNonNull(arrowImplementor.table, "table");
+    final int fieldCount = table.getRowType().getFieldCount();
+    return implementor.result(physType,
+        Blocks.toBlock(
+            Expressions.call(table.getExpression(ArrowTable.class),
+                ArrowMethod.ARROW_QUERY.method, implementor.getRootExpression(),
+                arrowImplementor.selectFields != null
+                    ? Expressions.call(
+                        BuiltInMethod.IMMUTABLE_INT_LIST_COPY_OF.method,
+                        Expressions.constant(
+                            Ints.toArray(arrowImplementor.selectFields)))
+                    : Expressions.call(
+                        BuiltInMethod.IMMUTABLE_INT_LIST_IDENTITY.method,
+                        Expressions.constant(fieldCount)),
+                Expressions.constant(arrowImplementor.whereClause))));
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTranslator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTranslator.java
new file mode 100644
index 0000000..5b68a13
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTranslator.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.DateString;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.math.BigDecimal;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.calcite.util.DateTimeStringUtils.ISO_DATETIME_FRACTIONAL_SECOND_FORMAT;
+import static org.apache.calcite.util.DateTimeStringUtils.getDateFormatter;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Translates a {@link RexNode} expression to a Gandiva string.
+ */
+class ArrowTranslator {
+  final RexBuilder rexBuilder;
+  final RelDataType rowType;
+  final List<String> fieldNames;
+
+  /** Private constructor. */
+  ArrowTranslator(RexBuilder rexBuilder, RelDataType rowType) {
+    this.rexBuilder = rexBuilder;
+    this.rowType = rowType;
+    this.fieldNames = ArrowRules.arrowFieldNames(rowType);
+  }
+
+  /** Creates an ArrowTranslator. */
+  public static ArrowTranslator create(RexBuilder rexBuilder,
+      RelDataType rowType) {
+    return new ArrowTranslator(rexBuilder, rowType);
+  }
+
+  List<String> translateMatch(RexNode condition) {
+    List<RexNode> disjunctions = RelOptUtil.disjunctions(condition);
+    if (disjunctions.size() == 1) {
+      return translateAnd(disjunctions.get(0));
+    } else {
+      throw new AssertionError("cannot translate " + condition);
+    }
+  }
+
+  /**
+   * Returns the value of the literal.
+   *
+   * @param literal Literal to translate
+   *
+   * @return The value of the literal in the form of the actual type
+   */
+  private static Object literalValue(RexLiteral literal) {
+    switch (literal.getTypeName()) {
+    case TIMESTAMP:
+    case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+      final SimpleDateFormat dateFormatter =
+          getDateFormatter(ISO_DATETIME_FRACTIONAL_SECOND_FORMAT);
+      Long millis = literal.getValueAs(Long.class);
+      return dateFormatter.format(requireNonNull(millis, "millis"));
+    case DATE:
+      final DateString dateString = literal.getValueAs(DateString.class);
+      return requireNonNull(dateString, "dateString").toString();
+    default:
+      return requireNonNull(literal.getValue3());
+    }
+  }
+
+  /**
+   * Translate a conjunctive predicate to a SQL string.
+   *
+   * @param condition A conjunctive predicate
+   *
+   * @return SQL string for the predicate
+   */
+  private List<String> translateAnd(RexNode condition) {
+    List<String> predicates = new ArrayList<>();
+    for (RexNode node : RelOptUtil.conjunctions(condition)) {
+      if (node.getKind() == SqlKind.SEARCH) {
+        final RexNode node2 = RexUtil.expandSearch(rexBuilder, null, node);
+        predicates.addAll(translateMatch(node2));
+      } else {
+        predicates.add(translateMatch2(node));
+      }
+    }
+    return predicates;
+  }
+
+  /** Translate a binary relation. */
+  private String translateMatch2(RexNode node) {
+    switch (node.getKind()) {
+    case EQUALS:
+      return translateBinary("equal", "=", (RexCall) node);
+    case LESS_THAN:
+      return translateBinary("less_than", ">", (RexCall) node);
+    case LESS_THAN_OR_EQUAL:
+      return translateBinary("less_than_or_equal_to", ">=", (RexCall) node);
+    case GREATER_THAN:
+      return translateBinary("greater_than", "<", (RexCall) node);
+    case GREATER_THAN_OR_EQUAL:
+      return translateBinary("greater_than_or_equal_to", "<=", (RexCall) node);
+    default:
+      throw new AssertionError("cannot translate " + node);
+    }
+  }
+
+  /**
+   * Translates a call to a binary operator, reversing arguments if
+   * necessary.
+   */
+  private String translateBinary(String op, String rop, RexCall call) {
+    final RexNode left = call.operands.get(0);
+    final RexNode right = call.operands.get(1);
+    @Nullable String expression = translateBinary2(op, left, right);
+    if (expression != null) {
+      return expression;
+    }
+    expression = translateBinary2(rop, right, left);
+    if (expression != null) {
+      return expression;
+    }
+    throw new AssertionError("cannot translate op " + op + " call " + call);
+  }
+
+  /** Translates a call to a binary operator. Returns null on failure. */
+  private @Nullable String translateBinary2(String op, RexNode left, RexNode right) {
+    if (right.getKind() != SqlKind.LITERAL) {
+      return null;
+    }
+    final RexLiteral rightLiteral = (RexLiteral) right;
+    switch (left.getKind()) {
+    case INPUT_REF:
+      final RexInputRef left1 = (RexInputRef) left;
+      String name = fieldNames.get(left1.getIndex());
+      return translateOp2(op, name, rightLiteral);
+    case CAST:
+      // FIXME This will not work in all cases (for example, we ignore string encoding)
+      return translateBinary2(op, ((RexCall) left).operands.get(0), right);
+    default:
+      return null;
+    }
+  }
+
+  /** Combines a field name, operator, and literal to produce a predicate string. */
+  private String translateOp2(String op, String name, RexLiteral right) {
+    Object value = literalValue(right);
+    String valueString = value.toString();
+    String valueType = getLiteralType(value);
+
+    if (value instanceof String) {
+      final RelDataTypeField field = requireNonNull(rowType.getField(name, true, false), "field");
+      SqlTypeName typeName = field.getType().getSqlTypeName();
+      if (typeName != SqlTypeName.CHAR) {
+        valueString = "'" + valueString + "'";
+      }
+    }
+    return name + " " + op + " " + valueString + " " + valueType;
+  }
+
+  private static String getLiteralType(Object literal) {
+    if (literal instanceof BigDecimal) {
+      BigDecimal bigDecimalLiteral = (BigDecimal) literal;
+      int scale = bigDecimalLiteral.scale();
+      if (scale == 0) {
+        return "integer";
+      } else if (scale > 0) {
+        return "float";
+      }
+    } else if (String.class.equals(literal.getClass())) {
+      return "string";
+    }
+    throw new AssertionError("Invalid literal");
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/package-info.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/package-info.java
new file mode 100644
index 0000000..51ced6b
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Query provider that reads from Arrow files.
+ */
+package org.apache.calcite.adapter.arrow;
diff --git a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java
new file mode 100644
index 0000000..19e7bdf
--- /dev/null
+++ b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java
@@ -0,0 +1,712 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.test.CalciteAssert;
+import org.apache.calcite.util.Sources;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for the Apache Arrow adapter.
+ */
+@Execution(ExecutionMode.SAME_THREAD)
+@ExtendWith(ArrowExtension.class)
+class ArrowAdapterTest {
+  private static Map<String, String> arrow;
+  private static File arrowDataDirectory;
+
+  @BeforeAll
+  static void initializeArrowState(@TempDir Path sharedTempDir) throws IOException, SQLException {
+    URL modelUrl =
+        Objects.requireNonNull(ArrowAdapterTest.class.getResource("/arrow-model.json"), "url");
+    Path sourceModelFilePath = Sources.of(modelUrl).file().toPath();
+    Path modelFileTarget = sharedTempDir.resolve("arrow-model.json");
+    Files.copy(sourceModelFilePath, modelFileTarget);
+
+    Path arrowFilesDirectory = sharedTempDir.resolve("arrow");
+    Files.createDirectory(arrowFilesDirectory);
+    arrowDataDirectory = arrowFilesDirectory.toFile();
+
+    File dataLocationFile = arrowFilesDirectory.resolve("arrowdata.arrow").toFile();
+    ArrowData arrowDataGenerator = new ArrowData();
+    arrowDataGenerator.writeArrowData(dataLocationFile);
+    arrowDataGenerator.writeScottEmpData(arrowFilesDirectory);
+
+    arrow = ImmutableMap.of("model", modelFileTarget.toAbsolutePath().toString());
+  }
+
+  /** Test to read an Arrow file and check its field names. */
+  @Test void testArrowSchema() {
+    ArrowSchema arrowSchema = new ArrowSchema(arrowDataDirectory);
+    Map<String, Table> tableMap = arrowSchema.getTableMap();
+    RelDataTypeFactory typeFactory =
+        new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+    RelDataType relDataType = tableMap.get("ARROWDATA").getRowType(typeFactory);
+
+    assertEquals(relDataType.getFieldNames(),
+        ImmutableList.of("intField", "stringField", "floatField", "longField"));
+  }
+
+  @Test void testArrowProjectAllFields() {
+    String sql = "select * from arrowdata\n";
+    String plan = "PLAN=ArrowToEnumerableConverter\n"
+        + "  ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "intField=0; stringField=0; floatField=0.0; longField=0\n"
+        + "intField=1; stringField=1; floatField=1.0; longField=1\n"
+        + "intField=2; stringField=2; floatField=2.0; longField=2\n"
+        + "intField=3; stringField=3; floatField=3.0; longField=3\n"
+        + "intField=4; stringField=4; floatField=4.0; longField=4\n"
+        + "intField=5; stringField=5; floatField=5.0; longField=5\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .limit(6)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testArrowProjectAllFieldsExplicitly() {
+    String sql = "select \"intField\", \"stringField\", \"floatField\", \"longField\" "
+        + "from arrowdata\n";
+    String plan = "PLAN=ArrowToEnumerableConverter\n"
+        + "  ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "intField=0; stringField=0; floatField=0.0; longField=0\n"
+        + "intField=1; stringField=1; floatField=1.0; longField=1\n"
+        + "intField=2; stringField=2; floatField=2.0; longField=2\n"
+        + "intField=3; stringField=3; floatField=3.0; longField=3\n"
+        + "intField=4; stringField=4; floatField=4.0; longField=4\n"
+        + "intField=5; stringField=5; floatField=5.0; longField=5\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .limit(6)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testArrowProjectAllFieldsExplicitlyPermutation() {
+    String sql = "select \"stringField\", \"intField\", \"longField\", \"floatField\" "
+        + "from arrowdata\n";
+    String plan = "PLAN=ArrowToEnumerableConverter\n"
+        + "  ArrowProject(stringField=[$1], intField=[$0], longField=[$3], floatField=[$2])\n"
+        + "    ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "stringField=0; intField=0; longField=0; floatField=0.0\n"
+        + "stringField=1; intField=1; longField=1; floatField=1.0\n"
+        + "stringField=2; intField=2; longField=2; floatField=2.0\n"
+        + "stringField=3; intField=3; longField=3; floatField=3.0\n"
+        + "stringField=4; intField=4; longField=4; floatField=4.0\n"
+        + "stringField=5; intField=5; longField=5; floatField=5.0\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .limit(6)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testArrowProjectSingleField() {
+    String sql = "select \"intField\" from arrowdata\n";
+    String plan = "PLAN=ArrowToEnumerableConverter\n"
+        + "  ArrowProject(intField=[$0])\n"
+        + "    ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "intField=0\nintField=1\nintField=2\n"
+        + "intField=3\nintField=4\nintField=5\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .limit(6)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testArrowProjectTwoFields() {
+    String sql = "select \"intField\", \"stringField\" from arrowdata\n";
+    String plan = "PLAN=ArrowToEnumerableConverter\n"
+        + "  ArrowProject(intField=[$0], stringField=[$1])\n"
+        + "    ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "intField=0; stringField=0\n"
+        + "intField=1; stringField=1\n"
+        + "intField=2; stringField=2\n"
+        + "intField=3; stringField=3\n"
+        + "intField=4; stringField=4\n"
+        + "intField=5; stringField=5\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .limit(6)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testArrowProjectFieldsWithIntegerFilter() {
+    String sql = "select \"intField\", \"stringField\"\n"
+        + "from arrowdata\n"
+        + "where \"intField\" < 4";
+    String plan = "PLAN=ArrowToEnumerableConverter\n"
+        + "  ArrowProject(intField=[$0], stringField=[$1])\n"
+        + "    ArrowFilter(condition=[<($0, 4)])\n"
+        + "      ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "intField=0; stringField=0\n"
+        + "intField=1; stringField=1\n"
+        + "intField=2; stringField=2\n"
+        + "intField=3; stringField=3\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testArrowProjectFieldsWithMultipleFilterSameField() {
+    String sql = "select \"intField\", \"stringField\"\n"
+        + "from arrowdata\n"
+        + "where \"intField\" > 1 and \"intField\" < 4";
+    String plan = "PLAN=ArrowToEnumerableConverter\n"
+        + "  ArrowProject(intField=[$0], stringField=[$1])\n"
+        + "    ArrowFilter(condition=[SEARCH($0, Sarg[(1..4)])])\n"
+        + "      ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "intField=2; stringField=2\n"
+        + "intField=3; stringField=3\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testArrowProjectFieldsWithConjunctiveFilters() {
+    String sql = "select \"intField\", \"stringField\"\n"
+        + "from arrowdata\n"
+        + "where \"intField\"=12 and \"stringField\"='12'";
+    String plan = "PLAN=ArrowToEnumerableConverter\n"
+        + "  ArrowProject(intField=[$0], stringField=[$1])\n"
+        + "    ArrowFilter(condition=[AND(=($0, 12), =($1, '12'))])\n"
+        + "      ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "intField=12; stringField=12\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Disabled("OR is not supported yet")
+  @Test void testArrowProjectFieldsWithDisjunctiveFilter() {
+    String sql = "select \"intField\", \"stringField\"\n"
+        + "from arrowdata\n"
+        + "where \"intField\"=12 or \"stringField\"='12'";
+    String plan = "PLAN=ArrowToEnumerableConverter\n"
+        + "  ArrowProject(intField=[$0], stringField=[$1])\n"
+        + "    ArrowFilter(condition=[OR(=($0, 12), =($1, '12'))])\n"
+        + "      ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "intField=12; stringField=12\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Disabled("IN is not supported as OR is not supported yet")
+  @Test void testArrowProjectFieldsWithInFilter() {
+    String sql = "select \"intField\", \"stringField\"\n"
+        + "from arrowdata\n"
+        + "where \"intField\" in (0, 1, 2)";
+    String plan = "PLAN=ArrowToEnumerableConverter\n"
+        + "  ArrowProject(intField=[$0], stringField=[$1])\n"
+        + "    ArrowFilter(condition=[OR(=($0, 0), =($0, 1), =($0, 2))])\n"
+        + "      ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "intField=0; stringField=0\n"
+        + "intField=1; stringField=1\n"
+        + "intField=2; stringField=2\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Disabled("IS NOT NULL is not supported yet")
+  @Test void testArrowProjectFieldsWithIsNotNullFilter() {
+    String sql = "select \"intField\", \"stringField\"\n"
+        + "from arrowdata\n"
+        + "where \"intField\" is not null\n"
+        + "order by \"intField\"\n"
+        + "limit 1";
+    String plan = "PLAN=EnumerableLimit(fetch=[1])\n"
+        + "  EnumerableSort(sort0=[$0], dir0=[ASC])\n"
+        + "    ArrowToEnumerableConverter\n"
+        + "      ArrowProject(intField=[$0], stringField=[$1])\n"
+        + "        ArrowFilter(condition=[IS NOT NULL($0)])\n"
+        + "          ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "intField=0; stringField=0\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Disabled("IS NULL is not supported yet")
+  @Test void testArrowProjectFieldsWithIsNullFilter() {
+    String sql = "select \"intField\", \"stringField\"\n"
+        + "from arrowdata\n"
+        + "where \"intField\" is null";
+    String plan = "ArrowToEnumerableConverter\n"
+        + "  ArrowProject(intField=[$0], stringField=[$1])\n"
+        + "    ArrowFilter(condition=[IS NOT NULL($0)])\n"
+        + "      ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returnsCount(0)
+        .explainContains(plan);
+  }
+
+  @Test void testArrowProjectFieldsWithFloatFilter() {
+    String sql = "select * from arrowdata\n"
+        + " where \"floatField\"=15.0";
+    String plan = "PLAN=ArrowToEnumerableConverter\n"
+        + "  ArrowFilter(condition=[=(CAST($2):DOUBLE, 15.0)])\n"
+        + "    ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "intField=15; stringField=15; floatField=15.0; longField=15\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testArrowProjectFieldsWithFilterOnLaterBatch() {
+    String sql = "select \"intField\"\n"
+        + "from arrowdata\n"
+        + "where \"intField\"=25";
+    String plan = "PLAN=ArrowToEnumerableConverter\n"
+        + "  ArrowProject(intField=[$0])\n"
+        + "    ArrowFilter(condition=[=($0, 25)])\n"
+        + "      ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "intField=25\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testArrowSubquery() {
+    String sql = "select \"intField\"\n"
+        + "from (select \"intField\", \"stringField\" from arrowdata where \"stringField\" = '2')\n"
+        + "where \"intField\" = 2";
+    String plan = "PLAN=ArrowToEnumerableConverter\n"
+        + "  ArrowProject(intField=[$0])\n"
+        + "    ArrowFilter(condition=[AND(=($1, '2'), =($0, 2))])\n"
+        + "      ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n"
+        + "\n";
+    String result = "intField=2\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Disabled("UNION does not work")
+  @Test void testArrowUnion() {
+    String sql = "(select \"intField\"\n"
+        + "from arrowdata\n"
+        + "where \"intField\" = 2)\n"
+        + "  union \n"
+        + "(select \"intField\"\n"
+        + "from arrowdata\n"
+        + "where \"intField\" = 1)\n";
+    String plan = "PLAN=EnumerableUnion(all=[false])\n"
+        + "  ArrowToEnumerableConverter\n"
+        + "    ArrowProject(intField=[$0])\n"
+        + "      ArrowFilter(condition=[=($0, 2)])\n"
+        + "        ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n"
+        + "  ArrowToEnumerableConverter\n"
+        + "    ArrowProject(intField=[$0])\n"
+        + "      ArrowFilter(condition=[=($0, 1)])\n"
+        + "        ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "intField=1\nintField=2\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testFieldWithSpace() {
+    String sql = "select \"my Field\" from (select \"intField\", \"stringField\" as \"my Field\"\n"
+        + "from arrowdata)\n"
+        + "where \"my Field\" = '2'";
+    String plan = "PLAN=ArrowToEnumerableConverter\n"
+        + "  ArrowProject(my Field=[$1])\n"
+        + "    ArrowFilter(condition=[=($1, '2')])\n"
+        + "      ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "my Field=2\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Disabled("literal with space is not supported")
+  @Test void testLiteralWithSpace() {
+    String sql = "select \"intField\", \"stringField\" as \"my Field\"\n"
+        + "from arrowdata\n"
+        + "where \"stringField\" = 'literal with space'";
+    String plan = "PLAN=ArrowToEnumerableConverter\n"
+        + "  ArrowProject(intField=[$0], my Field=[$1])\n"
+        + "    ArrowFilter(condition=[=($1, '2')])\n"
+        + "      ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testLiteralWithQuote() {
+    String sql = "select \"intField\", \"stringField\" as \"my Field\"\n"
+        + "from arrowdata\n"
+        + "where \"stringField\" = ''''";
+    String plan = "PLAN=ArrowToEnumerableConverter\n"
+        + "  ArrowProject(intField=[$0], stringField=[$1])\n"
+        + "    ArrowFilter(condition=[=($1, '''')])\n"
+        + "      ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testTinyIntProject() {
+    String sql = "select DEPTNO from DEPT";
+    String plan = "PLAN=ArrowToEnumerableConverter\n"
+        + "  ArrowProject(DEPTNO=[$0])\n"
+        + "    ArrowTableScan(table=[[ARROW, DEPT]], fields=[[0, 1, 2]])\n\n";
+    String result = "DEPTNO=10\nDEPTNO=20\nDEPTNO=30\nDEPTNO=40\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testSmallIntProject() {
+    String sql = "select EMPNO from EMP";
+    String plan = "PLAN=ArrowToEnumerableConverter\n"
+        + "  ArrowProject(EMPNO=[$0])\n"
+        + "    ArrowTableScan(table=[[ARROW, EMP]], fields=[[0, 1, 2, 3, 4, 5, 6, 7]])\n\n";
+    String result = "EMPNO=7369\nEMPNO=7499\nEMPNO=7521\n";
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .limit(3)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testCastDecimalToInt() {
+    String sql = "select CAST(LOSAL AS INT) as \"trunc\" from SALGRADE";
+    String plan =
+        "PLAN=EnumerableCalc(expr#0..2=[{inputs}], expr#3=[CAST($t1):INTEGER], trunc=[$t3])\n"
+            + "  ArrowToEnumerableConverter\n"
+            + "    ArrowTableScan(table=[[ARROW, SALGRADE]], fields=[[0, 1, 2]])\n\n";
+    String result = "trunc=700\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .typeIs("[trunc INTEGER]")
+        .limit(1)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testCastDecimalToFloat() {
+    String sql = "select CAST(LOSAL AS FLOAT) as \"extra\" from SALGRADE";
+    String plan = "PLAN=EnumerableCalc(expr#0..2=[{inputs}],"
+        + " expr#3=[CAST($t1):FLOAT], extra=[$t3])\n"
+        + "  ArrowToEnumerableConverter\n"
+        + "    ArrowTableScan(table=[[ARROW, SALGRADE]], fields=[[0, 1, 2]])\n\n";
+    String result = "extra=700.0\nextra=1201.0\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .typeIs("[extra FLOAT]")
+        .limit(2)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testCastDecimalToDouble() {
+    String sql = "select CAST(LOSAL AS DOUBLE) as \"extra\" from SALGRADE";
+    String plan =
+        "PLAN=EnumerableCalc(expr#0..2=[{inputs}], expr#3=[CAST($t1):DOUBLE], extra=[$t3])\n"
+            + "  ArrowToEnumerableConverter\n"
+            + "    ArrowTableScan(table=[[ARROW, SALGRADE]], fields=[[0, 1, 2]])\n\n";
+    String result = "extra=700.0\nextra=1201.0\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .typeIs("[extra DOUBLE]")
+        .limit(2)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testCastIntToDouble() {
+    String sql = "select CAST(\"intField\" AS DOUBLE) as \"dbl\" from arrowdata";
+    String plan =
+        "PLAN=EnumerableCalc(expr#0..3=[{inputs}], expr#4=[CAST($t0):DOUBLE], dbl=[$t4])\n"
+            + "  ArrowToEnumerableConverter\n"
+            + "    ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "dbl=0.0\ndbl=1.0\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .typeIs("[dbl DOUBLE]")
+        .limit(2)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testStringOperation() {
+    String sql = "select\n"
+        + "  \"stringField\" || '_suffix' as \"field1\"\n"
+        + "from arrowdata";
+    String plan = "PLAN=EnumerableCalc(expr#0..3=[{inputs}], expr#4=['_suffix'], "
+            + "expr#5=[||($t1, $t4)], field1=[$t5])\n"
+            + "  ArrowToEnumerableConverter\n"
+            + "    ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "field1=0_suffix\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .limit(1)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+
+  @Disabled("join is not supported yet")
+  @Test void testJoin() {
+    String sql = "select t1.\"intField\", t2.\"intField\" "
+        + "from arrowdata t1 join arrowdata t2 on t1.\"intField\" = t2.\"intField\"";
+    String plan = "PLAN=EnumerableJoin(condition=[=($0, $4)], joinType=[inner])\n"
+        + "  ArrowToEnumerableConverter\n"
+        + "    ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n"
+        + "  ArrowToEnumerableConverter\n"
+        + "    ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "intField=0\nintField=1\nintField=2\nintField=3\nintField=4\nintField=5\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .limit(1)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testAggWithoutAggFunctions() {
+    String sql = "select DISTINCT(\"intField\") as \"dep\" from arrowdata";
+    String plan = "PLAN=EnumerableAggregate(group=[{0}])\n"
+        + "  ArrowToEnumerableConverter\n"
+        + "    ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "dep=0\ndep=1\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .limit(2)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testAggWithAggFunctions() {
+    String sql = "select JOB, SUM(SAL) as TOTAL from EMP GROUP BY JOB";
+    String plan = "PLAN=EnumerableAggregate(group=[{2}], TOTAL=[SUM($5)])\n"
+        + "  ArrowToEnumerableConverter\n"
+        + "    ArrowTableScan(table=[[ARROW, EMP]], fields=[[0, 1, 2, 3, 4, 5, 6, 7]])\n\n";
+    String result = "JOB=SALESMAN; TOTAL=5600.00\nJOB=ANALYST; TOTAL=6000.00\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .limit(2)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testFilteredAgg() {
+    String sql = "select SUM(SAL) FILTER (WHERE COMM > 400) as SALESSUM from EMP";
+    String plan = "PLAN=EnumerableAggregate(group=[{}], SALESSUM=[SUM($0) FILTER $1])\n"
+        + "  EnumerableCalc(expr#0..7=[{inputs}], expr#8=[400], expr#9=[>($t6, $t8)], "
+        + "expr#10=[IS TRUE($t9)], SAL=[$t5], $f1=[$t10])\n"
+        + "    ArrowToEnumerableConverter\n"
+        + "      ArrowTableScan(table=[[ARROW, EMP]], fields=[[0, 1, 2, 3, 4, 5, 6, 7]])\n\n";
+    String result = "SALESSUM=2500.00\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .limit(2)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testFilteredAggGroupBy() {
+    String sql = "select SUM(SAL) FILTER (WHERE COMM > 400) as SALESSUM from EMP group by EMPNO";
+    String plan = "PLAN=EnumerableCalc(expr#0..1=[{inputs}], SALESSUM=[$t1])\n"
+        + "  EnumerableAggregate(group=[{0}], SALESSUM=[SUM($1) FILTER $2])\n"
+        + "    EnumerableCalc(expr#0..7=[{inputs}], expr#8=[400], expr#9=[>($t6, $t8)], "
+        + "expr#10=[IS TRUE($t9)], EMPNO=[$t0], SAL=[$t5], $f2=[$t10])\n"
+        + "      ArrowToEnumerableConverter\n"
+        + "        ArrowTableScan(table=[[ARROW, EMP]], fields=[[0, 1, 2, 3, 4, 5, 6, 7]])\n\n";
+    String result = "SALESSUM=1250.00\nSALESSUM=null\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .limit(2)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testAggGroupedByNullable() {
+    String sql = "select COMM, SUM(SAL) as SALESSUM from EMP GROUP BY COMM";
+    String plan = "PLAN=EnumerableAggregate(group=[{6}], SALESSUM=[SUM($5)])\n"
+        + "  ArrowToEnumerableConverter\n"
+        + "    ArrowTableScan(table=[[ARROW, EMP]], fields=[[0, 1, 2, 3, 4, 5, 6, 7]])\n\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returnsUnordered("COMM=0.00; SALESSUM=1500.00",
+            "COMM=1400.00; SALESSUM=1250.00",
+            "COMM=300.00; SALESSUM=1600.00",
+            "COMM=500.00; SALESSUM=1250.00",
+            "COMM=null; SALESSUM=23425.00")
+        .explainContains(plan);
+  }
+
+  @Test void testArrowAdapterLimitNoSort() {
+    String sql = "select \"intField\"\n"
+        + "from arrowdata\n"
+        + "limit 2";
+    String plan = "PLAN=EnumerableCalc(expr#0..3=[{inputs}], intField=[$t0])\n"
+        + "  EnumerableLimit(fetch=[2])\n"
+        + "    ArrowToEnumerableConverter\n"
+        + "      ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "intField=0\nintField=1\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testArrowLimitOffsetNoSort() {
+    String sql = "select \"intField\"\n"
+        + "from arrowdata\n"
+        + "limit 2 offset 2";
+    String plan = "PLAN=EnumerableCalc(expr#0..3=[{inputs}], intField=[$t0])\n"
+        + "  EnumerableLimit(offset=[2], fetch=[2])\n"
+        + "    ArrowToEnumerableConverter\n"
+        + "      ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "intField=2\nintField=3\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testArrowSortOnLong() {
+    String sql = "select \"intField\" from arrowdata order by \"longField\" desc";
+    String plan = "PLAN=EnumerableSort(sort0=[$1], dir0=[DESC])\n"
+        + "  ArrowToEnumerableConverter\n"
+        + "    ArrowProject(intField=[$0], longField=[$3])\n"
+        + "      ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "intField=49\nintField=48\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .limit(2)
+        .returns(result)
+        .explainContains(plan);
+  }
+}
diff --git a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowData.java b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowData.java
new file mode 100644
index 0000000..3870bb2
--- /dev/null
+++ b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowData.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
+import org.apache.arrow.adapter.jdbc.JdbcToArrow;
+import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
+import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
+import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FloatingPointVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.Text;
+
+import com.google.common.collect.ImmutableList;
+
+import net.hydromatic.scott.data.hsqldb.ScottHsqldb;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Calendar;
+import java.util.List;
+
+/**
+ * Class that can be used to generate Arrow sample data into a data directory.
+ */
+public class ArrowData {
+
+  private final int batchSize;
+  private final int entries;
+  private int intValue;
+  private int stringValue;
+  private float floatValue;
+  private long longValue;
+
+  public ArrowData() {
+    this.batchSize = 20;
+    this.entries = 50;
+    this.intValue = 0;
+    this.stringValue = 0;
+    this.floatValue = 0;
+    this.longValue = 0;
+  }
+
+  private Schema makeArrowSchema() {
+    ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+    FieldType intType = FieldType.nullable(new ArrowType.Int(32, true));
+    FieldType stringType = FieldType.nullable(new ArrowType.Utf8());
+    FieldType floatType =
+        FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE));
+    FieldType longType = FieldType.nullable(new ArrowType.Int(64, true));
+
+    childrenBuilder.add(new Field("intField", intType, null));
+    childrenBuilder.add(new Field("stringField", stringType, null));
+    childrenBuilder.add(new Field("floatField", floatType, null));
+    childrenBuilder.add(new Field("longField", longType, null));
+
+    return new Schema(childrenBuilder.build(), null);
+  }
+
+  public void writeScottEmpData(Path arrowDataDirectory) throws IOException, SQLException {
+    List<String> tableNames = ImmutableList.of("EMP", "DEPT", "SALGRADE");
+
+    Connection connection =
+        DriverManager.getConnection(ScottHsqldb.URI, ScottHsqldb.USER, ScottHsqldb.PASSWORD);
+
+    for (String tableName : tableNames) {
+      String sql = "SELECT * FROM " + tableName;
+      Statement statement = connection.createStatement();
+      ResultSet resultSet = statement.executeQuery(sql);
+
+      Calendar calendar = JdbcToArrowUtils.getUtcCalendar();
+
+      RootAllocator rootAllocator = new RootAllocator();
+      JdbcToArrowConfig config = new JdbcToArrowConfigBuilder()
+          .setAllocator(rootAllocator)
+          .setReuseVectorSchemaRoot(true)
+          .setCalendar(calendar)
+          .setTargetBatchSize(1024)
+          .build();
+
+      ArrowVectorIterator vectorIterator = JdbcToArrow.sqlToArrowVectorIterator(resultSet, config);
+      Path tablePath = arrowDataDirectory.resolve(tableName + ".arrow");
+
+      FileOutputStream fileOutputStream = new FileOutputStream(tablePath.toFile());
+
+      VectorSchemaRoot vectorSchemaRoot = vectorIterator.next();
+
+      ArrowFileWriter arrowFileWriter =
+          new ArrowFileWriter(vectorSchemaRoot, null, fileOutputStream.getChannel());
+
+      arrowFileWriter.start();
+      arrowFileWriter.writeBatch();
+
+      while (vectorIterator.hasNext()) {
+        // refreshes the data in the VectorSchemaRoot with the next batch
+        vectorIterator.next();
+        arrowFileWriter.writeBatch();
+      }
+
+      arrowFileWriter.close();
+    }
+  }
+
+  public void writeArrowData(File file) throws IOException {
+    FileOutputStream fileOutputStream = new FileOutputStream(file);
+    Schema arrowSchema = makeArrowSchema();
+    VectorSchemaRoot vectorSchemaRoot =
+        VectorSchemaRoot.create(arrowSchema, new RootAllocator(Integer.MAX_VALUE));
+    ArrowFileWriter arrowFileWriter =
+        new ArrowFileWriter(vectorSchemaRoot, null, fileOutputStream.getChannel());
+
+    arrowFileWriter.start();
+
+    for (int i = 0; i < this.entries;) {
+      int numRows = Math.min(this.batchSize, this.entries - i);
+      vectorSchemaRoot.setRowCount(numRows);
+      for (Field field : vectorSchemaRoot.getSchema().getFields()) {
+        FieldVector vector = vectorSchemaRoot.getVector(field.getName());
+        switch (vector.getMinorType()) {
+        case INT:
+          intField(vector, numRows);
+          break;
+        case FLOAT4:
+          floatField(vector, numRows);
+          break;
+        case VARCHAR:
+          varCharField(vector, numRows);
+          break;
+        case BIGINT:
+          longField(vector, numRows);
+          break;
+        default:
+          throw new IllegalStateException("Not supported type yet: " + vector.getMinorType());
+        }
+      }
+      arrowFileWriter.writeBatch();
+      i += numRows;
+    }
+    arrowFileWriter.end();
+    arrowFileWriter.close();
+    fileOutputStream.flush();
+    fileOutputStream.close();
+  }
+
+  private void intField(FieldVector fieldVector, int rowCount) {
+    IntVector intVector = (IntVector) fieldVector;
+    intVector.setInitialCapacity(rowCount);
+    intVector.allocateNew();
+    for (int i = 0; i < rowCount; i++) {
+      intVector.set(i, 1, intValue);
+      this.intValue++;
+    }
+    fieldVector.setValueCount(rowCount);
+  }
+
+  private void floatField(FieldVector fieldVector, int rowCount) {
+    FloatingPointVector floatingPointVector = (FloatingPointVector) fieldVector;
+    floatingPointVector.setInitialCapacity(rowCount);
+    floatingPointVector.allocateNew();
+    for (int i = 0; i < rowCount; i++) {
+      float value = this.floatValue;
+      floatingPointVector.setWithPossibleTruncate(i, value);
+      this.floatValue++;
+    }
+    fieldVector.setValueCount(rowCount);
+  }
+
+  private void varCharField(FieldVector fieldVector, int rowCount) {
+    VarCharVector varCharVector = (VarCharVector) fieldVector;
+    varCharVector.setInitialCapacity(rowCount);
+    varCharVector.allocateNew();
+    for (int i = 0; i < rowCount; i++) {
+      String value = String.valueOf(this.stringValue);
+      varCharVector.set(i, new Text(value));
+      this.stringValue++;
+    }
+    fieldVector.setValueCount(rowCount);
+  }
+
+  private void longField(FieldVector fieldVector, int rowCount) {
+    BigIntVector longVector = (BigIntVector) fieldVector;
+    longVector.setInitialCapacity(rowCount);
+    longVector.allocateNew();
+    for (int i = 0; i < rowCount; i++) {
+      longVector.set(i, this.longValue);
+      this.longValue++;
+    }
+    fieldVector.setValueCount(rowCount);
+  }
+}
diff --git a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowExtension.java b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowExtension.java
new file mode 100644
index 0000000..e5edb91
--- /dev/null
+++ b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowExtension.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.config.CalciteSystemProperty;
+
+import org.apache.arrow.gandiva.evaluator.Projector;
+import org.apache.arrow.gandiva.exceptions.GandivaException;
+import org.apache.arrow.gandiva.expression.ExpressionTree;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import org.junit.jupiter.api.extension.ConditionEvaluationResult;
+import org.junit.jupiter.api.extension.ExecutionCondition;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * JUnit5 extension to handle Arrow tests.
+ *
+ * <p>Tests will be skipped if the Gandiva library cannot be loaded on the given platform.
+ */
+class ArrowExtension implements ExecutionCondition {
+
+  /**
+   * Whether to run this test.
+   *
+   * <p>Enabled by default, unless explicitly disabled from command line
+   * ({@code -Dcalcite.test.arrow=false}) or if Gandiva library, used to implement arrow
+   * filtering/projection, cannot be loaded.
+   *
+   * @return {@code true} if the test is enabled and can run in the current environment,
+   *         {@code false} otherwise
+   */
+  @Override public ConditionEvaluationResult evaluateExecutionCondition(
+      final ExtensionContext context) {
+
+    boolean enabled = CalciteSystemProperty.TEST_ARROW.value();
+    try {
+      Schema emptySchema = new Schema(new ArrayList<>(), null);
+      List<ExpressionTree> expressions = new ArrayList<>();
+      Projector.make(emptySchema, expressions);
+    } catch (GandivaException e) {
+      // this exception comes from using an empty expression,
+      // but the JNI library was loaded properly
+    } catch (UnsatisfiedLinkError e) {
+      enabled = false;
+    }
+
+    if (enabled) {
+      return ConditionEvaluationResult.enabled("Arrow tests enabled");
+    } else {
+      return ConditionEvaluationResult.disabled("Cassandra tests disabled");
+    }
+  }
+}
diff --git a/arrow/src/test/resources/arrow-model.json b/arrow/src/test/resources/arrow-model.json
new file mode 100644
index 0000000..fed2210
--- /dev/null
+++ b/arrow/src/test/resources/arrow-model.json
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "version": "1.0",
+  "defaultSchema": "ARROW",
+  "schemas": [
+    {
+      "name": "ARROW",
+      "type": "custom",
+      "factory": "org.apache.calcite.adapter.arrow.ArrowSchemaFactory",
+      "operand": {
+        "directory": "arrow"
+      }
+    }
+  ]
+}
diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts
index 79fb7cc..3cc3323 100644
--- a/bom/build.gradle.kts
+++ b/bom/build.gradle.kts
@@ -97,6 +97,10 @@
         apiv("net.java.dev.jna:jna")
         apiv("net.java.dev.jna:jna-platform")
         apiv("net.sf.opencsv:opencsv")
+        apiv("org.apache.arrow:arrow-memory-netty", "arrow")
+        apiv("org.apache.arrow:arrow-vector", "arrow")
+        apiv("org.apache.arrow:arrow-jdbc", "arrow")
+        apiv("org.apache.arrow.gandiva:arrow-gandiva", "arrow-gandiva")
         apiv("org.apache.calcite.avatica:avatica-core", "calcite.avatica")
         apiv("org.apache.calcite.avatica:avatica-server", "calcite.avatica")
         apiv("org.apache.cassandra:cassandra-all")
diff --git a/build.gradle.kts b/build.gradle.kts
index 3ad31fe..579d4bd 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -212,7 +212,7 @@
 }
 
 val adaptersForSqlline = listOf(
-    ":babel", ":cassandra", ":druid", ":elasticsearch",
+    ":arrow", ":babel", ":cassandra", ":druid", ":elasticsearch",
     ":file", ":geode", ":innodb", ":kafka", ":mongodb",
     ":pig", ":piglet", ":plus", ":redis", ":spark", ":splunk")
 
diff --git a/core/src/main/java/org/apache/calcite/config/CalciteSystemProperty.java b/core/src/main/java/org/apache/calcite/config/CalciteSystemProperty.java
index 4db1b30..4d454c6 100644
--- a/core/src/main/java/org/apache/calcite/config/CalciteSystemProperty.java
+++ b/core/src/main/java/org/apache/calcite/config/CalciteSystemProperty.java
@@ -211,6 +211,12 @@
       });
 
   /**
+   * Whether to run Arrow tests.
+   */
+  public static final CalciteSystemProperty<Boolean> TEST_ARROW =
+      booleanProperty("calcite.test.arrow", true);
+
+  /**
    * Whether to run MongoDB tests.
    */
   public static final CalciteSystemProperty<Boolean> TEST_MONGODB =
diff --git a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
index b98cc91..6280e11 100644
--- a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
+++ b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
@@ -342,6 +342,8 @@
   SORTED_MULTI_MAP_ARRAYS(SortedMultiMap.class, "arrays", Comparator.class),
   SORTED_MULTI_MAP_SINGLETON(SortedMultiMap.class, "singletonArrayIterator",
       Comparator.class, List.class),
+  IMMUTABLE_INT_LIST_IDENTITY(ImmutableIntList.class, "identity", int.class),
+  IMMUTABLE_INT_LIST_COPY_OF(ImmutableIntList.class, "copyOf", int[].class),
   BINARY_SEARCH5_LOWER(BinarySearch.class, "lowerBound", Object[].class,
       Object.class, int.class, int.class, Comparator.class),
   BINARY_SEARCH5_UPPER(BinarySearch.class, "upperBound", Object[].class,
diff --git a/core/src/main/java/org/apache/calcite/util/ImmutableIntList.java b/core/src/main/java/org/apache/calcite/util/ImmutableIntList.java
index cb34fe3..c8448aa 100644
--- a/core/src/main/java/org/apache/calcite/util/ImmutableIntList.java
+++ b/core/src/main/java/org/apache/calcite/util/ImmutableIntList.java
@@ -70,13 +70,25 @@
    * Creates an ImmutableIntList from an array of {@code int}.
    */
   public static ImmutableIntList of(int... ints) {
+    if (ints.length == 0) {
+      return EMPTY;
+    }
     return new ImmutableIntList(ints.clone());
   }
 
+  /** Same as {@link #of(int...)}, but less ambiguous for code generators
+   * and compilers. */
+  public static ImmutableIntList copyOf(int... ints) {
+    return of(ints);
+  }
+
   /**
    * Creates an ImmutableIntList from an array of {@code Number}.
    */
   public static ImmutableIntList copyOf(Number... numbers) {
+    if (numbers.length == 0) {
+      return EMPTY;
+    }
     final int[] ints = new int[numbers.length];
     for (int i = 0; i < ints.length; i++) {
       ints[i] = numbers[i].intValue();
@@ -108,6 +120,9 @@
 
   private static ImmutableIntList copyFromCollection(
       Collection<? extends Number> list) {
+    if (list.isEmpty()) {
+      return EMPTY;
+    }
     final int[] ints = new int[list.size()];
     int i = 0;
     for (Number number : list) {
@@ -192,7 +207,7 @@
     return ints.clone();
   }
 
-  /** Returns an List of {@code Integer}. */
+  /** Returns a List of {@code Integer}. */
   public List<Integer> toIntegerList() {
     ArrayList<Integer> arrayList = new ArrayList<>(size());
     for (int i : ints) {
@@ -283,6 +298,9 @@
    * @see Mappings#isIdentity(List, int)
    */
   public static ImmutableIntList identity(int count) {
+    if (count == 0) {
+      return EMPTY;
+    }
     final int[] integers = new int[count];
     for (int i = 0; i < integers.length; i++) {
       integers[i] = i;
diff --git a/gradle.properties b/gradle.properties
index 51e13a2..1bf19c1 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -83,6 +83,8 @@
 # elasticsearch does not like asm:6.2.1+
 aggdesigner-algorithm.version=6.0
 apiguardian-api.version=1.1.2
+arrow-gandiva.version=15.0.0
+arrow.version=15.0.0
 asm.version=7.2
 byte-buddy.version=1.9.3
 cassandra-all.version=4.1.3
diff --git a/settings.gradle.kts b/settings.gradle.kts
index a5ea536..adb373f 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -67,6 +67,7 @@
 include(
     "bom",
     "release",
+    "arrow",
     "babel",
     "cassandra",
     "core",
diff --git a/site/_docs/adapter.md b/site/_docs/adapter.md
index 1ceff65..844b4a9 100644
--- a/site/_docs/adapter.md
+++ b/site/_docs/adapter.md
@@ -27,6 +27,7 @@
 A schema adapter allows Calcite to read particular kind of data,
 presenting the data as tables within a schema.
 
+* [Arrow adapter](arrow_adapter.html) (<a href="{{ site.apiRoot }}/org/apache/calcite/adapter/arrow/package-summary.html">calcite-arrow</a>)
 * [Cassandra adapter](cassandra_adapter.html) (<a href="{{ site.apiRoot }}/org/apache/calcite/adapter/cassandra/package-summary.html">calcite-cassandra</a>)
 * CSV adapter (<a href="{{ site.apiRoot }}/org/apache/calcite/adapter/csv/package-summary.html">example/csv</a>)
 * [Druid adapter](druid_adapter.html) (<a href="{{ site.apiRoot }}/org/apache/calcite/adapter/druid/package-summary.html">calcite-druid</a>)
diff --git a/site/_docs/arrow_adapter.md b/site/_docs/arrow_adapter.md
new file mode 100644
index 0000000..2dc0a1c
--- /dev/null
+++ b/site/_docs/arrow_adapter.md
@@ -0,0 +1,83 @@
+---
+layout: docs
+title: Arrow adapter
+permalink: /docs/arrow_adapter.html
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+**Note**: Arrow Adapter is an experimental feature;
+changes in public API and usage are expected.
+
+## Overview
+
+Calcite's adapter for Apache Arrow is able to read and process data in Arrow
+format using SQL.
+
+It can read files in Arrow's
+[Feather format](https://arrow.apache.org/docs/python/feather.html)
+(which generally have a `.arrow` suffix) in the same way that the
+[File Adapter](file_adapter.html) can read `.csv` files.
+
+## A simple example
+
+Let's start with a simple example. First, we need a
+[model definition]({{ site.baseurl }}/docs/model.html),
+as follows.
+
+{% highlight json %}
+{
+  "version": "1.0",
+  "defaultSchema": "ARROW",
+  "schemas": [
+    {
+      "name": "ARROW",
+      "type": "custom",
+      "factory": "org.apache.calcite.adapter.arrow.ArrowSchemaFactory",
+      "operand": {
+        "directory": "arrow"
+      }
+    }
+  ]
+}
+{% endhighlight %}
+
+The model file is stored as `arrow/src/test/resources/arrow-model.json`,
+so you can connect via [`sqlline`](https://github.com/julianhyde/sqlline)
+as follows:
+
+{% highlight bash %}
+$ ./sqlline
+sqlline> !connect jdbc:calcite:model=arrow/src/test/resources/arrow-model.json admin admin
+sqlline> select * from arrow.test;
++----------+----------+------------+
+| fieldOne | fieldTwo | fieldThree |
++----------+----------+------------+
+|        1 | abc      |        1.2 |
+|        2 | def      |        3.4 |
+|        3 | xyz      |        5.6 |
+|        4 | abcd     |       1.22 |
+|        5 | defg     |       3.45 |
+|        6 | xyza     |       5.67 |
++----------+----------+------------+
+6 rows selected
+{% endhighlight %}
+
+The `arrow` directory contains a file called `test.arrow`, and so it shows up as
+a table called `test`.