[Pulsar SQL] Make Pulsar SQL get correct offload configurations (#7701)

### Motivation

Currently, Pulsar SQL can't get the correct offload configurations.

### Modifications

Make Pulsar SQL get the complete offload configurations.

### Verifying this change

Add a new integration test.
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
index 093f3eb..e29d945 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -50,6 +50,7 @@
 
     @BeforeClass
     public void setupPresto() throws Exception {
+        log.info("[setupPresto]");
         pulsarCluster.startPrestoWorker();
     }
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
new file mode 100644
index 0000000..8ba48aa
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.presto;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.apache.pulsar.tests.integration.containers.S3Container;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testcontainers.shaded.org.apache.commons.lang.StringUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeClass;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@Slf4j
+public class TestPrestoQueryTieredStorage extends PulsarTestSuite {
+
+    private final static int ENTRIES_PER_LEDGER = 1024;
+    private final static String OFFLOAD_DRIVER = "s3";
+    private final static String BUCKET = "pulsar-integtest";
+    private final static String ENDPOINT = "http://" + S3Container.NAME + ":9090";
+
+    private S3Container s3Container;
+
+    @Override
+    protected void beforeStartCluster() throws Exception {
+        for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) {
+            getEnv().forEach(brokerContainer::withEnv);
+        }
+    }
+
+    @BeforeClass
+    public void setupPresto() throws Exception {
+        s3Container = new S3Container(
+                pulsarCluster.getClusterName(),
+                S3Container.NAME)
+                .withNetwork(pulsarCluster.getNetwork())
+                .withNetworkAliases(S3Container.NAME);
+        s3Container.start();
+
+        log.info("[setupPresto] prestoWorker: " + pulsarCluster.getPrestoWorkerContainer());
+        pulsarCluster.startPrestoWorker(OFFLOAD_DRIVER, getOffloadProperties(BUCKET, null, ENDPOINT));
+    }
+
+    public String getOffloadProperties(String bucket, String region, String endpoint) {
+        checkNotNull(bucket);
+        StringBuilder sb = new StringBuilder();
+        sb.append("{");
+        sb.append("\"s3ManagedLedgerOffloadBucket\":").append("\"").append(bucket).append("\",");
+        if (StringUtils.isNotEmpty(region)) {
+            sb.append("\"s3ManagedLedgerOffloadRegion\":").append("\"").append(region).append("\",");
+        }
+        if (StringUtils.isNotEmpty(endpoint)) {
+            sb.append("\"s3ManagedLedgerOffloadServiceEndpoint\":").append("\"").append(endpoint).append("\"");
+        }
+        sb.append("}");
+        return sb.toString();
+    }
+
+
+    @AfterClass
+    public void teardownPresto() {
+        log.info("tearing down...");
+        if (null != s3Container) {
+            s3Container.stop();
+        }
+
+        pulsarCluster.stopPrestoWorker();
+    }
+
+    // Flaky Test: https://github.com/apache/pulsar/issues/7750
+    // @Test
+    public void testQueryTieredStorage1() throws Exception {
+        testSimpleSQLQuery(false);
+    }
+
+    // Flaky Test: https://github.com/apache/pulsar/issues/7750
+    // @Test
+    public void testQueryTieredStorage2() throws Exception {
+        testSimpleSQLQuery(true);
+    }
+
+    public void testSimpleSQLQuery(boolean isNamespaceOffload) throws Exception {
+
+        // wait until presto worker started
+        ContainerExecResult result;
+        do {
+            try {
+                result = execQuery("show catalogs;");
+                assertThat(result.getExitCode()).isEqualTo(0);
+                assertThat(result.getStdout()).contains("pulsar", "system");
+                break;
+            } catch (ContainerExecException cee) {
+                if (cee.getResult().getStderr().contains("Presto server is still initializing")) {
+                    Thread.sleep(10000);
+                } else {
+                    throw cee;
+                }
+            }
+        } while (true);
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder()
+                                    .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                                    .build();
+
+        String stocksTopic = "stocks-" + randomName(5);
+
+        @Cleanup
+        Producer<Stock> producer = pulsarClient.newProducer(JSONSchema.of(Stock.class))
+                .topic(stocksTopic)
+                .create();
+
+        long firstLedgerId = -1;
+        long currentLedgerId = -1;
+        int sendMessageCnt = 0;
+        while (currentLedgerId <= firstLedgerId) {
+            sendMessageCnt ++;
+            final Stock stock = new Stock(sendMessageCnt,"STOCK_" + sendMessageCnt , 100.0 + sendMessageCnt * 10);
+            MessageIdImpl messageId = (MessageIdImpl) producer.send(stock);
+            if (firstLedgerId == -1) {
+                firstLedgerId = messageId.getLedgerId();
+            }
+            currentLedgerId = messageId.getLedgerId();
+            log.info("firstLedgerId: {}, currentLedgerId: {}", firstLedgerId, currentLedgerId);
+            Thread.sleep(100);
+        }
+        producer.flush();
+
+        offloadAndDeleteFromBK(isNamespaceOffload, stocksTopic);
+
+        // check schema
+        result = execQuery("show schemas in pulsar;");
+        assertThat(result.getExitCode()).isEqualTo(0);
+        assertThat(result.getStdout()).contains("public/default");
+
+        // check table
+        result = execQuery("show tables in pulsar.\"public/default\";");
+        assertThat(result.getExitCode()).isEqualTo(0);
+        assertThat(result.getStdout()).contains(stocksTopic);
+
+        // check query
+        ContainerExecResult containerExecResult = execQuery(String.format("select * from pulsar.\"public/default\".%s order by entryid;", stocksTopic));
+        assertThat(containerExecResult.getExitCode()).isEqualTo(0);
+        log.info("select sql query output \n{}", containerExecResult.getStdout());
+        String[] split = containerExecResult.getStdout().split("\n");
+        assertThat(split.length).isGreaterThan(sendMessageCnt - 2);
+
+        String[] split2 = containerExecResult.getStdout().split("\n|,");
+
+        for (int i = 0; i < sendMessageCnt - 2; ++i) {
+            assertThat(split2).contains("\"" + i + "\"");
+            assertThat(split2).contains("\"" + "STOCK_" + i + "\"");
+            assertThat(split2).contains("\"" + (100.0 + i * 10) + "\"");
+        }
+
+        // test predicate pushdown
+
+        String url = String.format("jdbc:presto://%s",  pulsarCluster.getPrestoWorkerContainer().getUrl());
+        Connection connection = DriverManager.getConnection(url, "test", null);
+
+        String query = String.format("select * from pulsar" +
+                ".\"public/default\".%s order by __publish_time__", stocksTopic);
+        log.info("Executing query: {}", query);
+        ResultSet res = connection.createStatement().executeQuery(query);
+
+        List<Timestamp> timestamps = new LinkedList<>();
+        while (res.next()) {
+            printCurrent(res);
+            timestamps.add(res.getTimestamp("__publish_time__"));
+        }
+
+        assertThat(timestamps.size()).isGreaterThan(sendMessageCnt - 2);
+
+        query = String.format("select * from pulsar" +
+                ".\"public/default\".%s where __publish_time__ > timestamp '%s' order by __publish_time__", stocksTopic, timestamps.get(timestamps.size() / 2));
+        log.info("Executing query: {}", query);
+        res = connection.createStatement().executeQuery(query);
+
+        List<Timestamp> returnedTimestamps = new LinkedList<>();
+        while (res.next()) {
+            printCurrent(res);
+            returnedTimestamps.add(res.getTimestamp("__publish_time__"));
+        }
+
+        assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size() / 2);
+
+        // Try with a predicate that has a earlier time than any entry
+        // Should return all rows
+        query = String.format("select * from pulsar" +
+                ".\"public/default\".%s where __publish_time__ > from_unixtime(%s) order by __publish_time__", stocksTopic, 0);
+        log.info("Executing query: {}", query);
+        res = connection.createStatement().executeQuery(query);
+
+        returnedTimestamps = new LinkedList<>();
+        while (res.next()) {
+            printCurrent(res);
+            returnedTimestamps.add(res.getTimestamp("__publish_time__"));
+        }
+
+        assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size());
+
+        // Try with a predicate that has a latter time than any entry
+        // Should return no rows
+
+        query = String.format("select * from pulsar" +
+                ".\"public/default\".%s where __publish_time__ > from_unixtime(%s) order by __publish_time__", stocksTopic, 99999999999L);
+        log.info("Executing query: {}", query);
+        res = connection.createStatement().executeQuery(query);
+
+        returnedTimestamps = new LinkedList<>();
+        while (res.next()) {
+            printCurrent(res);
+            returnedTimestamps.add(res.getTimestamp("__publish_time__"));
+        }
+
+        assertThat(returnedTimestamps.size()).isEqualTo(0);
+    }
+
+    @AfterSuite
+    @Override
+    public void tearDownCluster() {
+        super.tearDownCluster();
+    }
+
+    public static ContainerExecResult execQuery(final String query) throws Exception {
+        ContainerExecResult containerExecResult;
+
+        containerExecResult = pulsarCluster.getPrestoWorkerContainer()
+                .execCmd("/bin/bash", "-c", PulsarCluster.PULSAR_COMMAND_SCRIPT + " sql --execute " + "'" + query + "'");
+
+        return containerExecResult;
+
+    }
+
+    private static void printCurrent(ResultSet rs) throws SQLException {
+        ResultSetMetaData rsmd = rs.getMetaData();
+        int columnsNumber = rsmd.getColumnCount();
+        for (int i = 1; i <= columnsNumber; i++) {
+            if (i > 1) System.out.print(",  ");
+            String columnValue = rs.getString(i);
+            System.out.print(columnValue + " " + rsmd.getColumnName(i));
+        }
+        System.out.println("");
+
+    }
+
+    private void offloadAndDeleteFromBK(boolean isNamespaceOffload, String stocksTopic) {
+        String adminUrl = pulsarCluster.getHttpServiceUrl();
+        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
+            // read managed ledger info, check ledgers exist
+            long firstLedger = admin.topics().getInternalStats(stocksTopic).ledgers.get(0).ledgerId;
+
+            String output = "";
+
+            if (isNamespaceOffload) {
+                pulsarCluster.runAdminCommandOnAnyBroker(
+                        "namespaces", "set-offload-policies",
+                        "--bucket", "pulsar-integtest",
+                        "--driver", "s3",
+                        "--endpoint", "http://" + S3Container.NAME + ":9090",
+                        "--offloadAfterElapsed", "1000",
+                        "public/default");
+
+                output = pulsarCluster.runAdminCommandOnAnyBroker(
+                        "namespaces", "get-offload-policies").getStdout();
+                Assert.assertTrue(output.contains("pulsar-integtest"));
+                Assert.assertTrue(output.contains("s3"));
+            }
+
+            // offload with a low threshold
+            output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
+                    "offload", "--size-threshold", "1M", stocksTopic).getStdout();
+            Assert.assertTrue(output.contains("Offload triggered"));
+
+            output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
+                    "offload-status", "-w", stocksTopic).getStdout();
+            Assert.assertTrue(output.contains("Offload was a success"));
+
+            // delete the first ledger, so that we cannot possibly read from it
+            ClientConfiguration bkConf = new ClientConfiguration();
+            bkConf.setZkServers(pulsarCluster.getZKConnString());
+            try (BookKeeper bk = new BookKeeper(bkConf)) {
+                bk.deleteLedger(firstLedger);
+            } catch (Exception e) {
+                log.error("Failed to delete from BookKeeper.", e);
+                Assert.fail("Failed to delete from BookKeeper.");
+            }
+
+            // Unload topic to clear all caches, open handles, etc
+            admin.topics().unload(stocksTopic);
+        } catch (Exception e) {
+            Assert.fail("Failed to deleteOffloadedDataFromBK.");
+        }
+    }
+
+    protected Map<String, String> getEnv() {
+        Map<String, String> result = new HashMap<>();
+        result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(ENTRIES_PER_LEDGER));
+        result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
+        result.put("managedLedgerOffloadDriver", OFFLOAD_DRIVER);
+        result.put("s3ManagedLedgerOffloadBucket", BUCKET);
+        result.put("s3ManagedLedgerOffloadServiceEndpoint", ENDPOINT);
+
+        return result;
+    }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 6ffd38a..36175a6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -328,6 +328,11 @@
     }
 
     public void startPrestoWorker() {
+        startPrestoWorker(null, null);
+    }
+
+    public void startPrestoWorker(String offloadDriver, String offloadProperties) {
+        log.info("[startPrestoWorker] offloadDriver: {}, offloadProperties: {}", offloadDriver, offloadProperties);
         if (null == prestoWorkerContainer) {
             prestoWorkerContainer = new PrestoWorkerContainer(clusterName, PrestoWorkerContainer.NAME)
                     .withNetwork(network)
@@ -337,8 +342,18 @@
                     .withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
                     .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
                     .withEnv("pulsar.broker-service-url", "http://pulsar-broker-0:8080");
+            if (offloadDriver != null && offloadProperties != null) {
+                log.info("[startPrestoWorker] set offload env offloadDriver: {}, offloadProperties: {}",
+                        offloadDriver, offloadProperties);
+                prestoWorkerContainer.withEnv("PULSAR_PREFIX_pulsar.managed-ledger-offload-driver", offloadDriver);
+                prestoWorkerContainer.withEnv("PULSAR_PREFIX_pulsar.offloader-properties", offloadProperties);
+                prestoWorkerContainer.withEnv("PULSAR_PREFIX_pulsar.offloaders-directory", "/pulsar/offloaders");
+                // used in s3 tests
+                prestoWorkerContainer.withEnv("AWS_ACCESS_KEY_ID", "accesskey");
+                prestoWorkerContainer.withEnv("AWS_SECRET_KEY", "secretkey");
+            }
         }
-        log.info("Starting Presto Worker");
+        log.info("[startPrestoWorker] Starting Presto Worker");
         prestoWorkerContainer.start();
     }