| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.io; |
| |
| import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; |
| import static org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest.getPulsarIODataGeneratorNar; |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.assertNotNull; |
| import static org.testng.Assert.assertTrue; |
| |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.pulsar.client.admin.PulsarAdminException; |
| import org.apache.pulsar.common.functions.FunctionConfig; |
| import org.apache.pulsar.common.functions.Utils; |
| import org.apache.pulsar.common.io.SourceConfig; |
| import org.apache.pulsar.common.policies.data.TopicStats; |
| import org.apache.pulsar.functions.utils.FunctionCommon; |
| import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils; |
| import org.testng.annotations.Test; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Sets; |
| |
| /** |
| * Test Pulsar Source |
| */ |
| @Test(groups = "broker-io") |
| public class PulsarSourceE2ETest extends AbstractPulsarE2ETest { |
| |
| private void testPulsarSourceStats(String jarFilePathUrl) throws Exception { |
| final String namespacePortion = "io"; |
| final String replNamespace = tenant + "/" + namespacePortion; |
| final String sinkTopic = "persistent://" + replNamespace + "/output"; |
| final String sourceName = "PulsarSource-test"; |
| admin.namespaces().createNamespace(replNamespace); |
| Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use")); |
| admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); |
| |
| SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, sourceName, sinkTopic); |
| if (jarFilePathUrl.startsWith(Utils.BUILTIN)) { |
| sourceConfig.setArchive(jarFilePathUrl); |
| admin.sources().createSource(sourceConfig, null); |
| } else { |
| admin.sources().createSourceWithUrl(sourceConfig, jarFilePathUrl); |
| } |
| |
| retryStrategically((test) -> { |
| try { |
| return (admin.topics().getStats(sinkTopic).publishers.size() == 1); |
| } catch (PulsarAdminException e) { |
| return false; |
| } |
| }, 10, 150); |
| |
| final String sinkTopic2 = "persistent://" + replNamespace + "/output2"; |
| sourceConfig.setTopicName(sinkTopic2); |
| |
| if (jarFilePathUrl.startsWith(Utils.BUILTIN)) { |
| admin.sources().updateSource(sourceConfig, null); |
| } else { |
| admin.sources().updateSourceWithUrl(sourceConfig, jarFilePathUrl); |
| } |
| |
| retryStrategically((test) -> { |
| try { |
| TopicStats sourceStats = admin.topics().getStats(sinkTopic2); |
| return sourceStats.publishers.size() == 1 |
| && sourceStats.publishers.get(0).metadata != null |
| && sourceStats.publishers.get(0).metadata.containsKey("id") |
| && sourceStats.publishers.get(0).metadata.get("id").equals(String.format("%s/%s/%s", tenant, namespacePortion, sourceName)); |
| } catch (PulsarAdminException e) { |
| return false; |
| } |
| }, 50, 150); |
| |
| TopicStats sourceStats = admin.topics().getStats(sinkTopic2); |
| assertEquals(sourceStats.publishers.size(), 1); |
| assertNotNull(sourceStats.publishers.get(0).metadata); |
| assertTrue(sourceStats.publishers.get(0).metadata.containsKey("id")); |
| assertEquals(sourceStats.publishers.get(0).metadata.get("id"), String.format("%s/%s/%s", tenant, namespacePortion, sourceName)); |
| |
| retryStrategically((test) -> { |
| try { |
| return (admin.topics().getStats(sinkTopic2).publishers.size() == 1) && (admin.topics().getInternalStats(sinkTopic2, false).numberOfEntries > 4); |
| } catch (PulsarAdminException e) { |
| return false; |
| } |
| }, 50, 150); |
| assertEquals(admin.topics().getStats(sinkTopic2).publishers.size(), 1); |
| |
| String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get()); |
| log.info("prometheusMetrics: {}", prometheusMetrics); |
| |
| Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics); |
| PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_source_received_total"); |
| assertEquals(m.tags.get("cluster"), config.getClusterName()); |
| assertEquals(m.tags.get("instance_id"), "0"); |
| assertEquals(m.tags.get("name"), sourceName); |
| assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); |
| assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName)); |
| assertTrue(m.value > 0.0); |
| m = metrics.get("pulsar_source_received_total_1min"); |
| assertEquals(m.tags.get("cluster"), config.getClusterName()); |
| assertEquals(m.tags.get("instance_id"), "0"); |
| assertEquals(m.tags.get("name"), sourceName); |
| assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); |
| assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName)); |
| assertTrue(m.value > 0.0); |
| m = metrics.get("pulsar_source_written_total"); |
| assertEquals(m.tags.get("cluster"), config.getClusterName()); |
| assertEquals(m.tags.get("instance_id"), "0"); |
| assertEquals(m.tags.get("name"), sourceName); |
| assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); |
| assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName)); |
| assertTrue(m.value > 0.0); |
| m = metrics.get("pulsar_source_written_total_1min"); |
| assertEquals(m.tags.get("cluster"), config.getClusterName()); |
| assertEquals(m.tags.get("instance_id"), "0"); |
| assertEquals(m.tags.get("name"), sourceName); |
| assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); |
| assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName)); |
| assertTrue(m.value > 0.0); |
| m = metrics.get("pulsar_source_source_exceptions_total"); |
| assertEquals(m.tags.get("cluster"), config.getClusterName()); |
| assertEquals(m.tags.get("instance_id"), "0"); |
| assertEquals(m.tags.get("name"), sourceName); |
| assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); |
| assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName)); |
| assertEquals(m.value, 0.0); |
| m = metrics.get("pulsar_source_source_exceptions_total_1min"); |
| assertEquals(m.tags.get("cluster"), config.getClusterName()); |
| assertEquals(m.tags.get("instance_id"), "0"); |
| assertEquals(m.tags.get("name"), sourceName); |
| assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); |
| assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName)); |
| assertEquals(m.value, 0.0); |
| m = metrics.get("pulsar_source_system_exceptions_total"); |
| assertEquals(m.tags.get("cluster"), config.getClusterName()); |
| assertEquals(m.tags.get("instance_id"), "0"); |
| assertEquals(m.tags.get("name"), sourceName); |
| assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); |
| assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName)); |
| assertEquals(m.value, 0.0); |
| m = metrics.get("pulsar_source_system_exceptions_total_1min"); |
| assertEquals(m.tags.get("cluster"), config.getClusterName()); |
| assertEquals(m.tags.get("instance_id"), "0"); |
| assertEquals(m.tags.get("name"), sourceName); |
| assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); |
| assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName)); |
| assertEquals(m.value, 0.0); |
| m = metrics.get("pulsar_source_last_invocation"); |
| assertEquals(m.tags.get("cluster"), config.getClusterName()); |
| assertEquals(m.tags.get("instance_id"), "0"); |
| assertEquals(m.tags.get("name"), sourceName); |
| assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); |
| assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName)); |
| assertTrue(m.value > 0.0); |
| |
| tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted(); |
| admin.sources().deleteSource(tenant, namespacePortion, sourceName); |
| } |
| |
| @Test(timeOut = 20000, groups = "builtin") |
| public void testPulsarSourceStatsBuiltin() throws Exception { |
| String jarFilePathUrl = String.format("%s://data-generator", Utils.BUILTIN); |
| testPulsarSourceStats(jarFilePathUrl); |
| } |
| |
| @Test(timeOut = 20000) |
| public void testPulsarSourceStatsWithFile() throws Exception { |
| String jarFilePathUrl = getPulsarIODataGeneratorNar().toURI().toString(); |
| testPulsarSourceStats(jarFilePathUrl); |
| } |
| |
| @Test(timeOut = 40000) |
| public void testPulsarSourceStatsWithUrl() throws Exception { |
| testPulsarSourceStats(fileServer.getUrl("/pulsar-io-data-generator.nar")); |
| } |
| |
| private static SourceConfig createSourceConfig(String tenant, String namespace, String functionName, String sinkTopic) { |
| SourceConfig sourceConfig = new SourceConfig(); |
| sourceConfig.setTenant(tenant); |
| sourceConfig.setNamespace(namespace); |
| sourceConfig.setName(functionName); |
| sourceConfig.setParallelism(1); |
| sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); |
| sourceConfig.setTopicName(sinkTopic); |
| return sourceConfig; |
| } |
| |
| } |