[Pulsar SQL] Make Pulsar SQL get correct offload configurations (#7701)

### Motivation

Currently, Pulsar SQL can't get the correct offload configurations.

### Modifications

Make Pulsar SQL get the complete offload configurations.

### Verifying this change

Add a new integration test.
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index f4a7d74..091efe1 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -40,7 +40,6 @@
 import org.apache.bookkeeper.mledger.offload.OffloaderUtils;
 import org.apache.bookkeeper.mledger.offload.Offloaders;
 import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -81,9 +80,8 @@
 
         this.statsProvider.start(clientConfiguration);
 
-        OffloadPolicies offloadPolicies = new OffloadPolicies();
-        BeanUtils.copyProperties(offloadPolicies, pulsarConnectorConfig);
-        this.defaultOffloader = initManagedLedgerOffloader(offloadPolicies, pulsarConnectorConfig);
+        this.defaultOffloader = initManagedLedgerOffloader(
+                pulsarConnectorConfig.getOffloadPolices(), pulsarConnectorConfig);
     }
 
     public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index 49d2ae3..0a172d9 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 import java.util.regex.Matcher;
 import javax.validation.constraints.NotNull;
 import org.apache.bookkeeper.stats.NullStatsProvider;
@@ -31,6 +32,7 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.NamedEntity;
 import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.protocol.Commands;
 
 /**
@@ -399,6 +401,16 @@
         return this.pulsarAdmin;
     }
 
+    public OffloadPolicies getOffloadPolices() {
+        Properties offloadProperties = new Properties();
+        offloadProperties.putAll(getOffloaderProperties());
+        OffloadPolicies offloadPolicies = OffloadPolicies.create(offloadProperties);
+        offloadPolicies.setManagedLedgerOffloadDriver(getManagedLedgerOffloadDriver());
+        offloadPolicies.setManagedLedgerOffloadMaxThreads(getManagedLedgerOffloadMaxThreads());
+        offloadPolicies.setOffloadersDirectory(getOffloadersDirectory());
+        return offloadPolicies;
+    }
+
     @Override
     public void close() throws Exception {
         this.pulsarAdmin.close();
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java
index faf2bbc..f3d2f7a 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -68,4 +69,35 @@
         Assert.assertEquals(availableProcessors, connectorConfig.getManagedLedgerNumSchedulerThreads());
     }
 
+    @Test
+    public void testGetOffloadPolices() throws Exception {
+        PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig();
+
+        final String managedLedgerOffloadDriver = "s3";
+        final String offloaderDirectory = "/pulsar/offloaders";
+        final int managedLedgerOffloadMaxThreads = 5;
+        final String bucket = "offload-bucket";
+        final String region = "us-west-2";
+        final String endpoint = "http://s3.amazonaws.com";
+        final String offloadProperties = "{"
+                + "\"s3ManagedLedgerOffloadBucket\":\"" + bucket + "\","
+                + "\"s3ManagedLedgerOffloadRegion\":\"" + region + "\","
+                + "\"s3ManagedLedgerOffloadServiceEndpoint\":\"" + endpoint + "\""
+                + "}";
+
+        connectorConfig.setManagedLedgerOffloadDriver(managedLedgerOffloadDriver);
+        connectorConfig.setOffloadersDirectory(offloaderDirectory);
+        connectorConfig.setManagedLedgerOffloadMaxThreads(managedLedgerOffloadMaxThreads);
+        connectorConfig.setOffloaderProperties(offloadProperties);
+
+        OffloadPolicies offloadPolicies = connectorConfig.getOffloadPolices();
+        Assert.assertNotNull(offloadPolicies);
+        Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDriver(), managedLedgerOffloadDriver);
+        Assert.assertEquals(offloadPolicies.getOffloadersDirectory(), offloaderDirectory);
+        Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadMaxThreads(), managedLedgerOffloadMaxThreads);
+        Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadBucket(), bucket);
+        Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadRegion(), region);
+        Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadServiceEndpoint(), endpoint);
+    }
+
 }