[fix][broker] Duplicate LedgerOffloader creation when namespace/topic… (#21591)
(cherry picked from commit 98bf9dd72910e1b02dea17148a4199e3b26d7147)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index a6f99a0..ab2daa3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -30,6 +30,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -85,6 +86,7 @@
public static final String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
public static final Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null;
public static final Long DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS = null;
+ public static final String EXTRA_CONFIG_PREFIX = "managedLedgerOffloadExtraConfig";
public static final String OFFLOAD_THRESHOLD_NAME_IN_CONF_FILE =
"managedLedgerOffloadAutoTriggerSizeThresholdBytes";
@@ -116,8 +118,7 @@
private OffloadedReadPriority managedLedgerOffloadedReadPriority = DEFAULT_OFFLOADED_READ_PRIORITY;
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
- private Map<String, String> managedLedgerExtraConfigurations = null;
-
+ private Map<String, String> managedLedgerExtraConfigurations = new HashMap<>();
// s3 config, set by service configuration or cli
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
@@ -240,8 +241,7 @@
public static OffloadPoliciesImpl create(Properties properties) {
OffloadPoliciesImpl data = new OffloadPoliciesImpl();
- Field[] fields = OffloadPoliciesImpl.class.getDeclaredFields();
- Arrays.stream(fields).forEach(f -> {
+ for (Field f : CONFIGURATION_FIELDS) {
if (properties.containsKey(f.getName())) {
try {
f.setAccessible(true);
@@ -252,14 +252,15 @@
f.getName(), properties.get(f.getName())), e);
}
}
- });
- Map<String, String> extraConfigurations = properties.entrySet().stream()
- .filter(entry -> entry.getKey().toString().startsWith("managedLedgerOffloadExtraConfig"))
- .collect(Collectors.toMap(
- entry -> entry.getKey().toString().replaceFirst("managedLedgerOffloadExtraConfig", ""),
- entry -> entry.getValue().toString()));
+ }
- data.setManagedLedgerExtraConfigurations(extraConfigurations);
+ Map<String, String> extraConfigurations = properties.entrySet().stream()
+ .filter(entry -> entry.getKey().toString().startsWith(EXTRA_CONFIG_PREFIX))
+ .collect(Collectors.toMap(
+ entry -> entry.getKey().toString().replaceFirst(EXTRA_CONFIG_PREFIX, ""),
+ entry -> entry.getValue().toString()));
+
+ data.getManagedLedgerExtraConfigurations().putAll(extraConfigurations);
data.compatibleWithBrokerConfigFile(properties);
return data;
@@ -337,64 +338,21 @@
public Properties toProperties() {
Properties properties = new Properties();
- setProperty(properties, "managedLedgerOffloadedReadPriority", this.getManagedLedgerOffloadedReadPriority());
- setProperty(properties, "offloadersDirectory", this.getOffloadersDirectory());
- setProperty(properties, "managedLedgerOffloadDriver", this.getManagedLedgerOffloadDriver());
- setProperty(properties, "managedLedgerOffloadMaxThreads",
- this.getManagedLedgerOffloadMaxThreads());
- setProperty(properties, "managedLedgerOffloadPrefetchRounds",
- this.getManagedLedgerOffloadPrefetchRounds());
- setProperty(properties, "managedLedgerOffloadThresholdInBytes",
- this.getManagedLedgerOffloadThresholdInBytes());
- setProperty(properties, "managedLedgerOffloadDeletionLagInMillis",
- this.getManagedLedgerOffloadDeletionLagInMillis());
- setProperty(properties, "managedLedgerOffloadExtraConfigurations",
- this.getManagedLedgerExtraConfigurations());
-
- if (this.isS3Driver()) {
- setProperty(properties, "s3ManagedLedgerOffloadRegion",
- this.getS3ManagedLedgerOffloadRegion());
- setProperty(properties, "s3ManagedLedgerOffloadBucket",
- this.getS3ManagedLedgerOffloadBucket());
- setProperty(properties, "s3ManagedLedgerOffloadServiceEndpoint",
- this.getS3ManagedLedgerOffloadServiceEndpoint());
- setProperty(properties, "s3ManagedLedgerOffloadMaxBlockSizeInBytes",
- this.getS3ManagedLedgerOffloadMaxBlockSizeInBytes());
- setProperty(properties, "s3ManagedLedgerOffloadCredentialId",
- this.getS3ManagedLedgerOffloadCredentialId());
- setProperty(properties, "s3ManagedLedgerOffloadCredentialSecret",
- this.getS3ManagedLedgerOffloadCredentialSecret());
- setProperty(properties, "s3ManagedLedgerOffloadRole",
- this.getS3ManagedLedgerOffloadRole());
- setProperty(properties, "s3ManagedLedgerOffloadRoleSessionName",
- this.getS3ManagedLedgerOffloadRoleSessionName());
- setProperty(properties, "s3ManagedLedgerOffloadReadBufferSizeInBytes",
- this.getS3ManagedLedgerOffloadReadBufferSizeInBytes());
- } else if (this.isGcsDriver()) {
- setProperty(properties, "gcsManagedLedgerOffloadRegion",
- this.getGcsManagedLedgerOffloadRegion());
- setProperty(properties, "gcsManagedLedgerOffloadBucket",
- this.getGcsManagedLedgerOffloadBucket());
- setProperty(properties, "gcsManagedLedgerOffloadMaxBlockSizeInBytes",
- this.getGcsManagedLedgerOffloadMaxBlockSizeInBytes());
- setProperty(properties, "gcsManagedLedgerOffloadReadBufferSizeInBytes",
- this.getGcsManagedLedgerOffloadReadBufferSizeInBytes());
- setProperty(properties, "gcsManagedLedgerOffloadServiceAccountKeyFile",
- this.getGcsManagedLedgerOffloadServiceAccountKeyFile());
- } else if (this.isFileSystemDriver()) {
- setProperty(properties, "fileSystemProfilePath", this.getFileSystemProfilePath());
- setProperty(properties, "fileSystemURI", this.getFileSystemURI());
+ for (Field f : CONFIGURATION_FIELDS) {
+ try {
+ f.setAccessible(true);
+ if ("managedLedgerExtraConfigurations".equals(f.getName())) {
+ Map<String, String> extraConfig = (Map<String, String>) f.get(this);
+ extraConfig.forEach((key, value) -> {
+ setProperty(properties, EXTRA_CONFIG_PREFIX + key, value);
+ });
+ } else {
+ setProperty(properties, f.getName(), f.get(this));
+ }
+ } catch (Exception e) {
+ throw new IllegalArgumentException("An error occurred while processing the field: " + f.getName(), e);
+ }
}
-
- setProperty(properties, "managedLedgerOffloadBucket", this.getManagedLedgerOffloadBucket());
- setProperty(properties, "managedLedgerOffloadRegion", this.getManagedLedgerOffloadRegion());
- setProperty(properties, "managedLedgerOffloadServiceEndpoint",
- this.getManagedLedgerOffloadServiceEndpoint());
- setProperty(properties, "managedLedgerOffloadMaxBlockSizeInBytes",
- this.getManagedLedgerOffloadMaxBlockSizeInBytes());
- setProperty(properties, "managedLedgerOffloadReadBufferSizeInBytes",
- this.getManagedLedgerOffloadReadBufferSizeInBytes());
-
return properties;
}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
index 646167e..ab216da 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.common.policies.data;
+import static org.apache.pulsar.common.policies.data.OffloadPoliciesImpl.EXTRA_CONFIG_PREFIX;
+import static org.testng.Assert.assertEquals;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
@@ -26,6 +28,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.testng.Assert;
@@ -375,8 +378,8 @@
@Test
public void testCreateOffloadPoliciesWithExtraConfiguration() {
Properties properties = new Properties();
- properties.put("managedLedgerOffloadExtraConfigKey1", "value1");
- properties.put("managedLedgerOffloadExtraConfigKey2", "value2");
+ properties.put(EXTRA_CONFIG_PREFIX + "Key1", "value1");
+ properties.put(EXTRA_CONFIG_PREFIX + "Key2", "value2");
OffloadPoliciesImpl policies = OffloadPoliciesImpl.create(properties);
Map<String, String> extraConfigurations = policies.getManagedLedgerExtraConfigurations();
@@ -384,4 +387,28 @@
Assert.assertEquals(extraConfigurations.get("Key1"), "value1");
Assert.assertEquals(extraConfigurations.get("Key2"), "value2");
}
+
+ /**
+ * Test toProperties as well as create from properties.
+ * @throws Exception
+ */
+ @Test
+ public void testToProperties() throws Exception {
+ // Base information convert.
+ OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create("aws-s3", "test-region", "test-bucket",
+ "http://test.endpoint",null, null, null, null, 32 * 1024 * 1024, 5 * 1024 * 1024,
+ 10 * 1024 * 1024L, 10000L, OffloadedReadPriority.TIERED_STORAGE_FIRST);
+ assertEquals(offloadPolicies, OffloadPoliciesImpl.create(offloadPolicies.toProperties()));
+
+ // Set useless config to offload policies. Make sure convert conversion result is the same.
+ offloadPolicies.setFileSystemProfilePath("/test/file");
+ assertEquals(offloadPolicies, OffloadPoliciesImpl.create(offloadPolicies.toProperties()));
+
+ // Set extra config to offload policies. Make sure convert conversion result is the same.
+ Map<String, String> extraConfiguration = new HashMap<>();
+ extraConfiguration.put("key1", "value1");
+ extraConfiguration.put("key2", "value2");
+ offloadPolicies.setManagedLedgerExtraConfigurations(extraConfiguration);
+ assertEquals(offloadPolicies, OffloadPoliciesImpl.create(offloadPolicies.toProperties()));
+ }
}