add-pushdown-session-config (#26)
diff --git a/paimon-presto-0.236/src/main/java/org/apache/paimon/presto/PrestoConnector.java b/paimon-presto-0.236/src/main/java/org/apache/paimon/presto/PrestoConnector.java
index 51d8f67..49c3a18 100644
--- a/paimon-presto-0.236/src/main/java/org/apache/paimon/presto/PrestoConnector.java
+++ b/paimon-presto-0.236/src/main/java/org/apache/paimon/presto/PrestoConnector.java
@@ -20,9 +20,13 @@
import com.facebook.presto.spi.connector.Connector;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.session.PropertyMetadata;
import javax.inject.Inject;
+import java.util.List;
+import java.util.Optional;
+
/** Presto {@link Connector}. */
public class PrestoConnector extends PrestoConnectorBase {
@@ -30,12 +34,20 @@
@Inject
public PrestoConnector(
+ List<PropertyMetadata<?>> sessionProperties,
PrestoTransactionManager transactionManager,
PrestoSplitManager prestoSplitManager,
PrestoPageSourceProvider prestoPageSourceProvider,
- PrestoMetadata prestoMetadata) {
+ PrestoMetadata prestoMetadata,
+ Optional<PrestoPlanOptimizerProvider> prestoPlanOptimizerProvider) {
// Presto 236 compute push-down api is too low, not yet impl.
- super(transactionManager, prestoSplitManager, prestoPageSourceProvider, prestoMetadata);
+ super(
+ sessionProperties,
+ transactionManager,
+ prestoSplitManager,
+ prestoPageSourceProvider,
+ prestoMetadata,
+ Optional.empty());
this.transactionManager = transactionManager;
}
diff --git a/paimon-presto-0.268/src/main/java/org/apache/paimon/presto/PrestoConnector.java b/paimon-presto-0.268/src/main/java/org/apache/paimon/presto/PrestoConnector.java
index 0fa0f33..7717abc 100644
--- a/paimon-presto-0.268/src/main/java/org/apache/paimon/presto/PrestoConnector.java
+++ b/paimon-presto-0.268/src/main/java/org/apache/paimon/presto/PrestoConnector.java
@@ -20,9 +20,11 @@
import com.facebook.presto.spi.connector.Connector;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.session.PropertyMetadata;
import javax.inject.Inject;
+import java.util.List;
import java.util.Optional;
/** Presto {@link Connector}. */
@@ -32,17 +34,19 @@
@Inject
public PrestoConnector(
+ List<PropertyMetadata<?>> sessionProperties,
PrestoTransactionManager transactionManager,
PrestoSplitManager prestoSplitManager,
PrestoPageSourceProvider prestoPageSourceProvider,
PrestoMetadata prestoMetadata,
- PrestoPlanOptimizerProvider prestoPlanOptimizerProvider) {
+ Optional<PrestoPlanOptimizerProvider> prestoPlanOptimizerProvider) {
super(
+ sessionProperties,
transactionManager,
prestoSplitManager,
prestoPageSourceProvider,
prestoMetadata,
- Optional.of(prestoPlanOptimizerProvider));
+ prestoPlanOptimizerProvider);
this.transactionManager = transactionManager;
}
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PaimonModule.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PaimonModule.java
index 6cbb738..e19e0a3 100644
--- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PaimonModule.java
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PaimonModule.java
@@ -62,7 +62,6 @@
public void configure(Binder binder) {
binder.bind(PaimonConnectorId.class).toInstance(new PaimonConnectorId(connectorId));
binder.bind(TypeManager.class).toInstance(typeManager);
- binder.bind(PrestoConnector.class).in(Scopes.SINGLETON);
binder.bind(PrestoMetadata.class).in(Scopes.SINGLETON);
binder.bind(PrestoSplitManager.class).in(Scopes.SINGLETON);
binder.bind(PrestoPageSourceProvider.class).in(Scopes.SINGLETON);
@@ -74,5 +73,7 @@
binder.bind(PrestoPlanOptimizerProvider.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(PaimonConfig.class);
+
+ binder.bind(PrestoSessionProperties.class).in(Scopes.SINGLETON);
}
}
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java
index 64c77a5..acd7419 100644
--- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java
@@ -47,23 +47,21 @@
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
+import static org.apache.paimon.presto.PrestoSessionProperties.isPaimonPushdownEnabled;
/** PrestoComputePushdown. */
public class PrestoComputePushdown implements ConnectorPlanOptimizer {
private final StandardFunctionResolution functionResolution;
private final RowExpressionService rowExpressionService;
- private final PaimonConfig config;
public PrestoComputePushdown(
StandardFunctionResolution functionResolution,
- RowExpressionService rowExpressionService,
- PaimonConfig config) {
+ RowExpressionService rowExpressionService) {
this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
this.rowExpressionService =
requireNonNull(rowExpressionService, "rowExpressionService is null");
- this.config = requireNonNull(config, "config is null");
}
@Override
@@ -73,7 +71,7 @@
VariableAllocator variableAllocator,
PlanNodeIdAllocator idAllocator) {
- if (config.isPaimonPushdownEnabled()) {
+ if (isPaimonPushdownEnabled(session)) {
return maxSubplan.accept(new Visitor(session, idAllocator), null);
}
return maxSubplan;
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnector.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnector.java
index cbaf0fb..da9dc19 100644
--- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnector.java
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnector.java
@@ -22,9 +22,11 @@
import com.facebook.presto.spi.connector.ConnectorCommitHandle;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.connector.EmptyConnectorCommitHandle;
+import com.facebook.presto.spi.session.PropertyMetadata;
import javax.inject.Inject;
+import java.util.List;
import java.util.Optional;
/** Presto {@link Connector}. */
@@ -34,17 +36,19 @@
@Inject
public PrestoConnector(
+ List<PropertyMetadata<?>> sessionProperties,
PrestoTransactionManager transactionManager,
PrestoSplitManager prestoSplitManager,
PrestoPageSourceProvider prestoPageSourceProvider,
PrestoMetadata prestoMetadata,
- PrestoPlanOptimizerProvider prestoPlanOptimizerProvider) {
+ Optional<PrestoPlanOptimizerProvider> prestoPlanOptimizerProvider) {
super(
+ sessionProperties,
transactionManager,
prestoSplitManager,
prestoPageSourceProvider,
prestoMetadata,
- Optional.of(prestoPlanOptimizerProvider));
+ prestoPlanOptimizerProvider);
this.transactionManager = transactionManager;
}
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnectorBase.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnectorBase.java
index 0eab77c..42fec7c 100644
--- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnectorBase.java
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnectorBase.java
@@ -26,8 +26,11 @@
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata;
+import com.facebook.presto.spi.session.PropertyMetadata;
import com.facebook.presto.spi.transaction.IsolationLevel;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
import java.util.Optional;
import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
@@ -37,6 +40,7 @@
/** Presto {@link Connector}. */
public abstract class PrestoConnectorBase implements Connector {
+ private final List<PropertyMetadata<?>> sessionProperties;
private final PrestoTransactionManager transactionManager;
private final PrestoSplitManager prestoSplitManager;
private final PrestoPageSourceProvider prestoPageSourceProvider;
@@ -44,24 +48,15 @@
private final Optional<PrestoPlanOptimizerProvider> prestoPlanOptimizerProvider;
public PrestoConnectorBase(
- PrestoTransactionManager transactionManager,
- PrestoSplitManager prestoSplitManager,
- PrestoPageSourceProvider prestoPageSourceProvider,
- PrestoMetadata prestoMetadata) {
- this(
- transactionManager,
- prestoSplitManager,
- prestoPageSourceProvider,
- prestoMetadata,
- Optional.empty());
- }
-
- public PrestoConnectorBase(
+ List<PropertyMetadata<?>> sessionProperties,
PrestoTransactionManager transactionManager,
PrestoSplitManager prestoSplitManager,
PrestoPageSourceProvider prestoPageSourceProvider,
PrestoMetadata prestoMetadata,
Optional<PrestoPlanOptimizerProvider> prestoPlanOptimizerProvider) {
+ this.sessionProperties =
+ ImmutableList.copyOf(
+ requireNonNull(sessionProperties, "sessionProperties is null"));
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
this.prestoSplitManager = requireNonNull(prestoSplitManager, "prestoSplitManager is null");
this.prestoPageSourceProvider =
@@ -108,4 +103,9 @@
public ConnectorPlanOptimizerProvider getConnectorPlanOptimizerProvider() {
return prestoPlanOptimizerProvider.orElseThrow(UnsupportedOperationException::new);
}
+
+ @Override
+ public List<PropertyMetadata<?>> getSessionProperties() {
+ return sessionProperties;
+ }
}
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnectorFactory.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnectorFactory.java
index 8dffb5e..842d06c 100644
--- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnectorFactory.java
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoConnectorFactory.java
@@ -28,6 +28,7 @@
import java.lang.reflect.Method;
import java.util.Map;
+import java.util.Optional;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static java.util.Objects.requireNonNull;
@@ -74,7 +75,24 @@
}
Injector injector = bootstrap.initialize();
- return injector.getInstance(PrestoConnector.class);
+ PrestoSessionProperties prestoSessionProperties =
+ injector.getInstance(PrestoSessionProperties.class);
+ PrestoTransactionManager prestoTransactionManager =
+ injector.getInstance(PrestoTransactionManager.class);
+ PrestoSplitManager prestoSplitManager = injector.getInstance(PrestoSplitManager.class);
+ PrestoPageSourceProvider prestoPageSourceProvider =
+ injector.getInstance(PrestoPageSourceProvider.class);
+ PrestoMetadata prestoMetadata = injector.getInstance(PrestoMetadata.class);
+ PrestoPlanOptimizerProvider prestoPlanOptimizerProvider =
+ injector.getInstance(PrestoPlanOptimizerProvider.class);
+
+ return new PrestoConnector(
+ prestoSessionProperties.getSessionProperties(),
+ prestoTransactionManager,
+ prestoSplitManager,
+ prestoPageSourceProvider,
+ prestoMetadata,
+ Optional.of(prestoPlanOptimizerProvider));
} catch (Exception e) {
throwIfUnchecked(e);
throw new RuntimeException(e);
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPlanOptimizerProvider.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPlanOptimizerProvider.java
index 950d59e..7092753 100644
--- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPlanOptimizerProvider.java
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPlanOptimizerProvider.java
@@ -35,23 +35,19 @@
private final StandardFunctionResolution functionResolution;
private final RowExpressionService rowExpressionService;
- private final PaimonConfig config;
@Inject
public PrestoPlanOptimizerProvider(
StandardFunctionResolution functionResolution,
- RowExpressionService rowExpressionService,
- PaimonConfig config) {
+ RowExpressionService rowExpressionService) {
this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
this.rowExpressionService =
requireNonNull(rowExpressionService, "rowExpressionService is null");
- this.config = requireNonNull(config, "config is null");
}
@Override
public Set<ConnectorPlanOptimizer> getLogicalPlanOptimizers() {
- return ImmutableSet.of(
- new PrestoComputePushdown(functionResolution, rowExpressionService, config));
+ return ImmutableSet.of(new PrestoComputePushdown(functionResolution, rowExpressionService));
}
@Override
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSessionProperties.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSessionProperties.java
new file mode 100644
index 0000000..6633c3d
--- /dev/null
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSessionProperties.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.presto;
+
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.session.PropertyMetadata;
+import com.google.common.collect.ImmutableList;
+
+import javax.inject.Inject;
+
+import java.util.List;
+
+import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
+
+/** Presto {@link PrestoSessionProperties}. */
+public class PrestoSessionProperties {
+
+ public static final String QUERY_PUSHDOWN_ENABLED = "query_pushdown_enabled";
+
+ private final List<PropertyMetadata<?>> sessionProperties;
+
+ @Inject
+ public PrestoSessionProperties(PaimonConfig config) {
+ sessionProperties =
+ ImmutableList.of(
+ booleanProperty(
+ QUERY_PUSHDOWN_ENABLED,
+ "Enable paimon query pushdown",
+ config.isPaimonPushdownEnabled(),
+ false));
+ }
+
+ public List<PropertyMetadata<?>> getSessionProperties() {
+ return sessionProperties;
+ }
+
+ public static boolean isPaimonPushdownEnabled(ConnectorSession session) {
+ return session.getProperty(QUERY_PUSHDOWN_ENABLED, Boolean.class);
+ }
+}
diff --git a/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java b/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java
index dc03c4a..4ae1ce3 100644
--- a/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java
+++ b/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java
@@ -54,6 +54,7 @@
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.Test;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -159,18 +160,29 @@
return paimonConfig;
}
+ private Map<String, Object> createPrestoSessionConfig(PaimonConfig config) {
+ Map<String, Object> sessionConfig = new HashMap<>();
+ sessionConfig.put(
+ PrestoSessionProperties.QUERY_PUSHDOWN_ENABLED, config.isPaimonPushdownEnabled());
+ return sessionConfig;
+ }
+
@Test
public void testOptimizeFilter() {
// Mock data.
PrestoColumnHandle testData = new PrestoColumnHandle("id", "BIGINT", BIGINT);
PaimonConfig config = createPaimonConfig(true);
+ PrestoSessionProperties prestoSessionProperties = new PrestoSessionProperties(config);
+ Map<String, Object> prestoSessionConfig = createPrestoSessionConfig(config);
PrestoComputePushdown prestoComputePushdown =
- new PrestoComputePushdown(FUNCTION_RESOLUTION, ROW_EXPRESSION_SERVICE, config);
+ new PrestoComputePushdown(FUNCTION_RESOLUTION, ROW_EXPRESSION_SERVICE);
PlanNode mockInputPlan = createFilterNode();
- ConnectorSession session = new TestingConnectorSession(ImmutableList.of());
+ ConnectorSession session =
+ new TestingConnectorSession(
+ prestoSessionProperties.getSessionProperties(), prestoSessionConfig);
PlanVariableAllocator variableAllocator = new PlanVariableAllocator();
PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator();
@@ -211,12 +223,16 @@
public void testNotOptimizeFilter() {
// Mock data.
PaimonConfig config = createPaimonConfig(false);
+ PrestoSessionProperties prestoSessionProperties = new PrestoSessionProperties(config);
+ Map<String, Object> prestoSessionConfig = createPrestoSessionConfig(config);
PrestoComputePushdown prestoComputePushdown =
- new PrestoComputePushdown(FUNCTION_RESOLUTION, ROW_EXPRESSION_SERVICE, config);
+ new PrestoComputePushdown(FUNCTION_RESOLUTION, ROW_EXPRESSION_SERVICE);
PlanNode mockInputPlan = createFilterNode();
- ConnectorSession session = new TestingConnectorSession(ImmutableList.of());
+ ConnectorSession session =
+ new TestingConnectorSession(
+ prestoSessionProperties.getSessionProperties(), prestoSessionConfig);
PlanVariableAllocator variableAllocator = new PlanVariableAllocator();
PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator();