[CALCITE-2040] Create adapter for Apache Arrow
Co-authored-by: Alessandro Solimando <alessandro.solimando@gmail.com>
Co-authored-by: Jonathan Swenson <jonathan@exploreomni.com>
Co-authored-by: Julian Hyde <jhyde@apache.org>
Co-authored-by: Karshit Shah <shahkarshit@yahoo.co.in>
Co-authored-by: Michael Mior <mmior@cs.rit.edu>
diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 4c8a548..059a2b3 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -69,7 +69,9 @@
with:
job-id: jdk${{ matrix.jdk }}
remote-build-cache-proxy-enabled: false
- arguments: --scan --no-parallel --no-daemon build javadoc
+ # Arrow build is excluded because it is not supported on Windows
+ # See https://arrow.apache.org/docs/java/install.html#system-compatibility
+ arguments: --scan --no-parallel --no-daemon build javadoc --exclude-task :arrow:build
- name: 'sqlline and sqllsh'
shell: cmd
run: |
@@ -103,7 +105,9 @@
with:
job-id: jdk${{ matrix.jdk }}
remote-build-cache-proxy-enabled: false
- arguments: --scan --no-parallel --no-daemon build
+ # Arrow build is excluded because it is not supported on Windows
+ # See https://arrow.apache.org/docs/java/install.html#system-compatibility
+ arguments: --scan --no-parallel --no-daemon build --exclude-task :arrow:build
- name: 'sqlline and sqllsh'
shell: cmd
run: |
@@ -215,6 +219,9 @@
S3_BUILD_CACHE_ACCESS_KEY_ID: ${{ secrets.S3_BUILD_CACHE_ACCESS_KEY_ID }}
S3_BUILD_CACHE_SECRET_KEY: ${{ secrets.S3_BUILD_CACHE_SECRET_KEY }}
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+ # The option `--add-opens=java.base/java.nio=ALL-UNNAMED` is required on JDK 17+
+ # to avoid an error. See https://arrow.apache.org/docs/java/install.html#java-compatibility
+ _JAVA_OPTIONS: ${{ env._JAVA_OPTIONS }} --add-opens=java.base/java.nio=ALL-UNNAMED
with:
job-id: jdk${{ matrix.jdk }}
remote-build-cache-proxy-enabled: false
@@ -241,6 +248,9 @@
S3_BUILD_CACHE_ACCESS_KEY_ID: ${{ secrets.S3_BUILD_CACHE_ACCESS_KEY_ID }}
S3_BUILD_CACHE_SECRET_KEY: ${{ secrets.S3_BUILD_CACHE_SECRET_KEY }}
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+ # The option `--add-opens=java.base/java.nio=ALL-UNNAMED` is required on JDK 17+
+ # to avoid an error. See https://arrow.apache.org/docs/java/install.html#java-compatibility
+ _JAVA_OPTIONS: ${{ env._JAVA_OPTIONS }} --add-opens=java.base/java.nio=ALL-UNNAMED
with:
job-id: jdk${{ matrix.jdk }}
remote-build-cache-proxy-enabled: false
@@ -310,6 +320,9 @@
S3_BUILD_CACHE_ACCESS_KEY_ID: ${{ secrets.S3_BUILD_CACHE_ACCESS_KEY_ID }}
S3_BUILD_CACHE_SECRET_KEY: ${{ secrets.S3_BUILD_CACHE_SECRET_KEY }}
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+ # The option `--add-opens=java.base/java.nio=ALL-UNNAMED` is required on JDK 17+
+ # to avoid an error. See https://arrow.apache.org/docs/java/install.html#java-compatibility
+ _JAVA_OPTIONS: ${{ env._JAVA_OPTIONS }} --add-opens=java.base/java.nio=ALL-UNNAMED
with:
job-id: jdk19
remote-build-cache-proxy-enabled: false
diff --git a/Jenkinsfile b/Jenkinsfile
index 53cc1e6..bd4216c 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -41,7 +41,9 @@
}
stage('Code Quality') {
timeout(time: 1, unit: 'HOURS') {
- withEnv(["Path+JDK=$JAVA_JDK_17/bin","JAVA_HOME=$JAVA_JDK_17"]) {
+ // The option `--add-opens=java.base/java.nio=ALL-UNNAMED` is required on JDK 17+
+ // to avoid an error. See https://arrow.apache.org/docs/java/install.html#java-compatibility
+ withEnv(["Path+JDK=$JAVA_JDK_17/bin","JAVA_HOME=$JAVA_JDK_17","_JAVA_OPTIONS=--add-opens=java.base/java.nio=ALL-UNNAMED"]) {
withCredentials([string(credentialsId: 'SONARCLOUD_TOKEN', variable: 'SONAR_TOKEN')]) {
if ( env.BRANCH_NAME.startsWith("PR-") ) {
sh './gradlew --no-parallel --no-daemon jacocoAggregateTestReport sonar -PenableJacoco -Dsonar.pullrequest.branch=${CHANGE_BRANCH} -Dsonar.pullrequest.base=${CHANGE_TARGET} -Dsonar.pullrequest.key=${CHANGE_ID} -Dsonar.login=${SONAR_TOKEN}'
diff --git a/arrow/build.gradle.kts b/arrow/build.gradle.kts
new file mode 100644
index 0000000..0ed73e8
--- /dev/null
+++ b/arrow/build.gradle.kts
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+dependencies {
+ api(project(":core"))
+
+ implementation("com.google.guava:guava")
+ implementation("org.apache.arrow:arrow-memory-netty")
+ implementation("org.apache.arrow:arrow-vector")
+ implementation("org.apache.arrow.gandiva:arrow-gandiva")
+ annotationProcessor("org.immutables:value")
+ compileOnly("org.immutables:value-annotations")
+
+ testImplementation("org.apache.arrow:arrow-jdbc")
+ testImplementation("net.hydromatic:scott-data-hsqldb")
+ testImplementation("org.apache.commons:commons-lang3")
+ testImplementation(project(":core"))
+ testImplementation(project(":testkit"))
+}
diff --git a/arrow/gradle.properties b/arrow/gradle.properties
new file mode 100644
index 0000000..4d2cfdd
--- /dev/null
+++ b/arrow/gradle.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+description=Arrow adapter for Calcite
+artifact.name=Calcite Arrow
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/AbstractArrowEnumerator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/AbstractArrowEnumerator.java
new file mode 100644
index 0000000..6351dca
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/AbstractArrowEnumerator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Util;
+
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Enumerator that reads from a collection of Arrow value-vectors. */
+abstract class AbstractArrowEnumerator implements Enumerator<Object> {
+  protected final ArrowFileReader arrowFileReader;
+  protected final List<Integer> fields;
+  protected final List<ValueVector> valueVectors;
+  protected int currRowIndex;
+  protected int rowCount;
+
+  AbstractArrowEnumerator(ArrowFileReader arrowFileReader, ImmutableIntList fields) {
+    this.arrowFileReader = arrowFileReader;
+    this.fields = fields;
+    this.valueVectors = new ArrayList<>(fields.size());
+    this.currRowIndex = -1;
+  }
+
+  /** Applies this enumerator's operator to a record batch. */
+  abstract void evaluateOperator(ArrowRecordBatch arrowRecordBatch);
+
+  /** Collects the selected vectors and the row count of the reader's current
+   * batch, then evaluates the operator on a record batch built from it. */
+  protected void loadNextArrowBatch() {
+    try {
+      final VectorSchemaRoot vsr = arrowFileReader.getVectorSchemaRoot();
+      fields.forEach(i -> this.valueVectors.add(vsr.getVector(i)));
+      this.rowCount = vsr.getRowCount();
+      // The record batch retains the vectors' buffers until it is closed.
+      try (ArrowRecordBatch batch = new VectorUnloader(vsr).getRecordBatch()) {
+        evaluateOperator(batch);
+      }
+    } catch (IOException e) {
+      throw Util.toUnchecked(e);
+    }
+  }
+
+  /** Returns the current row: a bare value for one field, else an array. */
+  @Override public Object current() {
+    if (fields.size() == 1) {
+      return this.valueVectors.get(0).getObject(currRowIndex);
+    }
+    final Object[] row = new Object[valueVectors.size()];
+    for (int i = 0; i < row.length; i++) {
+      row[i] = this.valueVectors.get(i).getObject(currRowIndex);
+    }
+    return row;
+  }
+
+  @Override public void reset() {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java
new file mode 100644
index 0000000..142f18c
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Util;
+
+import org.apache.arrow.gandiva.evaluator.Filter;
+import org.apache.arrow.gandiva.evaluator.Projector;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Enumerable that reads from Arrow value-vectors. */
+class ArrowEnumerable extends AbstractEnumerable<Object> {
+  private final ArrowFileReader arrowFileReader;
+  private final ImmutableIntList fields;
+  private final @Nullable Projector projector;
+  private final @Nullable Filter filter;
+
+  /** Creates an ArrowEnumerable; at least one of projector and filter is expected. */
+  ArrowEnumerable(ArrowFileReader arrowFileReader, ImmutableIntList fields,
+      @Nullable Projector projector, @Nullable Filter filter) {
+    this.arrowFileReader = arrowFileReader;
+    this.fields = fields;
+    this.projector = projector;
+    this.filter = filter;
+  }
+
+  /** Creates a projecting enumerator if there is a projector, else a filtering one. */
+  @Override public Enumerator<Object> enumerator() {
+    try {
+      if (projector != null) {
+        return new ArrowProjectEnumerator(arrowFileReader, fields, projector);
+      }
+      if (filter == null) {
+        throw new IllegalArgumentException(
+            "The arrow enumerator must have either a filter or a projection");
+      }
+      return new ArrowFilterEnumerator(arrowFileReader, fields, filter);
+    } catch (Exception e) {
+      throw Util.toUnchecked(e);
+    }
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFieldType.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFieldType.java
new file mode 100644
index 0000000..4cb6c38
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFieldType.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.tree.Primitive;
+import org.apache.calcite.rel.type.RelDataType;
+
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/** Type of a field in an Arrow schema, mapped to a Java class and a SQL type. */
+enum ArrowFieldType {
+  INT(Primitive.INT),
+  BOOLEAN(Primitive.BOOLEAN),
+  STRING(String.class),
+  FLOAT(Primitive.FLOAT),
+  DOUBLE(Primitive.DOUBLE),
+  DATE(Date.class),
+  LIST(List.class),
+  DECIMAL(BigDecimal.class),
+  LONG(Primitive.LONG),
+  BYTE(Primitive.BYTE),
+  SHORT(Primitive.SHORT);
+
+  private final Class<?> clazz;
+
+  ArrowFieldType(Class<?> clazz) {
+    this.clazz = clazz;
+  }
+
+  ArrowFieldType(Primitive primitive) {
+    this(requireNonNull(primitive.boxClass, "boxClass"));
+  }
+
+  /** Returns the nullable SQL type that corresponds to this field type. */
+  public RelDataType toType(JavaTypeFactory typeFactory) {
+    final RelDataType javaType = typeFactory.createJavaType(clazz);
+    return typeFactory.createTypeWithNullability(
+        typeFactory.createSqlType(javaType.getSqlTypeName()), true);
+  }
+
+  /** Returns the field type for an Arrow type, or throws if it is unsupported. */
+  public static ArrowFieldType of(ArrowType arrowType) {
+    switch (arrowType.getTypeID()) {
+    case Bool:
+      return BOOLEAN;
+    case Utf8:
+      return STRING;
+    case Date:
+      return DATE;
+    case Decimal:
+      return DECIMAL;
+    case Int:
+      final int width = ((ArrowType.Int) arrowType).getBitWidth();
+      switch (width) {
+      case 8:
+        return BYTE;
+      case 16:
+        return SHORT;
+      case 32:
+        return INT;
+      case 64:
+        return LONG;
+      default:
+        throw new IllegalArgumentException("Unsupported Int bit width: " + width);
+      }
+    case FloatingPoint:
+      final FloatingPointPrecision fpp = ((ArrowType.FloatingPoint) arrowType).getPrecision();
+      switch (fpp) {
+      case SINGLE:
+        return FLOAT;
+      case DOUBLE:
+        return DOUBLE;
+      default:
+        throw new IllegalArgumentException("Unsupported Floating point precision: " + fpp);
+      }
+    default:
+      throw new IllegalArgumentException("Unsupported type: " + arrowType);
+    }
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilter.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilter.java
new file mode 100644
index 0000000..9774318
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Implementation of a {@link org.apache.calcite.rel.core.Filter}
+ * relational expression in Arrow.
+ */
+class ArrowFilter extends Filter implements ArrowRel {
+  /** Predicates that the filter condition translates to. */
+  private final List<String> match;
+
+  /** Creates an ArrowFilter and translates its condition into predicates. */
+  ArrowFilter(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RexNode condition) {
+    super(cluster, traitSet, input, condition);
+    this.match = ArrowTranslator.create(cluster.getRexBuilder(), input.getRowType())
+        .translateMatch(condition);
+    assert getConvention() == ArrowRel.CONVENTION;
+    assert getConvention() == input.getConvention();
+  }
+
+  @Override public ArrowFilter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+    return new ArrowFilter(getCluster(), traitSet, input, condition);
+  }
+
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    final RelOptCost baseCost = super.computeSelfCost(planner, mq);
+    return requireNonNull(baseCost, "cost").multiplyBy(0.1);
+  }
+
+  @Override public void implement(Implementor implementor) {
+    implementor.visitInput(0, getInput());
+    implementor.addFilters(match);
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilterEnumerator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilterEnumerator.java
new file mode 100644
index 0000000..c52a542
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilterEnumerator.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Util;
+
+import org.apache.arrow.gandiva.evaluator.Filter;
+import org.apache.arrow.gandiva.evaluator.SelectionVector;
+import org.apache.arrow.gandiva.evaluator.SelectionVectorInt16;
+import org.apache.arrow.gandiva.exceptions.GandivaException;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+
+import java.io.IOException;
+
+/** Enumerator that reads from a filtered collection of Arrow value-vectors. */
+class ArrowFilterEnumerator extends AbstractArrowEnumerator {
+  private final BufferAllocator allocator;
+  private final Filter filter;
+  private ArrowBuf buf;
+  private SelectionVector selectionVector;
+  private int selectionVectorIndex;
+
+  ArrowFilterEnumerator(ArrowFileReader arrowFileReader, ImmutableIntList fields, Filter filter) {
+    super(arrowFileReader, fields);
+    this.allocator = new RootAllocator(Long.MAX_VALUE);
+    this.filter = filter;
+  }
+
+  /** Evaluates the filter on a batch, recording the matching row indexes. */
+  @Override void evaluateOperator(ArrowRecordBatch arrowRecordBatch) {
+    // Free the previous batch's selection buffer, which would otherwise leak.
+    releaseBuffer();
+    try {
+      this.buf = this.allocator.buffer((long) rowCount * 2);
+      this.selectionVector = new SelectionVectorInt16(buf);
+      filter.evaluate(arrowRecordBatch, selectionVector);
+    } catch (GandivaException e) {
+      throw Util.toUnchecked(e);
+    }
+  }
+
+  @Override public boolean moveNext() {
+    // Skip batches in which no row passes the filter.
+    while (selectionVector == null
+        || selectionVectorIndex >= selectionVector.getRecordCount()) {
+      try {
+        if (!arrowFileReader.loadNextBatch()) {
+          return false;
+        }
+      } catch (IOException e) {
+        throw Util.toUnchecked(e);
+      }
+      selectionVectorIndex = 0;
+      this.valueVectors.clear();
+      loadNextArrowBatch();
+    }
+    currRowIndex = selectionVector.getIndex(selectionVectorIndex++);
+    return true;
+  }
+
+  @Override public void close() {
+    try {
+      releaseBuffer();
+      filter.close();
+    } catch (GandivaException e) {
+      throw Util.toUnchecked(e);
+    } finally {
+      allocator.close();
+    }
+  }
+
+  /** Releases the selection buffer of the current batch, if there is one. */
+  private void releaseBuffer() {
+    if (buf != null) {
+      buf.close();
+      buf = null;
+      selectionVector = null;
+    }
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowMethod.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowMethod.java
new file mode 100644
index 0000000..66b078d
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowMethod.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.tree.Types;
+import org.apache.calcite.util.ImmutableIntList;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Built-in methods in the Arrow adapter.
+ *
+ * @see org.apache.calcite.util.BuiltInMethod
+ */
+@SuppressWarnings("ImmutableEnumChecker")
+enum ArrowMethod {
+  ARROW_QUERY(ArrowTable.class, "query", DataContext.class,
+      ImmutableIntList.class, List.class);
+
+  final Method method;
+
+  /** Index from each constant's reflective method to the constant itself. */
+  static final ImmutableMap<Method, ArrowMethod> MAP = indexByMethod();
+
+  /** Defines a method. */
+  ArrowMethod(Class<?> clazz, String methodName, Class<?>... argumentTypes) {
+    this.method = Types.lookupMethod(clazz, methodName, argumentTypes);
+  }
+
+  private static ImmutableMap<Method, ArrowMethod> indexByMethod() {
+    final ImmutableMap.Builder<Method, ArrowMethod> methods = ImmutableMap.builder();
+    for (ArrowMethod arrowMethod : values()) {
+      methods.put(arrowMethod.method, arrowMethod);
+    }
+    return methods.build();
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProject.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProject.java
new file mode 100644
index 0000000..8fee390
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProject.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Implementation of {@link org.apache.calcite.rel.core.Project}
+ * relational expression in Arrow.
+ */
+class ArrowProject extends Project implements ArrowRel {
+  /** Creates an ArrowProject. */
+  ArrowProject(RelOptCluster cluster, RelTraitSet traitSet,
+      RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
+    super(cluster, traitSet, ImmutableList.of(), input, projects, rowType, ImmutableSet.of());
+    assert getConvention() == ArrowRel.CONVENTION;
+    assert getConvention() == input.getConvention();
+  }
+
+  @Override public Project copy(RelTraitSet traitSet, RelNode input,
+      List<RexNode> projects, RelDataType rowType) {
+    return new ArrowProject(getCluster(), traitSet, input, projects, rowType);
+  }
+
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    final RelOptCost baseCost = super.computeSelfCost(planner, mq);
+    return requireNonNull(baseCost, "cost").multiplyBy(0.1);
+  }
+
+  /** Visits the input, then registers the projected fields unless some
+   * projection is not a plain field reference. */
+  @Override public void implement(Implementor implementor) {
+    implementor.visitInput(0, getInput());
+    final List<Integer> fieldOrdinals = getProjectFields(getProjects());
+    if (fieldOrdinals != null) {
+      implementor.addProjectFields(fieldOrdinals);
+    }
+  }
+
+  /** Returns the ordinals of the input fields that the expressions reference,
+   * or null if any expression is not an input reference. */
+  static @Nullable List<Integer> getProjectFields(List<RexNode> exps) {
+    final List<Integer> ordinals = new ArrayList<>();
+    for (RexNode exp : exps) {
+      if (!(exp instanceof RexInputRef)) {
+        return null; // not a simple projection
+      }
+      ordinals.add(((RexInputRef) exp).getIndex());
+    }
+    return ordinals;
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProjectEnumerator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProjectEnumerator.java
new file mode 100644
index 0000000..2426810
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProjectEnumerator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Util;
+
+import org.apache.arrow.gandiva.evaluator.Projector;
+import org.apache.arrow.gandiva.exceptions.GandivaException;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+
+import java.io.IOException;
+
+/** Enumerator that reads from a projected collection of Arrow value-vectors. */
+class ArrowProjectEnumerator extends AbstractArrowEnumerator {
+  private final Projector projector;
+
+  ArrowProjectEnumerator(ArrowFileReader arrowFileReader, ImmutableIntList fields,
+      Projector projector) {
+    super(arrowFileReader, fields);
+    this.projector = projector;
+  }
+
+  @Override protected void evaluateOperator(ArrowRecordBatch arrowRecordBatch) {
+    try {
+      projector.evaluate(arrowRecordBatch, valueVectors);
+    } catch (GandivaException e) {
+      throw Util.toUnchecked(e);
+    }
+  }
+
+  @Override public boolean moveNext() {
+    if (currRowIndex < rowCount - 1) {
+      currRowIndex++;
+      return true;
+    }
+    // The current batch is exhausted. Skip empty batches; otherwise current()
+    // would read row 0 of a batch that has no rows.
+    do {
+      try {
+        if (!arrowFileReader.loadNextBatch()) {
+          return false;
+        }
+      } catch (IOException e) {
+        throw Util.toUnchecked(e);
+      }
+      this.valueVectors.clear();
+      loadNextArrowBatch();
+    } while (rowCount == 0);
+    currRowIndex = 0;
+    return true;
+  }
+
+  @Override public void close() {
+    try {
+      projector.close();
+    } catch (GandivaException e) {
+      throw Util.toUnchecked(e);
+    }
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRel.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRel.java
new file mode 100644
index 0000000..5b002bd
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRel.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.util.ImmutableIntList;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/** Relational expression that uses the Arrow calling convention. */
+public interface ArrowRel extends RelNode {
+  /** Calling convention for relational operations that occur in Arrow. */
+  Convention CONVENTION = new Convention.Impl("ARROW", ArrowRel.class);
+
+  void implement(Implementor implementor);
+
+  /** Callback for the implementation process that walks a tree of {@link ArrowRel}
+   * nodes, collecting the table, the projected fields and the predicates. */
+  class Implementor {
+    final List<String> whereClause = new ArrayList<>();
+    @Nullable List<Integer> selectFields;
+    @Nullable RelOptTable table;
+    @Nullable ArrowTable arrowTable;
+
+    /** Appends predicates to the where clause.
+     *
+     * @param predicates Predicates
+     */
+    void addFilters(List<String> predicates) {
+      whereClause.addAll(predicates);
+    }
+
+    /** Sets the projected fields, replacing any set previously.
+     *
+     * @param fields New fields to be projected from a query
+     */
+    void addProjectFields(List<Integer> fields) {
+      selectFields = ImmutableIntList.copyOf(fields);
+    }
+
+    /** Visits the input of a relational expression; {@code ordinal} must be 0. */
+    public void visitInput(int ordinal, RelNode input) {
+      checkArgument(ordinal == 0);
+      final ArrowRel arrowInput = (ArrowRel) input;
+      arrowInput.implement(this);
+    }
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRules.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRules.java
new file mode 100644
index 0000000..42e47e8
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowRules.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+
+import com.google.common.collect.ImmutableList;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.immutables.value.Value;
+
+import java.util.List;
+
+/** Planner rules relating to the Arrow adapter. */
+public class ArrowRules {
+ private ArrowRules() {}
+
+ /** Rule that matches a {@link org.apache.calcite.rel.core.Project} on
+ * an {@link ArrowTableScan} and pushes down projects if possible. */
+ public static final ArrowProjectRule PROJECT_SCAN =
+ ArrowProjectRule.DEFAULT_CONFIG.toRule(ArrowProjectRule.class);
+
+ public static final ArrowFilterRule FILTER_SCAN =
+ ArrowFilterRule.Config.DEFAULT.toRule();
+
+ public static final ConverterRule TO_ENUMERABLE =
+ ArrowToEnumerableConverterRule.DEFAULT_CONFIG
+ .toRule(ArrowToEnumerableConverterRule.class);
+
+ public static final List<RelOptRule> RULES = ImmutableList.of(PROJECT_SCAN, FILTER_SCAN);
+
+ static List<String> arrowFieldNames(final RelDataType rowType) {
+ return SqlValidatorUtil.uniquify(rowType.getFieldNames(),
+ SqlValidatorUtil.EXPR_SUGGESTER, true);
+ }
+
+ /** Base class for planner rules that convert a relational expression to
+ * the Arrow calling convention. */
+ abstract static class ArrowConverterRule extends ConverterRule {
+ ArrowConverterRule(Config config) {
+ super(config);
+ }
+ }
+
+ /**
+ * Rule to convert a {@link org.apache.calcite.rel.core.Filter} to an
+ * {@link ArrowFilter}.
+ */
+ public static class ArrowFilterRule extends RelRule<ArrowFilterRule.Config> {
+
+ /** Creates an ArrowFilterRule. */
+ protected ArrowFilterRule(Config config) {
+ super(config);
+ }
+
+ @Override public void onMatch(RelOptRuleCall call) {
+ final Filter filter = call.rel(0);
+
+ if (filter.getTraitSet().contains(Convention.NONE)) {
+ final RelNode converted = convert(filter);
+ call.transformTo(converted);
+ }
+ }
+
+ RelNode convert(Filter filter) {
+ final RelTraitSet traitSet =
+ filter.getTraitSet().replace(ArrowRel.CONVENTION);
+ return new ArrowFilter(filter.getCluster(), traitSet,
+ convert(filter.getInput(), ArrowRel.CONVENTION),
+ filter.getCondition());
+ }
+
+ /** Rule configuration. */
+ @Value.Immutable
+ public interface Config extends RelRule.Config {
+ Config DEFAULT = ImmutableConfig.builder()
+ .withOperandSupplier(b0 ->
+ b0.operand(LogicalFilter.class).oneInput(b1 ->
+ b1.operand(ArrowTableScan.class).noInputs()))
+ .build();
+
+ @Override default ArrowFilterRule toRule() {
+ return new ArrowFilterRule(this);
+ }
+ }
+ }
+
+ /**
+ * Planner rule that projects from an {@link ArrowTableScan} just the columns
+ * needed to satisfy a projection. If the projection's expressions are
+ * trivial, the projection is removed.
+ *
+ * @see ArrowRules#PROJECT_SCAN
+ */
+ public static class ArrowProjectRule extends ArrowConverterRule {
+
+ /** Default configuration. */
+ protected static final Config DEFAULT_CONFIG = Config.INSTANCE
+ .withConversion(LogicalProject.class, Convention.NONE,
+ ArrowRel.CONVENTION, "ArrowProjectRule")
+ .withRuleFactory(ArrowProjectRule::new);
+
+ /** Creates an ArrowProjectRule. */
+ protected ArrowProjectRule(Config config) {
+ super(config);
+ }
+
+ @Override public @Nullable RelNode convert(RelNode rel) {
+ final Project project = (Project) rel;
+ @Nullable List<Integer> fields =
+ ArrowProject.getProjectFields(project.getProjects());
+ if (fields == null) {
+ // Project contains expressions more complex than just field references.
+ return null;
+ }
+ final RelTraitSet traitSet =
+ project.getTraitSet().replace(ArrowRel.CONVENTION);
+ return new ArrowProject(project.getCluster(), traitSet,
+ convert(project.getInput(), ArrowRel.CONVENTION),
+ project.getProjects(), project.getRowType());
+ }
+ }
+
+ /**
+ * Rule to convert a relational expression from
+ * {@link ArrowRel#CONVENTION} to {@link EnumerableConvention}.
+ */
+ static class ArrowToEnumerableConverterRule extends ConverterRule {
+
+ /** Default configuration. */
+ public static final Config DEFAULT_CONFIG = Config.INSTANCE
+ .withConversion(RelNode.class, ArrowRel.CONVENTION,
+ EnumerableConvention.INSTANCE, "ArrowToEnumerableConverterRule")
+ .withRuleFactory(ArrowToEnumerableConverterRule::new);
+
+ /** Creates an ArrowToEnumerableConverterRule. */
+ protected ArrowToEnumerableConverterRule(Config config) {
+ super(config);
+ }
+
+ @Override public RelNode convert(RelNode rel) {
+ RelTraitSet newTraitSet = rel.getTraitSet().replace(getOutConvention());
+ return new ArrowToEnumerableConverter(rel.getCluster(), newTraitSet, rel);
+ }
+ }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchema.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchema.java
new file mode 100644
index 0000000..c40cf44
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchema.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.util.Sources;
+import org.apache.calcite.util.Util;
+
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.SeekableReadChannel;
+
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableMap;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+/** Schema mapped onto a set of Arrow files. */
+class ArrowSchema extends AbstractSchema {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ArrowSchema.class);
+  private final Supplier<Map<String, Table>> tableMapSupplier;
+
+  /**
+   * Creates an Arrow schema.
+   *
+   * @param baseDirectory Base directory to look for relative files
+   */
+  ArrowSchema(File baseDirectory) {
+    requireNonNull(baseDirectory, "baseDirectory");
+    this.tableMapSupplier =
+        Suppliers.memoize(() -> deduceTableMap(baseDirectory));
+  }
+
+  /** Returns {@code s} with {@code suffix} removed, or {@code s} itself if it
+   * does not end with the suffix. */
+  private static String trim(String s, String suffix) {
+    String trimmed = trimOrNull(s, suffix);
+    return trimmed != null ? trimmed : s;
+  }
+
+  /** Returns {@code s} with {@code suffix} removed, or null if it does not
+   * end with the suffix. */
+  private static @Nullable String trimOrNull(String s, String suffix) {
+    return s.endsWith(suffix)
+        ? s.substring(0, s.length() - suffix.length())
+        : null;
+  }
+
+  @Override protected Map<String, Table> getTableMap() {
+    return tableMapSupplier.get();
+  }
+
+  /** Creates a table for each {@code .arrow} file in a directory; a table's
+   * name is the upper-cased file name without the suffix. */
+  private static Map<String, Table> deduceTableMap(File baseDirectory) {
+    File[] files = baseDirectory.listFiles((dir, name) -> name.endsWith(".arrow"));
+    if (files == null) {
+      LOGGER.info("directory {} not found", baseDirectory);
+      return ImmutableMap.of();
+    }
+
+    final Map<String, Table> tables = new HashMap<>();
+    for (File file : files) {
+      final File arrowFile = new File(Sources.of(file).path());
+      final FileInputStream fileInputStream;
+      try {
+        fileInputStream = new FileInputStream(arrowFile);
+      } catch (FileNotFoundException e) {
+        throw Util.toUnchecked(e);
+      }
+      final RootAllocator allocator = new RootAllocator();
+      final ArrowFileReader arrowFileReader =
+          new ArrowFileReader(
+              new SeekableReadChannel(fileInputStream.getChannel()), allocator);
+      final ArrowTable table;
+      try {
+        table = new ArrowTable(null, arrowFileReader);
+      } catch (RuntimeException e) {
+        // Reading the Arrow schema failed: release the file and the allocator.
+        try {
+          arrowFileReader.close();
+          allocator.close();
+        } catch (Exception closeFailure) {
+          e.addSuppressed(closeFailure);
+        }
+        throw e;
+      }
+      tables.put(trim(file.getName(), ".arrow").toUpperCase(Locale.ROOT), table);
+    }
+    return ImmutableMap.copyOf(tables);
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchemaFactory.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchemaFactory.java
new file mode 100644
index 0000000..07810c9
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchemaFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.model.ModelHandler;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaFactory;
+import org.apache.calcite.schema.SchemaPlus;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * Factory that creates an {@link ArrowSchema}.
+ */
+public class ArrowSchemaFactory implements SchemaFactory {
+
+  @Override public Schema create(SchemaPlus parentSchema, String name,
+      Map<String, Object> operand) {
+    final File baseDirectory =
+        (File) operand.get(ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName);
+    final String directory = (String) operand.get("directory");
+    final File directoryFile;
+    if (directory == null) {
+      // No explicit directory: fall back to the base directory, if there is one.
+      if (baseDirectory == null) {
+        throw new RuntimeException("no directory");
+      }
+      directoryFile = baseDirectory;
+    } else {
+      // A relative directory is resolved against the base directory, if any.
+      final File given = new File(directory);
+      directoryFile = baseDirectory == null || given.isAbsolute()
+          ? given
+          : new File(baseDirectory, given.getPath());
+    }
+    return new ArrowSchema(directoryFile);
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java
new file mode 100644
index 0000000..f868756
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.QueryableTable;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Util;
+
+import org.apache.arrow.gandiva.evaluator.Filter;
+import org.apache.arrow.gandiva.evaluator.Projector;
+import org.apache.arrow.gandiva.exceptions.GandivaException;
+import org.apache.arrow.gandiva.expression.Condition;
+import org.apache.arrow.gandiva.expression.ExpressionTree;
+import org.apache.arrow.gandiva.expression.TreeBuilder;
+import org.apache.arrow.gandiva.expression.TreeNode;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/** Table that reads its schema and rows from an {@link ArrowFileReader}. */
+public class ArrowTable extends AbstractTable
+    implements TranslatableTable, QueryableTable {
+  private final @Nullable RelProtoDataType protoRowType;
+  /** Arrow schema. (In Calcite terminology, more like a row type than a Schema.) */
+  private final Schema schema;
+  private final ArrowFileReader arrowFileReader;
+
+  ArrowTable(@Nullable RelProtoDataType protoRowType, ArrowFileReader arrowFileReader) {
+    try {
+      this.schema = arrowFileReader.getVectorSchemaRoot().getSchema();
+    } catch (IOException e) {
+      throw Util.toUnchecked(e);
+    }
+    this.protoRowType = protoRowType;
+    this.arrowFileReader = arrowFileReader;
+  }
+
+  @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    if (this.protoRowType != null) {
+      return this.protoRowType.apply(typeFactory);
+    }
+    return deduceRowType(this.schema, (JavaTypeFactory) typeFactory);
+  }
+
+  @Override public Expression getExpression(SchemaPlus schema, String tableName,
+      Class clazz) {
+    return Schemas.tableExpression(schema, getElementType(), tableName, clazz);
+  }
+
+  /** Called via code generation; see uses of
+   * {@link org.apache.calcite.adapter.arrow.ArrowMethod#ARROW_QUERY}.
+   *
+   * <p>Each condition has the form {@code "field operator literal type"};
+   * multiple conditions are combined with AND. */
+  @SuppressWarnings("unused")
+  public Enumerable<Object> query(DataContext root, ImmutableIntList fields,
+      List<String> conditions) {
+    requireNonNull(fields, "fields");
+    final Projector projector;
+    final Filter filter;
+    if (conditions.isEmpty()) {
+      filter = null;
+      final List<ExpressionTree> expressionTrees = new ArrayList<>();
+      for (int fieldOrdinal : fields) {
+        Field field = schema.getFields().get(fieldOrdinal);
+        TreeNode node = TreeBuilder.makeField(field);
+        expressionTrees.add(TreeBuilder.makeExpression(node, field));
+      }
+      try {
+        projector = Projector.make(schema, expressionTrees);
+      } catch (GandivaException e) {
+        throw Util.toUnchecked(e);
+      }
+    } else {
+      projector = null;
+      final List<TreeNode> conditionNodes = new ArrayList<>(conditions.size());
+      for (String condition : conditions) {
+        // A string literal may contain spaces (for example 'New York'), so take
+        // the literal as everything between the second space and the last one.
+        final int fieldEnd = condition.indexOf(' ');
+        final int operatorEnd = condition.indexOf(' ', fieldEnd + 1);
+        final int typeStart = condition.lastIndexOf(' ');
+        if (fieldEnd < 0 || operatorEnd < 0 || typeStart <= operatorEnd) {
+          throw new IllegalArgumentException("Invalid condition " + condition);
+        }
+        final String operator = condition.substring(fieldEnd + 1, operatorEnd);
+        final String literal = condition.substring(operatorEnd + 1, typeStart);
+        final List<TreeNode> treeNodes = new ArrayList<>(2);
+        treeNodes.add(
+            TreeBuilder.makeField(schema.findField(condition.substring(0, fieldEnd))));
+        treeNodes.add(makeLiteralNode(literal, condition.substring(typeStart + 1)));
+        conditionNodes.add(
+            TreeBuilder.makeFunction(operator, treeNodes, new ArrowType.Bool()));
+      }
+      final TreeNode conditionRoot = conditionNodes.size() == 1
+          ? conditionNodes.get(0)
+          : TreeBuilder.makeAnd(conditionNodes);
+      final Condition filterCondition = TreeBuilder.makeCondition(conditionRoot);
+      try {
+        filter = Filter.make(schema, filterCondition);
+      } catch (GandivaException e) {
+        throw Util.toUnchecked(e);
+      }
+    }
+    return new ArrowEnumerable(arrowFileReader, fields, projector, filter);
+  }
+
+  @Override public <T> Queryable<T> asQueryable(QueryProvider queryProvider,
+      SchemaPlus schema, String tableName) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override public Type getElementType() {
+    return Object[].class;
+  }
+
+  @Override public RelNode toRel(RelOptTable.ToRelContext context,
+      RelOptTable relOptTable) {
+    final int fieldCount = relOptTable.getRowType().getFieldCount();
+    final ImmutableIntList fields =
+        ImmutableIntList.copyOf(Util.range(fieldCount));
+    final RelOptCluster cluster = context.getCluster();
+    return new ArrowTableScan(cluster, cluster.traitSetOf(ArrowRel.CONVENTION),
+        relOptTable, this, fields);
+  }
+
+  private static RelDataType deduceRowType(Schema schema,
+      JavaTypeFactory typeFactory) {
+    final RelDataTypeFactory.Builder builder = typeFactory.builder();
+    for (Field field : schema.getFields()) {
+      builder.add(field.getName(),
+          ArrowFieldType.of(field.getType()).toType(typeFactory));
+    }
+    return builder.build();
+  }
+
+  /** Creates a Gandiva literal of the given type from its string form. */
+  private static TreeNode makeLiteralNode(String literal, String type) {
+    switch (type) {
+    case "integer":
+      return TreeBuilder.makeLiteral(Integer.parseInt(literal));
+    case "long":
+      return TreeBuilder.makeLiteral(Long.parseLong(literal));
+    case "float":
+      return TreeBuilder.makeLiteral(Float.parseFloat(literal));
+    case "double":
+      return TreeBuilder.makeLiteral(Double.parseDouble(literal));
+    case "string":
+      return TreeBuilder.makeStringLiteral(literal.substring(1, literal.length() - 1));
+    default:
+      throw new IllegalArgumentException("Invalid literal " + literal + ", type " + type);
+    }
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTableScan.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTableScan.java
new file mode 100644
index 0000000..651a26a
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTableScan.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.util.ImmutableIntList;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Relational expression representing a scan of an Arrow collection.
+ */
+class ArrowTableScan extends TableScan implements ArrowRel {
+  private final ArrowTable arrowTable;
+  private final ImmutableIntList fields;
+
+  ArrowTableScan(RelOptCluster cluster, RelTraitSet traitSet,
+      RelOptTable relOptTable, ArrowTable arrowTable, ImmutableIntList fields) {
+    super(cluster, traitSet, ImmutableList.of(), relOptTable);
+    this.arrowTable = arrowTable;
+    this.fields = fields;
+
+    assert getConvention() == ArrowRel.CONVENTION;
+  }
+
+  @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    // A scan has no inputs; returns this instance, ignoring traitSet.
+    checkArgument(inputs.isEmpty());
+    return this;
+  }
+
+  @Override public RelWriter explainTerms(RelWriter pw) {
+    return super.explainTerms(pw).item("fields", fields);
+  }
+
+  /** Builds the row type from the table's fields selected by {@code fields}. */
+  @Override public RelDataType deriveRowType() {
+    final List<RelDataTypeField> tableFields = table.getRowType().getFieldList();
+    final RelDataTypeFactory.Builder builder =
+        getCluster().getTypeFactory().builder();
+    fields.forEach(ordinal -> builder.add(tableFields.get(ordinal)));
+    return builder.build();
+  }
+
+  /** Registers the Arrow rules, including the converter to enumerable. */
+  @Override public void register(RelOptPlanner planner) {
+    planner.addRule(ArrowRules.TO_ENUMERABLE);
+    ArrowRules.RULES.forEach(planner::addRule);
+  }
+
+  /** Records this scan's table in the implementor. */
+  @Override public void implement(ArrowRel.Implementor implementor) {
+    implementor.arrowTable = arrowTable;
+    implementor.table = table;
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowToEnumerableConverter.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowToEnumerableConverter.java
new file mode 100644
index 0000000..3b90dfd
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowToEnumerableConverter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.linq4j.tree.Blocks;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterImpl;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.BuiltInMethod;
+
+import com.google.common.primitives.Ints;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Relational expression that converts its input from the Arrow calling
+ * convention to enumerable, by generating a call to the Arrow table's query.
+ */
+class ArrowToEnumerableConverter
+    extends ConverterImpl implements EnumerableRel {
+
+  protected ArrowToEnumerableConverter(RelOptCluster cluster,
+      RelTraitSet traitSet, RelNode input) {
+    super(cluster, ConventionTraitDef.INSTANCE, traitSet, input);
+  }
+
+  @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new ArrowToEnumerableConverter(getCluster(), traitSet, sole(inputs));
+  }
+
+  /** Returns one tenth of the cost computed by the superclass. */
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
+      RelMetadataQuery mq) {
+    final RelOptCost baseCost = super.computeSelfCost(planner, mq);
+    return requireNonNull(baseCost, "cost").multiplyBy(0.1);
+  }
+
+  @Override public Result implement(EnumerableRelImplementor implementor,
+      Prefer pref) {
+    final ArrowRel.Implementor arrowImplementor = new ArrowRel.Implementor();
+    arrowImplementor.visitInput(0, getInput());
+    final PhysType physType =
+        PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(),
+            pref.preferArray());
+    final RelOptTable table = requireNonNull(arrowImplementor.table, "table");
+    final int fieldCount = table.getRowType().getFieldCount();
+    // Generate a call to the table's query method. If no project was pushed
+    // down, all of the table's fields are read.
+    return implementor.result(physType,
+        Blocks.toBlock(
+            Expressions.call(table.getExpression(ArrowTable.class),
+                ArrowMethod.ARROW_QUERY.method, implementor.getRootExpression(),
+                arrowImplementor.selectFields == null
+                    ? Expressions.call(
+                        BuiltInMethod.IMMUTABLE_INT_LIST_IDENTITY.method,
+                        Expressions.constant(fieldCount))
+                    : Expressions.call(
+                        BuiltInMethod.IMMUTABLE_INT_LIST_COPY_OF.method,
+                        Expressions.constant(Ints.toArray(arrowImplementor.selectFields))),
+                Expressions.constant(arrowImplementor.whereClause))));
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTranslator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTranslator.java
new file mode 100644
index 0000000..5b68a13
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTranslator.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.DateString;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.math.BigDecimal;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.calcite.util.DateTimeStringUtils.ISO_DATETIME_FRACTIONAL_SECOND_FORMAT;
+import static org.apache.calcite.util.DateTimeStringUtils.getDateFormatter;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Translates a {@link RexNode} expression to a Gandiva string.
+ */
+class ArrowTranslator {
+ final RexBuilder rexBuilder;
+ final RelDataType rowType;
+ final List<String> fieldNames;
+
+ /** Package-private constructor, also reachable through {@link #create}. */
+ ArrowTranslator(RexBuilder rexBuilder, RelDataType rowType) {
+ this.rexBuilder = rexBuilder;
+ this.rowType = rowType;
+ this.fieldNames = ArrowRules.arrowFieldNames(rowType);
+ }
+
+ /** Creates an ArrowTranslator for conditions over the fields of {@code rowType}. */
+ public static ArrowTranslator create(RexBuilder rexBuilder,
+ RelDataType rowType) {
+ return new ArrowTranslator(rexBuilder, rowType);
+ }
+
+  List<String> translateMatch(RexNode condition) {
+    final List<RexNode> orTerms = RelOptUtil.disjunctions(condition);
+    // Only a condition that consists of a single disjunct can be translated.
+    if (orTerms.size() != 1) {
+      throw new AssertionError("cannot translate " + condition);
+    }
+    return translateAnd(orTerms.get(0));
+  }
+
+ /**
+ * Returns the value of the literal. Timestamps and dates are rendered as
+ * strings; any other literal yields its non-null {@code getValue3()} result.
+ *
+ * @param literal Literal to translate
+ * @return The value of the literal in the form of the actual type
+ */
+ private static Object literalValue(RexLiteral literal) {
+ switch (literal.getTypeName()) {
+ case TIMESTAMP:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ final SimpleDateFormat dateFormatter =
+ getDateFormatter(ISO_DATETIME_FRACTIONAL_SECOND_FORMAT);
+ Long millis = literal.getValueAs(Long.class);
+ return dateFormatter.format(requireNonNull(millis, "millis"));
+ case DATE:
+ final DateString dateString = literal.getValueAs(DateString.class);
+ return requireNonNull(dateString, "dateString").toString();
+ default:
+ return requireNonNull(literal.getValue3());
+ }
+ }
+
+  /**
+   * Translates a conjunctive predicate to a list of predicate strings,
+   * one or more per conjunct.
+   *
+   * @param condition A conjunctive predicate
+   * @return Predicate strings, in the order of the conjuncts
+   */
+  private List<String> translateAnd(RexNode condition) {
+    final List<String> result = new ArrayList<>();
+    for (RexNode conjunct : RelOptUtil.conjunctions(condition)) {
+      if (conjunct.getKind() != SqlKind.SEARCH) {
+        result.add(translateMatch2(conjunct));
+        continue;
+      }
+      result.addAll(translateMatch(RexUtil.expandSearch(rexBuilder, null, conjunct)));
+    }
+    return result;
+  }
+
+  /** Translates a comparison; both names are function names, the 2nd for swapped operands. */
+  private String translateMatch2(RexNode node) {
+    switch (node.getKind()) {
+    case EQUALS:
+      return translateBinary("equal", "equal", (RexCall) node);
+    case LESS_THAN:
+      return translateBinary("less_than", "greater_than", (RexCall) node);
+    case LESS_THAN_OR_EQUAL:
+      return translateBinary("less_than_or_equal_to", "greater_than_or_equal_to", (RexCall) node);
+    case GREATER_THAN:
+      return translateBinary("greater_than", "less_than", (RexCall) node);
+    case GREATER_THAN_OR_EQUAL:
+      return translateBinary("greater_than_or_equal_to", "less_than_or_equal_to", (RexCall) node);
+    default:
+      throw new AssertionError("cannot translate " + node);
+    }
+  }
+
+  /**
+   * Translates a call to a binary operator. Tries the operands as written
+   * using {@code op}; failing that, swaps them and uses {@code rop}.
+   */
+  private String translateBinary(String op, String rop, RexCall call) {
+    final RexNode first = call.operands.get(0);
+    final RexNode second = call.operands.get(1);
+    final @Nullable String direct = translateBinary2(op, first, second);
+    if (direct != null) {
+      return direct;
+    }
+    final @Nullable String swapped = translateBinary2(rop, second, first);
+    if (swapped == null) {
+      throw new AssertionError("cannot translate op " + op + " call " + call);
+    }
+    return swapped;
+  }
+
+  /** Translates a call to a binary operator. Returns null on failure. */
+  private @Nullable String translateBinary2(String op, RexNode left, RexNode right) {
+    // The right-hand operand has to be a literal; anything else is a failure.
+    if (right.getKind() != SqlKind.LITERAL) {
+      return null;
+    }
+    final RexLiteral literal = (RexLiteral) right;
+    final SqlKind leftKind = left.getKind();
+    if (leftKind == SqlKind.INPUT_REF) {
+      final int ordinal = ((RexInputRef) left).getIndex();
+      return translateOp2(op, fieldNames.get(ordinal), literal);
+    }
+    if (leftKind == SqlKind.CAST) {
+      // FIXME This will not work in all cases (for example, we ignore string encoding)
+      return translateBinary2(op, ((RexCall) left).operands.get(0), right);
+    }
+    return null;
+  }
+
+  /** Builds the predicate string {@code "name op value type"} for a field and a literal. */
+  private String translateOp2(String op, String name, RexLiteral right) {
+    final Object literal = literalValue(right);
+    final String rendered = literal.toString();
+    final String literalType = getLiteralType(literal);
+
+    final boolean quote;
+    if (literal instanceof String) {
+      final RelDataTypeField field = requireNonNull(rowType.getField(name, true, false), "field");
+      quote = field.getType().getSqlTypeName() != SqlTypeName.CHAR;
+    } else {
+      quote = false;
+    }
+    return String.join(" ", name, op, quote ? "'" + rendered + "'" : rendered, literalType);
+  }
+
+  /** Names the type of a literal value: "integer", "float" or "string". */
+  private static String getLiteralType(Object literal) {
+    if (!(literal instanceof BigDecimal)) {
+      if (String.class.equals(literal.getClass())) {
+        return "string";
+      }
+      throw new AssertionError("Invalid literal");
+    }
+    final int scale = ((BigDecimal) literal).scale();
+    if (scale < 0) {
+      throw new AssertionError("Invalid literal");
+    }
+    return scale == 0 ? "integer" : "float";
+  }
+}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/package-info.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/package-info.java
new file mode 100644
index 0000000..51ced6b
--- /dev/null
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Query provider that reads from Arrow files.
+ */
+package org.apache.calcite.adapter.arrow;
diff --git a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java
new file mode 100644
index 0000000..19e7bdf
--- /dev/null
+++ b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java
@@ -0,0 +1,712 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.test.CalciteAssert;
+import org.apache.calcite.util.Sources;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for the Apache Arrow adapter.
+ */
+@Execution(ExecutionMode.SAME_THREAD)
+@ExtendWith(ArrowExtension.class)
+class ArrowAdapterTest {
+ private static Map<String, String> arrow;
+ private static File arrowDataDirectory;
+
+ @BeforeAll
+ static void initializeArrowState(@TempDir Path sharedTempDir) throws IOException, SQLException {
+ URL modelUrl =
+ Objects.requireNonNull(ArrowAdapterTest.class.getResource("/arrow-model.json"), "url");
+ Path sourceModelFilePath = Sources.of(modelUrl).file().toPath();
+ Path modelFileTarget = sharedTempDir.resolve("arrow-model.json");
+ Files.copy(sourceModelFilePath, modelFileTarget);
+
+ Path arrowFilesDirectory = sharedTempDir.resolve("arrow");
+ Files.createDirectory(arrowFilesDirectory);
+ arrowDataDirectory = arrowFilesDirectory.toFile();
+
+ File dataLocationFile = arrowFilesDirectory.resolve("arrowdata.arrow").toFile();
+ ArrowData arrowDataGenerator = new ArrowData();
+ arrowDataGenerator.writeArrowData(dataLocationFile);
+ arrowDataGenerator.writeScottEmpData(arrowFilesDirectory);
+
+ arrow = ImmutableMap.of("model", modelFileTarget.toAbsolutePath().toString());
+ }
+
+  /** Test to read an Arrow file and check its field names. */
+  @Test void testArrowSchema() {
+    ArrowSchema arrowSchema = new ArrowSchema(arrowDataDirectory);
+    Map<String, Table> tableMap = arrowSchema.getTableMap();
+    RelDataTypeFactory typeFactory =
+        new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+    RelDataType relDataType = tableMap.get("ARROWDATA").getRowType(typeFactory);
+    // JUnit takes the expected value first, so failure messages read correctly.
+    assertEquals(ImmutableList.of("intField", "stringField", "floatField", "longField"),
+        relDataType.getFieldNames());
+  }
+
+ @Test void testArrowProjectAllFields() {
+ String sql = "select * from arrowdata\n";
+ String plan = "PLAN=ArrowToEnumerableConverter\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+ String result = "intField=0; stringField=0; floatField=0.0; longField=0\n"
+ + "intField=1; stringField=1; floatField=1.0; longField=1\n"
+ + "intField=2; stringField=2; floatField=2.0; longField=2\n"
+ + "intField=3; stringField=3; floatField=3.0; longField=3\n"
+ + "intField=4; stringField=4; floatField=4.0; longField=4\n"
+ + "intField=5; stringField=5; floatField=5.0; longField=5\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .limit(6)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testArrowProjectAllFieldsExplicitly() {
+ String sql = "select \"intField\", \"stringField\", \"floatField\", \"longField\" "
+ + "from arrowdata\n";
+ String plan = "PLAN=ArrowToEnumerableConverter\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+ String result = "intField=0; stringField=0; floatField=0.0; longField=0\n"
+ + "intField=1; stringField=1; floatField=1.0; longField=1\n"
+ + "intField=2; stringField=2; floatField=2.0; longField=2\n"
+ + "intField=3; stringField=3; floatField=3.0; longField=3\n"
+ + "intField=4; stringField=4; floatField=4.0; longField=4\n"
+ + "intField=5; stringField=5; floatField=5.0; longField=5\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .limit(6)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testArrowProjectAllFieldsExplicitlyPermutation() {
+ String sql = "select \"stringField\", \"intField\", \"longField\", \"floatField\" "
+ + "from arrowdata\n";
+ String plan = "PLAN=ArrowToEnumerableConverter\n"
+ + " ArrowProject(stringField=[$1], intField=[$0], longField=[$3], floatField=[$2])\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+ String result = "stringField=0; intField=0; longField=0; floatField=0.0\n"
+ + "stringField=1; intField=1; longField=1; floatField=1.0\n"
+ + "stringField=2; intField=2; longField=2; floatField=2.0\n"
+ + "stringField=3; intField=3; longField=3; floatField=3.0\n"
+ + "stringField=4; intField=4; longField=4; floatField=4.0\n"
+ + "stringField=5; intField=5; longField=5; floatField=5.0\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .limit(6)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testArrowProjectSingleField() {
+ String sql = "select \"intField\" from arrowdata\n";
+ String plan = "PLAN=ArrowToEnumerableConverter\n"
+ + " ArrowProject(intField=[$0])\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+ String result = "intField=0\nintField=1\nintField=2\n"
+ + "intField=3\nintField=4\nintField=5\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .limit(6)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testArrowProjectTwoFields() {
+ String sql = "select \"intField\", \"stringField\" from arrowdata\n";
+ String plan = "PLAN=ArrowToEnumerableConverter\n"
+ + " ArrowProject(intField=[$0], stringField=[$1])\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+ String result = "intField=0; stringField=0\n"
+ + "intField=1; stringField=1\n"
+ + "intField=2; stringField=2\n"
+ + "intField=3; stringField=3\n"
+ + "intField=4; stringField=4\n"
+ + "intField=5; stringField=5\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .limit(6)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testArrowProjectFieldsWithIntegerFilter() {
+ String sql = "select \"intField\", \"stringField\"\n"
+ + "from arrowdata\n"
+ + "where \"intField\" < 4";
+ String plan = "PLAN=ArrowToEnumerableConverter\n"
+ + " ArrowProject(intField=[$0], stringField=[$1])\n"
+ + " ArrowFilter(condition=[<($0, 4)])\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+ String result = "intField=0; stringField=0\n"
+ + "intField=1; stringField=1\n"
+ + "intField=2; stringField=2\n"
+ + "intField=3; stringField=3\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testArrowProjectFieldsWithMultipleFilterSameField() {
+ String sql = "select \"intField\", \"stringField\"\n"
+ + "from arrowdata\n"
+ + "where \"intField\" > 1 and \"intField\" < 4";
+ String plan = "PLAN=ArrowToEnumerableConverter\n"
+ + " ArrowProject(intField=[$0], stringField=[$1])\n"
+ + " ArrowFilter(condition=[SEARCH($0, Sarg[(1..4)])])\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+ String result = "intField=2; stringField=2\n"
+ + "intField=3; stringField=3\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testArrowProjectFieldsWithConjunctiveFilters() {
+ String sql = "select \"intField\", \"stringField\"\n"
+ + "from arrowdata\n"
+ + "where \"intField\"=12 and \"stringField\"='12'";
+ String plan = "PLAN=ArrowToEnumerableConverter\n"
+ + " ArrowProject(intField=[$0], stringField=[$1])\n"
+ + " ArrowFilter(condition=[AND(=($0, 12), =($1, '12'))])\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+ String result = "intField=12; stringField=12\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Disabled("OR is not supported yet")
+ @Test void testArrowProjectFieldsWithDisjunctiveFilter() {
+ String sql = "select \"intField\", \"stringField\"\n"
+ + "from arrowdata\n"
+ + "where \"intField\"=12 or \"stringField\"='12'";
+ String plan = "PLAN=ArrowToEnumerableConverter\n"
+ + " ArrowProject(intField=[$0], stringField=[$1])\n"
+ + " ArrowFilter(condition=[OR(=($0, 12), =($1, '12'))])\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+ String result = "intField=12; stringField=12\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Disabled("IN is not supported as OR is not supported yet")
+ @Test void testArrowProjectFieldsWithInFilter() {
+ String sql = "select \"intField\", \"stringField\"\n"
+ + "from arrowdata\n"
+ + "where \"intField\" in (0, 1, 2)";
+ String plan = "PLAN=ArrowToEnumerableConverter\n"
+ + " ArrowProject(intField=[$0], stringField=[$1])\n"
+ + " ArrowFilter(condition=[OR(=($0, 0), =($0, 1), =($0, 2))])\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+ String result = "intField=0; stringField=0\n"
+ + "intField=1; stringField=1\n"
+ + "intField=2; stringField=2\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Disabled("IS NOT NULL is not supported yet")
+ @Test void testArrowProjectFieldsWithIsNotNullFilter() {
+ String sql = "select \"intField\", \"stringField\"\n"
+ + "from arrowdata\n"
+ + "where \"intField\" is not null\n"
+ + "order by \"intField\"\n"
+ + "limit 1";
+ String plan = "PLAN=EnumerableLimit(fetch=[1])\n"
+ + " EnumerableSort(sort0=[$0], dir0=[ASC])\n"
+ + " ArrowToEnumerableConverter\n"
+ + " ArrowProject(intField=[$0], stringField=[$1])\n"
+ + " ArrowFilter(condition=[IS NOT NULL($0)])\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+ String result = "intField=0; stringField=0\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+  @Disabled("IS NULL is not supported yet")
+  @Test void testArrowProjectFieldsWithIsNullFilter() {
+    String sql = "select \"intField\", \"stringField\"\n"
+        + "from arrowdata\n"
+        + "where \"intField\" is null";
+    // The expected filter mirrors the IS NULL predicate of the query.
+    String plan = "PLAN=ArrowToEnumerableConverter\n"
+        + " ArrowProject(intField=[$0], stringField=[$1])\n"
+        + " ArrowFilter(condition=[IS NULL($0)])\n"
+        + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returnsCount(0)
+        .explainContains(plan);
+  }
+
+ @Test void testArrowProjectFieldsWithFloatFilter() {
+ String sql = "select * from arrowdata\n"
+ + " where \"floatField\"=15.0";
+ String plan = "PLAN=ArrowToEnumerableConverter\n"
+ + " ArrowFilter(condition=[=(CAST($2):DOUBLE, 15.0)])\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+ String result = "intField=15; stringField=15; floatField=15.0; longField=15\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testArrowProjectFieldsWithFilterOnLaterBatch() {
+ String sql = "select \"intField\"\n"
+ + "from arrowdata\n"
+ + "where \"intField\"=25";
+ String plan = "PLAN=ArrowToEnumerableConverter\n"
+ + " ArrowProject(intField=[$0])\n"
+ + " ArrowFilter(condition=[=($0, 25)])\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+ String result = "intField=25\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testArrowSubquery() {
+ String sql = "select \"intField\"\n"
+ + "from (select \"intField\", \"stringField\" from arrowdata where \"stringField\" = '2')\n"
+ + "where \"intField\" = 2";
+ String plan = "PLAN=ArrowToEnumerableConverter\n"
+ + " ArrowProject(intField=[$0])\n"
+ + " ArrowFilter(condition=[AND(=($1, '2'), =($0, 2))])\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n"
+ + "\n";
+ String result = "intField=2\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Disabled("UNION does not work")
+ @Test void testArrowUnion() {
+ String sql = "(select \"intField\"\n"
+ + "from arrowdata\n"
+ + "where \"intField\" = 2)\n"
+ + " union \n"
+ + "(select \"intField\"\n"
+ + "from arrowdata\n"
+ + "where \"intField\" = 1)\n";
+ String plan = "PLAN=EnumerableUnion(all=[false])\n"
+ + " ArrowToEnumerableConverter\n"
+ + " ArrowProject(intField=[$0])\n"
+ + " ArrowFilter(condition=[=($0, 2)])\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n"
+ + " ArrowToEnumerableConverter\n"
+ + " ArrowProject(intField=[$0])\n"
+ + " ArrowFilter(condition=[=($0, 1)])\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+ String result = "intField=1\nintField=2\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testFieldWithSpace() {
+ String sql = "select \"my Field\" from (select \"intField\", \"stringField\" as \"my Field\"\n"
+ + "from arrowdata)\n"
+ + "where \"my Field\" = '2'";
+ String plan = "PLAN=ArrowToEnumerableConverter\n"
+ + " ArrowProject(my Field=[$1])\n"
+ + " ArrowFilter(condition=[=($1, '2')])\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+ String result = "my Field=2\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+  @Disabled("literal with space is not supported")
+  @Test void testLiteralWithSpace() {
+    String sql = "select \"intField\", \"stringField\" as \"my Field\"\n"
+        + "from arrowdata\n"
+        + "where \"stringField\" = 'literal with space'";
+    // The expected filter carries the same literal as the query.
+    String plan = "PLAN=ArrowToEnumerableConverter\n"
+        + " ArrowProject(intField=[$0], my Field=[$1])\n"
+        + " ArrowFilter(condition=[=($1, 'literal with space')])\n"
+        + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "";
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+  @Test void testLiteralWithQuote() {
+    String sql = "select \"intField\", \"stringField\" as \"my Field\"\n"
+        + "from arrowdata\n"
+        + "where \"stringField\" = ''''";
+    // The projection is named after the "my Field" alias used in the query.
+    String plan = "PLAN=ArrowToEnumerableConverter\n"
+        + " ArrowProject(intField=[$0], my Field=[$1])\n"
+        + " ArrowFilter(condition=[=($1, '''')])\n"
+        + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+    String result = "";
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returns(result)
+        .explainContains(plan);
+  }
+
+ @Test void testTinyIntProject() {
+ String sql = "select DEPTNO from DEPT";
+ String plan = "PLAN=ArrowToEnumerableConverter\n"
+ + " ArrowProject(DEPTNO=[$0])\n"
+ + " ArrowTableScan(table=[[ARROW, DEPT]], fields=[[0, 1, 2]])\n\n";
+ String result = "DEPTNO=10\nDEPTNO=20\nDEPTNO=30\nDEPTNO=40\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testSmallIntProject() {
+ String sql = "select EMPNO from EMP";
+ String plan = "PLAN=ArrowToEnumerableConverter\n"
+ + " ArrowProject(EMPNO=[$0])\n"
+ + " ArrowTableScan(table=[[ARROW, EMP]], fields=[[0, 1, 2, 3, 4, 5, 6, 7]])\n\n";
+ String result = "EMPNO=7369\nEMPNO=7499\nEMPNO=7521\n";
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .limit(3)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testCastDecimalToInt() {
+ String sql = "select CAST(LOSAL AS INT) as \"trunc\" from SALGRADE";
+ String plan =
+ "PLAN=EnumerableCalc(expr#0..2=[{inputs}], expr#3=[CAST($t1):INTEGER], trunc=[$t3])\n"
+ + " ArrowToEnumerableConverter\n"
+ + " ArrowTableScan(table=[[ARROW, SALGRADE]], fields=[[0, 1, 2]])\n\n";
+ String result = "trunc=700\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .typeIs("[trunc INTEGER]")
+ .limit(1)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testCastDecimalToFloat() {
+ String sql = "select CAST(LOSAL AS FLOAT) as \"extra\" from SALGRADE";
+ String plan = "PLAN=EnumerableCalc(expr#0..2=[{inputs}],"
+ + " expr#3=[CAST($t1):FLOAT], extra=[$t3])\n"
+ + " ArrowToEnumerableConverter\n"
+ + " ArrowTableScan(table=[[ARROW, SALGRADE]], fields=[[0, 1, 2]])\n\n";
+ String result = "extra=700.0\nextra=1201.0\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .typeIs("[extra FLOAT]")
+ .limit(2)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testCastDecimalToDouble() {
+ String sql = "select CAST(LOSAL AS DOUBLE) as \"extra\" from SALGRADE";
+ String plan =
+ "PLAN=EnumerableCalc(expr#0..2=[{inputs}], expr#3=[CAST($t1):DOUBLE], extra=[$t3])\n"
+ + " ArrowToEnumerableConverter\n"
+ + " ArrowTableScan(table=[[ARROW, SALGRADE]], fields=[[0, 1, 2]])\n\n";
+ String result = "extra=700.0\nextra=1201.0\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .typeIs("[extra DOUBLE]")
+ .limit(2)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testCastIntToDouble() {
+ String sql = "select CAST(\"intField\" AS DOUBLE) as \"dbl\" from arrowdata";
+ String plan =
+ "PLAN=EnumerableCalc(expr#0..3=[{inputs}], expr#4=[CAST($t0):DOUBLE], dbl=[$t4])\n"
+ + " ArrowToEnumerableConverter\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+ String result = "dbl=0.0\ndbl=1.0\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .typeIs("[dbl DOUBLE]")
+ .limit(2)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testStringOperation() {
+ String sql = "select\n"
+ + " \"stringField\" || '_suffix' as \"field1\"\n"
+ + "from arrowdata";
+ String plan = "PLAN=EnumerableCalc(expr#0..3=[{inputs}], expr#4=['_suffix'], "
+ + "expr#5=[||($t1, $t4)], field1=[$t5])\n"
+ + " ArrowToEnumerableConverter\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+ String result = "field1=0_suffix\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .limit(1)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+
+ @Disabled("join is not supported yet")
+ @Test void testJoin() {
+ String sql = "select t1.\"intField\", t2.\"intField\" "
+ + "from arrowdata t1 join arrowdata t2 on t1.\"intField\" = t2.\"intField\"";
+ String plan = "PLAN=EnumerableJoin(condition=[=($0, $4)], joinType=[inner])\n"
+ + " ArrowToEnumerableConverter\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n"
+ + " ArrowToEnumerableConverter\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+ String result = "intField=0\nintField=1\nintField=2\nintField=3\nintField=4\nintField=5\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .limit(1)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testAggWithoutAggFunctions() {
+ String sql = "select DISTINCT(\"intField\") as \"dep\" from arrowdata";
+ String plan = "PLAN=EnumerableAggregate(group=[{0}])\n"
+ + " ArrowToEnumerableConverter\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+ String result = "dep=0\ndep=1\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .limit(2)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testAggWithAggFunctions() {
+ String sql = "select JOB, SUM(SAL) as TOTAL from EMP GROUP BY JOB";
+ String plan = "PLAN=EnumerableAggregate(group=[{2}], TOTAL=[SUM($5)])\n"
+ + " ArrowToEnumerableConverter\n"
+ + " ArrowTableScan(table=[[ARROW, EMP]], fields=[[0, 1, 2, 3, 4, 5, 6, 7]])\n\n";
+ String result = "JOB=SALESMAN; TOTAL=5600.00\nJOB=ANALYST; TOTAL=6000.00\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .limit(2)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testFilteredAgg() {
+ String sql = "select SUM(SAL) FILTER (WHERE COMM > 400) as SALESSUM from EMP";
+ String plan = "PLAN=EnumerableAggregate(group=[{}], SALESSUM=[SUM($0) FILTER $1])\n"
+ + " EnumerableCalc(expr#0..7=[{inputs}], expr#8=[400], expr#9=[>($t6, $t8)], "
+ + "expr#10=[IS TRUE($t9)], SAL=[$t5], $f1=[$t10])\n"
+ + " ArrowToEnumerableConverter\n"
+ + " ArrowTableScan(table=[[ARROW, EMP]], fields=[[0, 1, 2, 3, 4, 5, 6, 7]])\n\n";
+ String result = "SALESSUM=2500.00\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .limit(2)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testFilteredAggGroupBy() {
+ String sql = "select SUM(SAL) FILTER (WHERE COMM > 400) as SALESSUM from EMP group by EMPNO";
+ String plan = "PLAN=EnumerableCalc(expr#0..1=[{inputs}], SALESSUM=[$t1])\n"
+ + " EnumerableAggregate(group=[{0}], SALESSUM=[SUM($1) FILTER $2])\n"
+ + " EnumerableCalc(expr#0..7=[{inputs}], expr#8=[400], expr#9=[>($t6, $t8)], "
+ + "expr#10=[IS TRUE($t9)], EMPNO=[$t0], SAL=[$t5], $f2=[$t10])\n"
+ + " ArrowToEnumerableConverter\n"
+ + " ArrowTableScan(table=[[ARROW, EMP]], fields=[[0, 1, 2, 3, 4, 5, 6, 7]])\n\n";
+ String result = "SALESSUM=1250.00\nSALESSUM=null\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .limit(2)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testAggGroupedByNullable() {
+ String sql = "select COMM, SUM(SAL) as SALESSUM from EMP GROUP BY COMM";
+ String plan = "PLAN=EnumerableAggregate(group=[{6}], SALESSUM=[SUM($5)])\n"
+ + " ArrowToEnumerableConverter\n"
+ + " ArrowTableScan(table=[[ARROW, EMP]], fields=[[0, 1, 2, 3, 4, 5, 6, 7]])\n\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .returnsUnordered("COMM=0.00; SALESSUM=1500.00",
+ "COMM=1400.00; SALESSUM=1250.00",
+ "COMM=300.00; SALESSUM=1600.00",
+ "COMM=500.00; SALESSUM=1250.00",
+ "COMM=null; SALESSUM=23425.00")
+ .explainContains(plan);
+ }
+
+ @Test void testArrowAdapterLimitNoSort() {
+ String sql = "select \"intField\"\n"
+ + "from arrowdata\n"
+ + "limit 2";
+ String plan = "PLAN=EnumerableCalc(expr#0..3=[{inputs}], intField=[$t0])\n"
+ + " EnumerableLimit(fetch=[2])\n"
+ + " ArrowToEnumerableConverter\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+ String result = "intField=0\nintField=1\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testArrowLimitOffsetNoSort() {
+ String sql = "select \"intField\"\n"
+ + "from arrowdata\n"
+ + "limit 2 offset 2";
+ String plan = "PLAN=EnumerableCalc(expr#0..3=[{inputs}], intField=[$t0])\n"
+ + " EnumerableLimit(offset=[2], fetch=[2])\n"
+ + " ArrowToEnumerableConverter\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+ String result = "intField=2\nintField=3\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ @Test void testArrowSortOnLong() {
+ String sql = "select \"intField\" from arrowdata order by \"longField\" desc";
+ String plan = "PLAN=EnumerableSort(sort0=[$1], dir0=[DESC])\n"
+ + " ArrowToEnumerableConverter\n"
+ + " ArrowProject(intField=[$0], longField=[$3])\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATA]], fields=[[0, 1, 2, 3]])\n\n";
+ String result = "intField=49\nintField=48\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .limit(2)
+ .returns(result)
+ .explainContains(plan);
+ }
+}
diff --git a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowData.java b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowData.java
new file mode 100644
index 0000000..3870bb2
--- /dev/null
+++ b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowData.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
+import org.apache.arrow.adapter.jdbc.JdbcToArrow;
+import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
+import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
+import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FloatingPointVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.Text;
+
+import com.google.common.collect.ImmutableList;
+
+import net.hydromatic.scott.data.hsqldb.ScottHsqldb;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Calendar;
+import java.util.List;
+
+/**
+ * Class that can be used to generate Arrow sample data into a data directory.
+ */
+public class ArrowData {
+
+ /** Maximum number of rows written per record batch. */
+ private final int batchSize;
+ /** Total number of rows to generate. */
+ private final int entries;
+ // Next value to write into each column; each is advanced once per generated row.
+ private int intValue;
+ private int stringValue;
+ private float floatValue;
+ private long longValue;
+
+ /** Creates a generator of 50 rows in batches of at most 20; the value counters start at zero. */
+ public ArrowData() {
+   batchSize = 20;
+   entries = 50;
+ }
+
+ private Schema makeArrowSchema() {
+ ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+ FieldType intType = FieldType.nullable(new ArrowType.Int(32, true));
+ FieldType stringType = FieldType.nullable(new ArrowType.Utf8());
+ FieldType floatType =
+ FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE));
+ FieldType longType = FieldType.nullable(new ArrowType.Int(64, true));
+
+ childrenBuilder.add(new Field("intField", intType, null));
+ childrenBuilder.add(new Field("stringField", stringType, null));
+ childrenBuilder.add(new Field("floatField", floatType, null));
+ childrenBuilder.add(new Field("longField", longType, null));
+
+ return new Schema(childrenBuilder.build(), null);
+ }
+
+ public void writeScottEmpData(Path arrowDataDirectory) throws IOException, SQLException {
+ List<String> tableNames = ImmutableList.of("EMP", "DEPT", "SALGRADE");
+
+ Connection connection =
+ DriverManager.getConnection(ScottHsqldb.URI, ScottHsqldb.USER, ScottHsqldb.PASSWORD);
+
+ for (String tableName : tableNames) {
+ String sql = "SELECT * FROM " + tableName;
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql);
+
+ Calendar calendar = JdbcToArrowUtils.getUtcCalendar();
+
+ RootAllocator rootAllocator = new RootAllocator();
+ JdbcToArrowConfig config = new JdbcToArrowConfigBuilder()
+ .setAllocator(rootAllocator)
+ .setReuseVectorSchemaRoot(true)
+ .setCalendar(calendar)
+ .setTargetBatchSize(1024)
+ .build();
+
+ ArrowVectorIterator vectorIterator = JdbcToArrow.sqlToArrowVectorIterator(resultSet, config);
+ Path tablePath = arrowDataDirectory.resolve(tableName + ".arrow");
+
+ FileOutputStream fileOutputStream = new FileOutputStream(tablePath.toFile());
+
+ VectorSchemaRoot vectorSchemaRoot = vectorIterator.next();
+
+ ArrowFileWriter arrowFileWriter =
+ new ArrowFileWriter(vectorSchemaRoot, null, fileOutputStream.getChannel());
+
+ arrowFileWriter.start();
+ arrowFileWriter.writeBatch();
+
+ while (vectorIterator.hasNext()) {
+ // refreshes the data in the VectorSchemaRoot with the next batch
+ vectorIterator.next();
+ arrowFileWriter.writeBatch();
+ }
+
+ arrowFileWriter.close();
+ }
+ }
+
+ public void writeArrowData(File file) throws IOException {
+ FileOutputStream fileOutputStream = new FileOutputStream(file);
+ Schema arrowSchema = makeArrowSchema();
+ VectorSchemaRoot vectorSchemaRoot =
+ VectorSchemaRoot.create(arrowSchema, new RootAllocator(Integer.MAX_VALUE));
+ ArrowFileWriter arrowFileWriter =
+ new ArrowFileWriter(vectorSchemaRoot, null, fileOutputStream.getChannel());
+
+ arrowFileWriter.start();
+
+ for (int i = 0; i < this.entries;) {
+ int numRows = Math.min(this.batchSize, this.entries - i);
+ vectorSchemaRoot.setRowCount(numRows);
+ for (Field field : vectorSchemaRoot.getSchema().getFields()) {
+ FieldVector vector = vectorSchemaRoot.getVector(field.getName());
+ switch (vector.getMinorType()) {
+ case INT:
+ intField(vector, numRows);
+ break;
+ case FLOAT4:
+ floatField(vector, numRows);
+ break;
+ case VARCHAR:
+ varCharField(vector, numRows);
+ break;
+ case BIGINT:
+ longField(vector, numRows);
+ break;
+ default:
+ throw new IllegalStateException("Not supported type yet: " + vector.getMinorType());
+ }
+ }
+ arrowFileWriter.writeBatch();
+ i += numRows;
+ }
+ arrowFileWriter.end();
+ arrowFileWriter.close();
+ fileOutputStream.flush();
+ fileOutputStream.close();
+ }
+
+ private void intField(FieldVector fieldVector, int rowCount) {
+ IntVector intVector = (IntVector) fieldVector;
+ intVector.setInitialCapacity(rowCount);
+ intVector.allocateNew();
+ for (int i = 0; i < rowCount; i++) {
+ intVector.set(i, 1, intValue);
+ this.intValue++;
+ }
+ fieldVector.setValueCount(rowCount);
+ }
+
+ private void floatField(FieldVector fieldVector, int rowCount) {
+ FloatingPointVector floatingPointVector = (FloatingPointVector) fieldVector;
+ floatingPointVector.setInitialCapacity(rowCount);
+ floatingPointVector.allocateNew();
+ for (int i = 0; i < rowCount; i++) {
+ float value = this.floatValue;
+ floatingPointVector.setWithPossibleTruncate(i, value);
+ this.floatValue++;
+ }
+ fieldVector.setValueCount(rowCount);
+ }
+
+ private void varCharField(FieldVector fieldVector, int rowCount) {
+ VarCharVector varCharVector = (VarCharVector) fieldVector;
+ varCharVector.setInitialCapacity(rowCount);
+ varCharVector.allocateNew();
+ for (int i = 0; i < rowCount; i++) {
+ String value = String.valueOf(this.stringValue);
+ varCharVector.set(i, new Text(value));
+ this.stringValue++;
+ }
+ fieldVector.setValueCount(rowCount);
+ }
+
+ private void longField(FieldVector fieldVector, int rowCount) {
+ BigIntVector longVector = (BigIntVector) fieldVector;
+ longVector.setInitialCapacity(rowCount);
+ longVector.allocateNew();
+ for (int i = 0; i < rowCount; i++) {
+ longVector.set(i, this.longValue);
+ this.longValue++;
+ }
+ fieldVector.setValueCount(rowCount);
+ }
+}
diff --git a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowExtension.java b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowExtension.java
new file mode 100644
index 0000000..e5edb91
--- /dev/null
+++ b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowExtension.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.arrow;
+
+import org.apache.calcite.config.CalciteSystemProperty;
+
+import org.apache.arrow.gandiva.evaluator.Projector;
+import org.apache.arrow.gandiva.exceptions.GandivaException;
+import org.apache.arrow.gandiva.expression.ExpressionTree;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import org.junit.jupiter.api.extension.ConditionEvaluationResult;
+import org.junit.jupiter.api.extension.ExecutionCondition;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * JUnit5 extension to handle Arrow tests.
+ *
+ * <p>Tests will be skipped if the Gandiva library cannot be loaded on the given platform.
+ */
+class ArrowExtension implements ExecutionCondition {
+
+  /**
+   * Decides whether the Arrow tests should run.
+   *
+   * <p>Enabled by default, unless explicitly disabled from the command line
+   * ({@code -Dcalcite.test.arrow=false}) or if the Gandiva library, used to implement Arrow
+   * filtering and projection, cannot be loaded.
+   *
+   * @return an enabled result if the tests can run in the current environment,
+   * a disabled result stating the reason otherwise
+   */
+  @Override public ConditionEvaluationResult evaluateExecutionCondition(
+      final ExtensionContext context) {
+    if (!CalciteSystemProperty.TEST_ARROW.value()) {
+      return ConditionEvaluationResult.disabled("Arrow tests disabled");
+    }
+
+    try {
+      // Building a projector forces the Gandiva JNI library to be loaded.
+      Schema emptySchema = new Schema(new ArrayList<>(), null);
+      List<ExpressionTree> expressions = new ArrayList<>();
+      Projector.make(emptySchema, expressions);
+    } catch (GandivaException e) {
+      // this exception comes from using an empty expression,
+      // but the JNI library was loaded properly
+    } catch (UnsatisfiedLinkError e) {
+      return ConditionEvaluationResult.disabled(
+          "Arrow tests disabled: Gandiva library cannot be loaded");
+    }
+
+    return ConditionEvaluationResult.enabled("Arrow tests enabled");
+  }
+}
diff --git a/arrow/src/test/resources/arrow-model.json b/arrow/src/test/resources/arrow-model.json
new file mode 100644
index 0000000..fed2210
--- /dev/null
+++ b/arrow/src/test/resources/arrow-model.json
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "version": "1.0",
+ "defaultSchema": "ARROW",
+ "schemas": [
+ {
+ "name": "ARROW",
+ "type": "custom",
+ "factory": "org.apache.calcite.adapter.arrow.ArrowSchemaFactory",
+ "operand": {
+ "directory": "arrow"
+ }
+ }
+ ]
+}
diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts
index 79fb7cc..3cc3323 100644
--- a/bom/build.gradle.kts
+++ b/bom/build.gradle.kts
@@ -97,6 +97,10 @@
apiv("net.java.dev.jna:jna")
apiv("net.java.dev.jna:jna-platform")
apiv("net.sf.opencsv:opencsv")
+ apiv("org.apache.arrow:arrow-memory-netty", "arrow")
+ apiv("org.apache.arrow:arrow-vector", "arrow")
+ apiv("org.apache.arrow:arrow-jdbc", "arrow")
+ apiv("org.apache.arrow.gandiva:arrow-gandiva", "arrow-gandiva")
apiv("org.apache.calcite.avatica:avatica-core", "calcite.avatica")
apiv("org.apache.calcite.avatica:avatica-server", "calcite.avatica")
apiv("org.apache.cassandra:cassandra-all")
diff --git a/build.gradle.kts b/build.gradle.kts
index 3ad31fe..579d4bd 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -212,7 +212,7 @@
}
val adaptersForSqlline = listOf(
- ":babel", ":cassandra", ":druid", ":elasticsearch",
+ ":arrow", ":babel", ":cassandra", ":druid", ":elasticsearch",
":file", ":geode", ":innodb", ":kafka", ":mongodb",
":pig", ":piglet", ":plus", ":redis", ":spark", ":splunk")
diff --git a/core/src/main/java/org/apache/calcite/config/CalciteSystemProperty.java b/core/src/main/java/org/apache/calcite/config/CalciteSystemProperty.java
index 4db1b30..4d454c6 100644
--- a/core/src/main/java/org/apache/calcite/config/CalciteSystemProperty.java
+++ b/core/src/main/java/org/apache/calcite/config/CalciteSystemProperty.java
@@ -211,6 +211,12 @@
});
/**
+ * Whether to run Arrow tests.
+ */
+ public static final CalciteSystemProperty<Boolean> TEST_ARROW =
+ booleanProperty("calcite.test.arrow", true);
+
+ /**
* Whether to run MongoDB tests.
*/
public static final CalciteSystemProperty<Boolean> TEST_MONGODB =
diff --git a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
index b98cc91..6280e11 100644
--- a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
+++ b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
@@ -342,6 +342,8 @@
SORTED_MULTI_MAP_ARRAYS(SortedMultiMap.class, "arrays", Comparator.class),
SORTED_MULTI_MAP_SINGLETON(SortedMultiMap.class, "singletonArrayIterator",
Comparator.class, List.class),
+ IMMUTABLE_INT_LIST_IDENTITY(ImmutableIntList.class, "identity", int.class),
+ IMMUTABLE_INT_LIST_COPY_OF(ImmutableIntList.class, "copyOf", int[].class),
BINARY_SEARCH5_LOWER(BinarySearch.class, "lowerBound", Object[].class,
Object.class, int.class, int.class, Comparator.class),
BINARY_SEARCH5_UPPER(BinarySearch.class, "upperBound", Object[].class,
diff --git a/core/src/main/java/org/apache/calcite/util/ImmutableIntList.java b/core/src/main/java/org/apache/calcite/util/ImmutableIntList.java
index cb34fe3..c8448aa 100644
--- a/core/src/main/java/org/apache/calcite/util/ImmutableIntList.java
+++ b/core/src/main/java/org/apache/calcite/util/ImmutableIntList.java
@@ -70,13 +70,25 @@
* Creates an ImmutableIntList from an array of {@code int}.
*/
public static ImmutableIntList of(int... ints) {
+ if (ints.length == 0) {
+ return EMPTY;
+ }
return new ImmutableIntList(ints.clone());
}
+ /** Same as {@link #of(int...)}, but less ambiguous for code generators
+ * and compilers. */
+ public static ImmutableIntList copyOf(int... ints) {
+ return of(ints);
+ }
+
/**
* Creates an ImmutableIntList from an array of {@code Number}.
*/
public static ImmutableIntList copyOf(Number... numbers) {
+ if (numbers.length == 0) {
+ return EMPTY;
+ }
final int[] ints = new int[numbers.length];
for (int i = 0; i < ints.length; i++) {
ints[i] = numbers[i].intValue();
@@ -108,6 +120,9 @@
private static ImmutableIntList copyFromCollection(
Collection<? extends Number> list) {
+ if (list.isEmpty()) {
+ return EMPTY;
+ }
final int[] ints = new int[list.size()];
int i = 0;
for (Number number : list) {
@@ -192,7 +207,7 @@
return ints.clone();
}
- /** Returns an List of {@code Integer}. */
+ /** Returns a List of {@code Integer}. */
public List<Integer> toIntegerList() {
ArrayList<Integer> arrayList = new ArrayList<>(size());
for (int i : ints) {
@@ -283,6 +298,9 @@
* @see Mappings#isIdentity(List, int)
*/
public static ImmutableIntList identity(int count) {
+ if (count == 0) {
+ return EMPTY;
+ }
final int[] integers = new int[count];
for (int i = 0; i < integers.length; i++) {
integers[i] = i;
diff --git a/gradle.properties b/gradle.properties
index 51e13a2..1bf19c1 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -83,6 +83,8 @@
# elasticsearch does not like asm:6.2.1+
aggdesigner-algorithm.version=6.0
apiguardian-api.version=1.1.2
+arrow-gandiva.version=15.0.0
+arrow.version=15.0.0
asm.version=7.2
byte-buddy.version=1.9.3
cassandra-all.version=4.1.3
diff --git a/settings.gradle.kts b/settings.gradle.kts
index a5ea536..adb373f 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -67,6 +67,7 @@
include(
"bom",
"release",
+ "arrow",
"babel",
"cassandra",
"core",
diff --git a/site/_docs/adapter.md b/site/_docs/adapter.md
index 1ceff65..844b4a9 100644
--- a/site/_docs/adapter.md
+++ b/site/_docs/adapter.md
@@ -27,6 +27,7 @@
A schema adapter allows Calcite to read particular kind of data,
presenting the data as tables within a schema.
+* [Arrow adapter](arrow_adapter.html) (<a href="{{ site.apiRoot }}/org/apache/calcite/adapter/arrow/package-summary.html">calcite-arrow</a>)
* [Cassandra adapter](cassandra_adapter.html) (<a href="{{ site.apiRoot }}/org/apache/calcite/adapter/cassandra/package-summary.html">calcite-cassandra</a>)
* CSV adapter (<a href="{{ site.apiRoot }}/org/apache/calcite/adapter/csv/package-summary.html">example/csv</a>)
* [Druid adapter](druid_adapter.html) (<a href="{{ site.apiRoot }}/org/apache/calcite/adapter/druid/package-summary.html">calcite-druid</a>)
diff --git a/site/_docs/arrow_adapter.md b/site/_docs/arrow_adapter.md
new file mode 100644
index 0000000..2dc0a1c
--- /dev/null
+++ b/site/_docs/arrow_adapter.md
@@ -0,0 +1,83 @@
+---
+layout: docs
+title: Arrow adapter
+permalink: /docs/arrow_adapter.html
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+**Note**: The Arrow adapter is an experimental feature;
+changes to its public API and usage are expected.
+
+## Overview
+
+Calcite's adapter for Apache Arrow is able to read and process data in Arrow
+format using SQL.
+
+It can read files in Arrow's
+[Feather format](https://arrow.apache.org/docs/python/feather.html)
+(which generally have a `.arrow` suffix) in the same way that the
+[File Adapter](file_adapter.html) can read `.csv` files.
+
+## A simple example
+
+Let's start with a simple example. First, we need a
+[model definition]({{ site.baseurl }}/docs/model.html),
+as follows.
+
+{% highlight json %}
+{
+ "version": "1.0",
+ "defaultSchema": "ARROW",
+ "schemas": [
+ {
+ "name": "ARROW",
+ "type": "custom",
+ "factory": "org.apache.calcite.adapter.arrow.ArrowSchemaFactory",
+ "operand": {
+ "directory": "arrow"
+ }
+ }
+ ]
+}
+{% endhighlight %}
+
+The model file is stored as `arrow/src/test/resources/arrow-model.json`,
+so you can connect via [`sqlline`](https://github.com/julianhyde/sqlline)
+as follows:
+
+{% highlight bash %}
+$ ./sqlline
+sqlline> !connect jdbc:calcite:model=arrow/src/test/resources/arrow-model.json admin admin
+sqlline> select * from arrow.test;
++----------+----------+------------+
+| fieldOne | fieldTwo | fieldThree |
++----------+----------+------------+
+| 1 | abc | 1.2 |
+| 2 | def | 3.4 |
+| 3 | xyz | 5.6 |
+| 4 | abcd | 1.22 |
+| 5 | defg | 3.45 |
+| 6 | xyza | 5.67 |
++----------+----------+------------+
+6 rows selected
+{% endhighlight %}
+
+The `arrow` directory contains a file called `test.arrow`, and so it shows up as
+a table called `test`.