[Pulsar SQL] Make Pulsar SQL get correct offload configurations (#7701)
### Motivation
Currently, Pulsar SQL can't get the correct offload configurations.
### Modifications
Make Pulsar SQL get the complete offload configurations.
### Verifying this change
Add a new integration test.
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index f4a7d74..091efe1 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -40,7 +40,6 @@
import org.apache.bookkeeper.mledger.offload.OffloaderUtils;
import org.apache.bookkeeper.mledger.offload.Offloaders;
import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -81,9 +80,8 @@
this.statsProvider.start(clientConfiguration);
- OffloadPolicies offloadPolicies = new OffloadPolicies();
- BeanUtils.copyProperties(offloadPolicies, pulsarConnectorConfig);
- this.defaultOffloader = initManagedLedgerOffloader(offloadPolicies, pulsarConnectorConfig);
+ this.defaultOffloader = initManagedLedgerOffloader(
+ pulsarConnectorConfig.getOffloadPolices(), pulsarConnectorConfig);
}
public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index 49d2ae3..0a172d9 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import java.util.regex.Matcher;
import javax.validation.constraints.NotNull;
import org.apache.bookkeeper.stats.NullStatsProvider;
@@ -31,6 +32,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.protocol.Commands;
/**
@@ -399,6 +401,16 @@
return this.pulsarAdmin;
}
+ public OffloadPolicies getOffloadPolices() {
+ Properties offloadProperties = new Properties();
+ offloadProperties.putAll(getOffloaderProperties());
+ OffloadPolicies offloadPolicies = OffloadPolicies.create(offloadProperties);
+ offloadPolicies.setManagedLedgerOffloadDriver(getManagedLedgerOffloadDriver());
+ offloadPolicies.setManagedLedgerOffloadMaxThreads(getManagedLedgerOffloadMaxThreads());
+ offloadPolicies.setOffloadersDirectory(getOffloadersDirectory());
+ return offloadPolicies;
+ }
+
@Override
public void close() throws Exception {
this.pulsarAdmin.close();
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java
index faf2bbc..f3d2f7a 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.sql.presto;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -68,4 +69,35 @@
Assert.assertEquals(availableProcessors, connectorConfig.getManagedLedgerNumSchedulerThreads());
}
+ @Test
+ public void testGetOffloadPolices() throws Exception {
+ PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig();
+
+ final String managedLedgerOffloadDriver = "s3";
+ final String offloaderDirectory = "/pulsar/offloaders";
+ final int managedLedgerOffloadMaxThreads = 5;
+ final String bucket = "offload-bucket";
+ final String region = "us-west-2";
+ final String endpoint = "http://s3.amazonaws.com";
+ final String offloadProperties = "{"
+ + "\"s3ManagedLedgerOffloadBucket\":\"" + bucket + "\","
+ + "\"s3ManagedLedgerOffloadRegion\":\"" + region + "\","
+ + "\"s3ManagedLedgerOffloadServiceEndpoint\":\"" + endpoint + "\""
+ + "}";
+
+ connectorConfig.setManagedLedgerOffloadDriver(managedLedgerOffloadDriver);
+ connectorConfig.setOffloadersDirectory(offloaderDirectory);
+ connectorConfig.setManagedLedgerOffloadMaxThreads(managedLedgerOffloadMaxThreads);
+ connectorConfig.setOffloaderProperties(offloadProperties);
+
+ OffloadPolicies offloadPolicies = connectorConfig.getOffloadPolices();
+ Assert.assertNotNull(offloadPolicies);
+ Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDriver(), managedLedgerOffloadDriver);
+ Assert.assertEquals(offloadPolicies.getOffloadersDirectory(), offloaderDirectory);
+ Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadMaxThreads(), managedLedgerOffloadMaxThreads);
+ Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadBucket(), bucket);
+ Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadRegion(), region);
+ Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadServiceEndpoint(), endpoint);
+ }
+
}