[FLINK-23864][docs] Add flink-connector-pulsar module to flink-docs, auto generate the config document.
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java
index 4c33a70..db90bf1 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java
@@ -31,7 +31,6 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import static java.util.Collections.emptyMap;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.flink.configuration.description.TextElement.code;
import static org.apache.flink.configuration.description.TextElement.text;
@@ -48,6 +47,7 @@
@ConfigGroup(name = "PulsarClient", keyPrefix = CLIENT_CONFIG_PREFIX),
@ConfigGroup(name = "PulsarAdmin", keyPrefix = ADMIN_CONFIG_PREFIX)
})
+@SuppressWarnings("java:S1192")
public final class PulsarOptions {
// Pulsar client API config prefix.
@@ -112,8 +112,7 @@
.noDefaultValue()
.withDescription(
Description.builder()
- .text(
- "String represents parameters for the authentication plugin.")
+ .text("Parameters for the authentication plugin.")
.linebreak()
.linebreak()
.text("Example:")
@@ -124,9 +123,8 @@
public static final ConfigOption<Map<String, String>> PULSAR_AUTH_PARAM_MAP =
ConfigOptions.key(CLIENT_CONFIG_PREFIX + "authParamMap")
.mapType()
- .defaultValue(emptyMap())
- .withDescription(
- "Map which represents parameters for the authentication plugin.");
+ .noDefaultValue()
+ .withDescription("Parameters for the authentication plugin.");
public static final ConfigOption<Integer> PULSAR_OPERATION_TIMEOUT_MS =
ConfigOptions.key(CLIENT_CONFIG_PREFIX + "operationTimeoutMs")
@@ -134,10 +132,11 @@
.defaultValue(30000)
.withDescription(
Description.builder()
- .text("Operation timeout (in millis).")
+ .text("Operation timeout (in ms).")
.text(
- "Producer-create, subscribe and unsubscribe operations will be retried until this interval,"
- + " after which the operation will be marked as failed.")
+ " Operations such as creating producers, subscribing or unsubscribing topics are retried during this interval.")
+ .text(
+ " If the operation is not completed during this interval, the operation will be marked as failed.")
.build());
public static final ConfigOption<Long> PULSAR_STATS_INTERVAL_SECONDS =
@@ -153,7 +152,7 @@
"Stats is activated with positive %s",
code("statsInterval")),
text(
- "Set %s to 1 second at least",
+ "Set %s to 1 second at least.",
code("statsIntervalSeconds")))
.build());
@@ -173,10 +172,10 @@
.text(
"The number of threads used for handling message listeners.")
.text(
- "The listener thread pool is shared across all the consumers and readers that are using a %s model to get messages.",
+ " The listener thread pool is shared across all the consumers and readers that are using a %s model to get messages.",
code("listener"))
.text(
- "For a given consumer, the listener will be always invoked from the same thread, to ensure ordering.")
+ " For a given consumer, the listener is always invoked from the same thread to ensure ordering.")
.build());
public static final ConfigOption<Integer> PULSAR_CONNECTIONS_PER_BROKER =
@@ -186,12 +185,12 @@
.withDescription(
Description.builder()
.text(
- "Sets the max number of connection that the client library will open to a single broker.")
+ "The maximum number of connections that the client library will open to a single broker.")
.linebreak()
.text(
- "By default, the connection pool will use a single connection for all the producers and consumers.")
+ " By default, the connection pool will use a single connection for all the producers and consumers.")
.text(
- "Increasing this parameter may improve throughput when using many producers over a high latency connection.")
+ " Increasing this parameter may improve throughput when using many producers over a high latency connection.")
.build());
public static final ConfigOption<Boolean> PULSAR_USE_TCP_NO_DELAY =
@@ -201,18 +200,18 @@
.withDescription(
Description.builder()
.text(
- "Whether to use TCP no-delay flag on the connection to disable Nagle algorithm.")
+ "Whether to use the TCP no-delay flag on the connection to disable Nagle algorithm.")
.linebreak()
.text(
- "No-delay features make sure packets are sent out on the network as soon as possible,")
- .text("and it's critical to achieve low latency publishes.")
+ "No-delay features ensures that packets are sent out on the network as soon as possible,")
+ .text(" and it is critical to achieve low latency publishes.")
.text(
- "On the other hand, sending out a huge number of small packets might limit the overall throughput,")
+ " On the other hand, sending out a huge number of small packets might limit the overall throughput.")
.text(
- "so if latency is not a concern, it's advisable to set the %s flag to false.",
- code("useTcpNoDelay"))
+ " Therefore, if latency is not a concern, it is recommended to set the %s flag to %s.",
+ code("useTcpNoDelay"), code("false"))
.linebreak()
- .text("Default value is true.")
+ .text("By default, it is set to %s.", code("true"))
.build());
public static final ConfigOption<String> PULSAR_TLS_TRUST_CERTS_FILE_PATH =
@@ -226,7 +225,7 @@
.booleanType()
.defaultValue(false)
.withDescription(
- "Whether the Pulsar client accepts untrusted TLS certificate from broker.");
+ "Whether the Pulsar client accepts untrusted TLS certificate from the broker.");
public static final ConfigOption<Boolean> PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE =
ConfigOptions.key(CLIENT_CONFIG_PREFIX + "tlsHostnameVerificationEnable")
@@ -236,11 +235,11 @@
Description.builder()
.text("Whether to enable TLS hostname verification.")
.text(
- "It allows to validate hostname verification when client connects to broker over tls.")
+ " It allows to validate hostname verification when a client connects to the broker over TLS.")
.text(
- "It validates incoming x509 certificate and matches provided hostname(CN/SAN) with expected broker's host name.")
+ " It validates incoming x509 certificate and matches provided hostname (CN/SAN) with the expected broker's host name.")
.text(
- "It follows RFC 2818, 3.1. Server Identity hostname verification.")
+ " It follows RFC 2818, 3.1. Server Identity hostname verification.")
.build());
public static final ConfigOption<Integer> PULSAR_CONCURRENT_LOOKUP_REQUEST =
@@ -250,10 +249,9 @@
.withDescription(
Description.builder()
.text(
- "The number of concurrent lookup requests allowed to send on each broker connection to prevent overload on broker.")
+ "The number of concurrent lookup requests allowed to send on each broker connection to prevent overload on the broker.")
.text(
- "It should be configured with higher value only in case of it requires"
- + " to produce/subscribe on thousands of topic using created %s",
+ " It should be configured with a higher value only in case of it requires to produce or subscribe on thousands of topic using a created %s",
code("PulsarClient"))
.build());
@@ -264,12 +262,17 @@
.withDescription(
Description.builder()
.text(
- "The maximum number of lookup requests allowed on each broker connection to prevent overload on broker.")
- .text("It should be bigger than maxConcurrentLookupRequests.")
+ "The maximum number of lookup requests allowed on each broker connection to prevent overload on the broker.")
.text(
- "Requests that inside maxConcurrentLookupRequests already send to broker,")
+ " It should be greater than %s.",
+ code("maxConcurrentLookupRequests"))
.text(
- "and requests beyond maxConcurrentLookupRequests and under maxLookupRequests will wait in each client cnx.")
+ " Requests that inside %s are already sent to broker,",
+ code("maxConcurrentLookupRequests"))
+ .text(
+ " and requests beyond %s and under %s will wait in each client cnx.",
+ code("maxConcurrentLookupRequests"),
+ code("maxLookupRequests"))
.build());
public static final ConfigOption<Integer> PULSAR_MAX_LOOKUP_REDIRECTS =
@@ -277,23 +280,26 @@
.intType()
.defaultValue(20)
.withDescription(
- "Set the maximum number of times a lookup-request to a broker will be redirected.");
+ "The maximum number of times a lookup-request redirections to a broker.");
public static final ConfigOption<Integer> PULSAR_MAX_NUMBER_OF_REJECTED_REQUEST_PER_CONNECTION =
ConfigOptions.key(CLIENT_CONFIG_PREFIX + "maxNumberOfRejectedRequestPerConnection")
.intType()
.defaultValue(50)
.withDescription(
- "The maximum number of rejected requests of a broker in a certain time"
- + " frame (30 seconds) after the current connection is closed and"
- + " the client creates a new connection to connect to a different broker.");
+ Description.builder()
+ .text(
+ "The maximum number of rejected requests of a broker in a certain period (30s) after the current connection is closed")
+ .text(
+ " and the client creates a new connection to connect to a different broker.")
+ .build());
public static final ConfigOption<Integer> PULSAR_KEEP_ALIVE_INTERVAL_SECONDS =
ConfigOptions.key(CLIENT_CONFIG_PREFIX + "keepAliveIntervalSeconds")
.intType()
.defaultValue(30)
.withDescription(
- "Seconds of keeping alive interval for each client broker connection.");
+ "Interval (in seconds) for keeping connection between the Pulsar client and broker alive.");
public static final ConfigOption<Integer> PULSAR_CONNECTION_TIMEOUT_MS =
ConfigOptions.key(CLIENT_CONFIG_PREFIX + "connectionTimeoutMs")
@@ -302,18 +308,18 @@
.withDescription(
Description.builder()
.text(
- "Duration (in millis) of waiting for a connection to a broker to be established.")
+ "Duration (in ms) of waiting for a connection to a broker to be established.")
.linebreak()
.text(
"If the duration passes without a response from a broker, the connection attempt is dropped.")
.build());
- // TODO This option would be exposed by Pulsar's ClientBuilder in next release.
+ // TODO This option would be exposed by Pulsar's ClientBuilder in the next Pulsar release.
public static final ConfigOption<Integer> PULSAR_REQUEST_TIMEOUT_MS =
ConfigOptions.key(CLIENT_CONFIG_PREFIX + "requestTimeoutMs")
.intType()
.defaultValue(60000)
- .withDescription("Maximum duration (in millis) for completing a request.");
+ .withDescription("Maximum duration (in ms) for completing a request.");
public static final ConfigOption<Long> PULSAR_INITIAL_BACKOFF_INTERVAL_NANOS =
ConfigOptions.key(CLIENT_CONFIG_PREFIX + "initialBackoffIntervalNanos")
@@ -325,7 +331,8 @@
ConfigOptions.key(CLIENT_CONFIG_PREFIX + "maxBackoffIntervalNanos")
.longType()
.defaultValue(SECONDS.toNanos(60))
- .withDescription("Maximum duration (in nanoseconds) for a backoff interval.");
+ .withDescription(
+ "The maximum duration (in nanoseconds) for a backoff interval.");
public static final ConfigOption<Boolean> PULSAR_ENABLE_BUSY_WAIT =
ConfigOptions.key(CLIENT_CONFIG_PREFIX + "enableBusyWait")
@@ -338,9 +345,9 @@
.text(
"This option will enable spin-waiting on executors and IO threads in order to reduce latency during context switches.")
.text(
- "The spinning will consume 100% CPU even when the broker is not doing any work.")
+ " The spinning will consume 100% CPU even when the broker is not doing any work.")
.text(
- "It is recommended to reduce the number of IO threads and BK client threads to only have few CPU cores busy.")
+ " It is recommended to reduce the number of IO threads and BookKeeper client threads to only have fewer CPU cores busy.")
.build());
public static final ConfigOption<String> PULSAR_LISTENER_NAME =
@@ -350,8 +357,8 @@
.withDescription(
Description.builder()
.text(
- "Configure the listenerName that the broker will return the corresponding %s.",
- code("advertisedListener"))
+ "Configure the %s that the broker will return the corresponding %s.",
+ code("listenerName"), code("advertisedListener"))
.build());
public static final ConfigOption<Boolean> PULSAR_USE_KEY_STORE_TLS =
@@ -361,8 +368,10 @@
.withDescription(
Description.builder()
.text(
- "If Tls is enabled, whether use KeyStore type as tls configuration parameter.")
- .text("False means use default pem type configuration.")
+ "If TLS is enabled, whether use the KeyStore type as the TLS configuration parameter.")
+ .text(
+ " If it is set to %s, it means to use the default pem type configuration.",
+ code("false"))
.build());
public static final ConfigOption<String> PULSAR_SSL_PROVIDER =
@@ -374,7 +383,7 @@
.text(
"The name of the security provider used for SSL connections.")
.text(
- "Default value is the default security provider of the JVM.")
+ " The default value is the default security provider of the JVM.")
.build());
public static final ConfigOption<String> PULSAR_TLS_TRUST_STORE_TYPE =
@@ -405,11 +414,11 @@
Description.builder()
.text("A list of cipher suites.")
.text(
- "This is a named combination of authentication, encryption,"
- + " MAC and key exchange algorithm used to negotiate the security"
- + " settings for a network connection using TLS or SSL network protocol.")
+ " This is a named combination of authentication, encryption,")
.text(
- "By default all the available cipher suites are supported.")
+ " MAC and the key exchange algorithm used to negotiate the security settings for a network connection using the TLS or SSL network protocol.")
+ .text(
+ " By default all the available cipher suites are supported.")
.build());
public static final ConfigOption<List<String>> PULSAR_TLS_PROTOCOLS =
@@ -420,9 +429,10 @@
.withDescription(
Description.builder()
.text("The SSL protocol used to generate the SSLContext.")
- .text("Default setting is TLS, which is fine for most cases.")
.text(
- "Allowed values in recent JVMs are TLS, TLSv1.3, TLSv1.2 and TLSv1.1.")
+ " By default, it is set TLS, which is fine for most cases.")
+ .text(
+ " Allowed values in recent JVMs are TLS, TLSv1.3, TLSv1.2 and TLSv1.1.")
.build());
public static final ConfigOption<Long> PULSAR_MEMORY_LIMIT_BYTES =
@@ -432,11 +442,11 @@
.withDescription(
Description.builder()
.text(
- "Configure a limit on the amount of direct memory that will be allocated by this client instance. Its unit is byte.")
+ "The limit (in bytes) on the amount of direct memory that will be allocated by this client instance.")
.linebreak()
.text(
"Note: at this moment this is only limiting the memory for producers.")
- .text("Setting this to 0 will disable the limit.")
+ .text(" Setting this to %s will disable the limit.", code("0"))
.build());
public static final ConfigOption<String> PULSAR_PROXY_SERVICE_URL =
@@ -444,8 +454,11 @@
.stringType()
.noDefaultValue()
.withDescription(
- "Proxy-service url when client would like to connect to broker via proxy."
- + " Client can choose type of proxy-routing.");
+ Description.builder()
+ .text(
+ "Proxy-service URL when a client connects to the broker via the proxy.")
+ .text(" The client can choose the type of proxy-routing.")
+ .build());
public static final ConfigOption<ProxyProtocol> PULSAR_PROXY_PROTOCOL =
ConfigOptions.key(CLIENT_CONFIG_PREFIX + "proxyProtocol")
@@ -454,8 +467,8 @@
.withDescription(
Description.builder()
.text(
- "Protocol type to determine type of proxy routing when client connects to proxy using %s.",
- code(CLIENT_CONFIG_PREFIX + "proxyServiceUrl"))
+ "Protocol type to determine the type of proxy routing when a client connects to the proxy using %s.",
+ code("pulsar.client.proxyServiceUrl"))
.build());
public static final ConfigOption<Boolean> PULSAR_ENABLE_TRANSACTION =
@@ -463,7 +476,12 @@
.booleanType()
.defaultValue(false)
.withDescription(
- "If enable transaction, start the transactionCoordinatorClient with pulsar client.");
+ Description.builder()
+ .text(
+ "If transaction is enabled, start the %s with %s.",
+ code("transactionCoordinatorClient"),
+ code("PulsarClient"))
+ .build());
///////////////////////////////////////////////////////////////////////////////
//
@@ -479,7 +497,7 @@
.withDescription(
Description.builder()
.text(
- "Set the Pulsar service HTTP URL for the admin endpoint. eg. %s, or %s for TLS.",
+ "The Pulsar service HTTP URL for the admin endpoint. For example, %s, or %s for TLS.",
code("http://my-broker.example.com:8080"),
code("https://my-broker.example.com:8443"))
.build());
@@ -488,27 +506,26 @@
ConfigOptions.key(ADMIN_CONFIG_PREFIX + "connectTimeout")
.intType()
.defaultValue(60000)
- .withDescription(
- "This sets the connection time out (in millis) for the pulsar admin client.");
+ .withDescription("The connection time out (in ms) for the PulsarAdmin client.");
public static final ConfigOption<Integer> PULSAR_READ_TIMEOUT =
ConfigOptions.key(ADMIN_CONFIG_PREFIX + "readTimeout")
.intType()
.defaultValue(60000)
.withDescription(
- "This sets the server response read time out (in millis) for the pulsar admin client for any request.");
+ "The server response read timeout (in ms) for the PulsarAdmin client for any request.");
public static final ConfigOption<Integer> PULSAR_REQUEST_TIMEOUT =
ConfigOptions.key(ADMIN_CONFIG_PREFIX + "requestTimeout")
.intType()
.defaultValue(300000)
.withDescription(
- "This sets the server request time out (in millis) for the pulsar admin client for any request.");
+ "The server request timeout (in ms) for the PulsarAdmin client for any request.");
public static final ConfigOption<Integer> PULSAR_AUTO_CERT_REFRESH_TIME =
ConfigOptions.key(ADMIN_CONFIG_PREFIX + "autoCertRefreshTime")
.intType()
.defaultValue(300000)
.withDescription(
- "This sets auto cert refresh time (in millis) if Pulsar admin uses tls authentication.");
+ "The auto cert refresh time (in ms) if Pulsar admin supports TLS authentication.");
}
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
index 2884f5e..c319915 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
@@ -55,6 +55,7 @@
@ConfigGroup(name = "PulsarSource", keyPrefix = SOURCE_CONFIG_PREFIX),
@ConfigGroup(name = "PulsarConsumer", keyPrefix = CONSUMER_CONFIG_PREFIX)
})
+@SuppressWarnings("java:S1192")
public final class PulsarSourceOptions {
// Pulsar source connector config prefix.
@@ -78,8 +79,11 @@
.longType()
.defaultValue(Duration.ofSeconds(30).toMillis())
.withDescription(
- "The interval in milliseconds for the Pulsar source to discover "
- + "the new partitions. A non-positive value disables the partition discovery.");
+ Description.builder()
+ .text(
+ "The interval (in ms) for the Pulsar source to discover the new partitions.")
+ .text(" A non-positive value disables the partition discovery.")
+ .build());
public static final ConfigOption<Boolean> PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE =
ConfigOptions.key(SOURCE_CONFIG_PREFIX + "enableAutoAcknowledgeMessage")
@@ -89,18 +93,21 @@
Description.builder()
.text(
"Flink commits the consuming position with pulsar transactions on checkpoint.")
- .linebreak()
.text(
- "However, if you have disabled the flink checkpoint or your pulsar cluster disabled the transaction,"
- + " make sure you have set this option to %s.",
+ " However, if you have disabled the Flink checkpoint or disabled transaction for your Pulsar cluster,")
+ .text(
+ " ensure that you have set this option to %s.",
code("true"))
+ .linebreak()
.text(
"The source would use pulsar client's internal mechanism and commit cursor in two ways.")
.list(
text(
- "For Key_Shared and Shared subscription: the cursor would be committed once the message is consumed."),
+ "For %s and %s subscription, the cursor would be committed once the message is consumed.",
+ code("Key_Shared"), code("Shared")),
text(
- "For Exclusive and Failover subscription: the cursor would be committed in a fixed interval."))
+ "For %s and %s subscription, the cursor would be committed in a given interval.",
+ code("Exclusive"), code("Failover")))
.build());
public static final ConfigOption<Long> PULSAR_AUTO_COMMIT_CURSOR_INTERVAL =
@@ -110,9 +117,9 @@
.withDescription(
Description.builder()
.text(
- "This option is used only when user disabled checkpoint and using Exclusive or Failover subscription.")
+ "This option is used only when the user disables the checkpoint and uses Exclusive or Failover subscription.")
.text(
- "We would automatically commit the cursor using the given period (in millis).")
+ " We would automatically commit the cursor using the given period (in ms).")
.build());
public static final ConfigOption<Long> PULSAR_TRANSACTION_TIMEOUT_MILLIS =
@@ -122,13 +129,14 @@
.withDescription(
Description.builder()
.text(
- "This option is used for when using Shared or Key_Shared subscription."
- + " You should set this option when you didn't enable the %s option.",
+ "This option is used in %s or %s subscription.",
+ code("Shared"), code("Key_Shared"))
+ .text(
+ " You should configure this option when you do not enable the %s option.",
code("pulsar.source.enableAutoAcknowledgeMessage"))
.linebreak()
.text(
- "This value should be greater than the checkpoint interval.")
- .text("It uses milliseconds as the unit of time.")
+ "The value (in ms) should be greater than the checkpoint interval.")
.build());
public static final ConfigOption<Long> PULSAR_MAX_FETCH_TIME =
@@ -137,10 +145,10 @@
.defaultValue(Duration.ofSeconds(10).toMillis())
.withDescription(
Description.builder()
+ .text("The maximum time (in ms) to wait when fetching records.")
+ .text(" A longer time increases throughput but also latency.")
.text(
- "The max time (in millis) to wait when fetching records. "
- + "A longer time increases throughput but also latency. "
- + "A fetch batch might be finished earlier because of %s.",
+ " A fetch batch might be finished earlier because of %s.",
code("pulsar.source.maxFetchRecords"))
.build());
@@ -151,9 +159,10 @@
.withDescription(
Description.builder()
.text(
- "The max number of records to fetch to wait when polling. "
- + "A longer time increases throughput but also latency."
- + "A fetch batch might be finished earlier because of %s.",
+ "The maximum number of records to fetch to wait when polling.")
+ .text(" A longer time increases throughput but also latency.")
+ .text(
+ " A fetch batch might be finished earlier because of %s.",
code("pulsar.source.maxFetchTime"))
.build());
@@ -162,9 +171,14 @@
.enumType(CursorVerification.class)
.defaultValue(CursorVerification.WARN_ON_MISMATCH)
.withDescription(
- "Upon (re)starting the source checks whether the expected message can be read. "
- + "If failure is enabled the application fails, else it logs a warning. "
- + "A possible solution is to adjust the retention settings in pulsar or ignoring the check result.");
+ Description.builder()
+ .text(
+ "Upon (re)starting the source, check whether the expected message can be read.")
+ .text(
+ " If failure is enabled, the application fails. Otherwise, it logs a warning.")
+ .text(
+ " A possible solution is to adjust the retention settings in Pulsar or ignoring the check result.")
+ .build());
///////////////////////////////////////////////////////////////////////////////
//
@@ -178,8 +192,11 @@
.stringType()
.noDefaultValue()
.withDescription(
- "Specify the subscription name for this consumer."
- + " This argument is required when constructing the consumer.");
+ Description.builder()
+ .text("Specify the subscription name for this consumer.")
+ .text(
+ " This argument is required when constructing the consumer.")
+ .build());
public static final ConfigOption<SubscriptionType> PULSAR_SUBSCRIPTION_TYPE =
ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "subscriptionType")
@@ -238,16 +255,15 @@
.withDescription(
Description.builder()
.text(
- "Group a consumer acknowledgment for a specified time (in microseconds).")
- .linebreak()
+ "Group a consumer acknowledgment for a specified time (in μs).")
.text(
- "By default, a consumer uses 100ms grouping time to send out acknowledgments to a broker.")
- .linebreak()
+ " By default, a consumer uses %s grouping time to send out acknowledgments to a broker.",
+ code("100μs"))
.text(
- "Setting a group time of 0 sends out acknowledgments immediately.")
- .linebreak()
+ " If the group time is set to %s, acknowledgments are sent out immediately.",
+ code("0"))
.text(
- "A longer ack group time is more efficient at the expense of a slight increase in message re-deliveries after a failure.")
+ " A longer ack group time is more efficient at the expense of a slight increase in message re-deliveries after a failure.")
.build());
public static final ConfigOption<Long> PULSAR_NEGATIVE_ACK_REDELIVERY_DELAY_MICROS =
@@ -257,11 +273,11 @@
.withDescription(
Description.builder()
.text(
- "Delay (in microseconds) to wait before redelivering messages that failed to be processed.")
+ "Delay (in μs) to wait before redelivering messages that failed to be processed.")
.linebreak()
.text(
"When an application uses %s, failed messages are redelivered after a fixed timeout.",
- code("Consumer#negativeAcknowledge(Message)"))
+ code("Consumer.negativeAcknowledge(Message)"))
.build());
public static final ConfigOption<Integer>
@@ -274,7 +290,7 @@
.withDescription(
Description.builder()
.text(
- "The max total receiver queue size across partitions.")
+ "The maximum total receiver queue size across partitions.")
.linebreak()
.text(
"This setting reduces the receiver queue size for individual partitions if the total receiver queue size exceeds this value.")
@@ -285,7 +301,7 @@
.stringType()
.noDefaultValue()
.withDescription(
- "Consumer name is informative and it can be used to identify a particular consumer instance from the topic stats.");
+ "The consumer name is informative and it can be used to identify a particular consumer instance from the topic stats.");
public static final ConfigOption<Long> PULSAR_ACK_TIMEOUT_MILLIS =
ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "ackTimeoutMillis")
@@ -294,17 +310,13 @@
.withDescription(
Description.builder()
.text(
- "Set the timeout (in millis) for unacknowledged messages, truncated to the nearest millisecond."
- + " The timeout needs to be greater than 1 second.")
+ "The timeout (in ms) for unacknowledged messages, truncated to the nearest millisecond. The timeout needs to be greater than 1 second.")
.linebreak()
.text(
- "By default, the acknowledge timeout is disabled and that means that messages delivered to a"
- + " consumer will not be re-delivered unless the consumer crashes.")
+ "By default, the acknowledge timeout is disabled and that means that messages delivered to a consumer will not be re-delivered unless the consumer crashes.")
.linebreak()
.text(
- "When enabling ack timeout, if a message is not acknowledged within the specified timeout"
- + " it will be re-delivered to the consumer"
- + " (possibly to a different consumer in case of a shared subscription).")
+ "When acknowledgement timeout being enabled, if a message is not acknowledged within the specified timeout it will be re-delivered to the consumer (possibly to a different consumer in case of a shared subscription).")
.build());
public static final ConfigOption<Long> PULSAR_TICK_DURATION_MILLIS =
@@ -313,10 +325,10 @@
.defaultValue(1000L)
.withDescription(
Description.builder()
- .text("Granularity (in millis) of the ack-timeout redelivery.")
+ .text("Granularity (in ms) of the ack-timeout redelivery.")
.linebreak()
.text(
- "Using an higher %s reduces the memory overhead to track messages when setting ack-timeout to a bigger value (for example, 1 hour).",
+ "A greater (for example, 1 hour) %s reduces the memory overhead to track messages.",
code("tickDurationMillis"))
.build());
@@ -327,19 +339,21 @@
.withDescription(
Description.builder()
.text(
- "Priority level for a consumer to which a broker gives more priority while dispatching messages in the shared subscription mode.")
+ "Priority level for a consumer to which a broker gives more priorities while dispatching messages in the shared subscription type.")
.linebreak()
.text(
"The broker follows descending priorities. For example, 0=max-priority, 1, 2,...")
.linebreak()
.text(
- "In shared subscription mode, the broker first dispatches messages to the max priority level consumers if they have permits. Otherwise, the broker considers next priority level consumers.")
+ "In shared subscription mode, the broker first dispatches messages to the consumers on the highest priority level if they have permits.")
+ .text(
+ " Otherwise, the broker considers consumers on the next priority level.")
.linebreak()
.linebreak()
.text("Example 1")
.linebreak()
.text(
- "If a subscription has consumerA with %s 0 and consumerB with %s 1, then the broker only dispatches messages to consumerA until it runs out permits and then starts dispatching messages to consumerB.",
+ "If a subscription has consumer A with %s 0 and consumer B with %s 1, then the broker only dispatches messages to consumer A until it runs out permits and then starts dispatching messages to consumer B.",
code("priorityLevel"), code("priorityLevel"))
.linebreak()
.text("Example 2")
@@ -353,7 +367,7 @@
+ "C5, 1, 1\n")
.linebreak()
.text(
- "Order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4.")
+ "The order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4.")
.build());
public static final ConfigOption<Integer> PULSAR_MAX_PENDING_CHUNKED_MESSAGE =
@@ -363,32 +377,28 @@
.withDescription(
Description.builder()
.text(
- "Consumer buffers chunk messages into memory until it receives all the chunks of the original message.")
+ "The consumer buffers chunk messages into memory until it receives all the chunks of the original message.")
.text(
- "While consuming chunk-messages, chunks from same message might not be contiguous"
- + " in the stream and they might be mixed with other messages' chunks.")
+ " While consuming chunk-messages, chunks from the same message might not be contiguous in the stream and they might be mixed with other messages' chunks.")
.text(
- "So, consumer has to maintain multiple buffers to manage chunks coming from different messages.")
+ " So, consumer has to maintain multiple buffers to manage chunks coming from different messages.")
.text(
- "This mainly happens when multiple publishers are publishing messages on the topic"
- + " concurrently or publisher failed to publish all chunks of the messages.")
- .linebreak()
- .text("eg: M1-C1, M2-C1, M1-C2, M2-C2")
- .text(
- "Messages M1-C1 and M1-C2 belong to original message M1, M2-C1 and M2-C2 messages belong to M2 message.")
+ " This mainly happens when multiple publishers are publishing messages on the topic concurrently or publishers failed to publish all chunks of the messages.")
.linebreak()
.text(
- "Buffering large number of outstanding uncompleted chunked messages can create memory"
- + " pressure and it can be guarded by providing this %s threshold."
- + " Once, consumer reaches this threshold, it drops the outstanding unchunked-messages"
- + " by silently acking or asking broker to redeliver later by marking it unacked."
- + " This behavior can be controlled by configuration %s",
+ "For example, there are M1-C1, M2-C1, M1-C2, M2-C2 messages.")
+ .text(
+ "Messages M1-C1 and M1-C2 belong to the M1 original message while M2-C1 and M2-C2 belong to the M2 message.")
+ .linebreak()
+ .text(
+ "Buffering a large number of outstanding uncompleted chunked messages can bring memory pressure and it can be guarded by providing this %s threshold.",
+ code("pulsar.consumer.maxPendingChunkedMessage"))
+ .text(
+ " Once, a consumer reaches this threshold, it drops the outstanding unchunked messages by silently acknowledging or asking the broker to redeliver messages later by marking it unacknowledged.")
+ .text(
+ " This behavior can be controlled by the %s option.",
code(
- CONSUMER_CONFIG_PREFIX
- + "maxPendingChunkedMessage"),
- code(
- CONSUMER_CONFIG_PREFIX
- + "autoAckOldestChunkedMessageOnQueueFull"))
+ "pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull"))
.build());
public static final ConfigOption<Boolean> PULSAR_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL =
@@ -398,16 +408,12 @@
.withDescription(
Description.builder()
.text(
- "Buffering large number of outstanding uncompleted chunked messages can create memory pressure"
- + " and it can be guarded by providing this %s threshold."
- + " Once, consumer reaches this threshold, it drops the outstanding unchunked-messages"
- + " by silently acking if %s is true else it marks them for redelivery.",
+ "Buffering a large number of outstanding uncompleted chunked messages can bring memory pressure and it can be guarded by providing this %s threshold.",
+ code("pulsar.consumer.maxPendingChunkedMessage"))
+ .text(
+ " Once a consumer reaches this threshold, it drops the outstanding unchunked-messages by silently acknowledging if %s is true. Otherwise, it marks them for redelivery.",
code(
- CONSUMER_CONFIG_PREFIX
- + "maxPendingChunkedMessage"),
- code(
- CONSUMER_CONFIG_PREFIX
- + "autoAckOldestChunkedMessageOnQueueFull"))
+ "pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull"))
.build());
public static final ConfigOption<Long> PULSAR_EXPIRE_TIME_OF_INCOMPLETE_CHUNKED_MESSAGE_MILLIS =
@@ -415,10 +421,12 @@
.longType()
.defaultValue(60 * 1000L)
.withDescription(
- "If producer fails to publish all the chunks of a message then consumer"
- + " can expire incomplete chunks if consumer won't be able to"
- + " receive all chunks in expire times (default 1 hour)."
- + " It uses milliseconds as the unit of time.");
+ Description.builder()
+ .text(
+ "If a producer fails to publish all the chunks of a message,")
+ .text(
+ " the consumer can expire incomplete chunks if the consumer cannot receive all chunks in expire times (default 1 hour, in ms).")
+ .build());
public static final ConfigOption<ConsumerCryptoFailureAction> PULSAR_CRYPTO_FAILURE_ACTION =
ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "cryptoFailureAction")
@@ -427,23 +435,28 @@
.withDescription(
Description.builder()
.text(
- "Consumer should take action when it receives a message that can not be decrypted.")
+ "The consumer should take action when it receives a message that can not be decrypted.")
.list(
text(
- "FAIL: this is the default option to fail messages until crypto succeeds."),
+ "%s: this is the default option to fail messages until crypto succeeds.",
+ code("FAIL")),
text(
- "DISCARD: silently acknowledge and not deliver message to an application."),
+ "%s: silently acknowledge but do not deliver messages to an application.",
+ code("DISCARD")),
text(
- "CONSUME: deliver encrypted messages to applications. It is the application's responsibility to decrypt the message."))
+ "%s: deliver encrypted messages to applications. It is the application's responsibility to decrypt the message.",
+ code("CONSUME")))
.linebreak()
- .text("The decompression of message fails.")
+ .text("Fail to decompress the messages.")
.linebreak()
.text(
"If messages contain batch messages, a client is not be able to retrieve individual messages in batch.")
.linebreak()
.text(
- "Delivered encrypted message contains %s which contains encryption and compression information in it using which application can decrypt consumed message payload.",
+ "The delivered encrypted message contains %s which contains encryption and compression information in.",
code("EncryptionContext"))
+ .text(
+ " You can use an application to decrypt the consumed message payload.")
.build());
public static final ConfigOption<Map<String, String>> PULSAR_CONSUMER_PROPERTIES =
@@ -453,17 +466,15 @@
.withDescription(
Description.builder()
.text("A name or value property of this consumer.")
- .linebreak()
.text(
- "%s is application defined metadata attached to a consumer.",
+ " %s is application defined metadata attached to a consumer.",
code("properties"))
- .linebreak()
.text(
- "When getting a topic stats, associate this metadata with the consumer stats for easier identification.")
+ " When getting a topic stats, associate this metadata with the consumer stats for easier identification.")
.build());
public static final ConfigOption<Boolean> PULSAR_READ_COMPACTED =
- ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "readCompacted") // NOSONAR
+ ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "readCompacted")
.booleanType()
.defaultValue(false)
.withDescription(
@@ -524,47 +535,31 @@
.intType()
.defaultValue(0)
.withDescription(
- "Maximum number of times that a message will be redelivered before being sent to the dead letter queue.");
+ "The maximum number of times that a message are redelivered before being sent to the dead letter queue.");
public static final ConfigOption<String> PULSAR_RETRY_LETTER_TOPIC =
ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "deadLetterPolicy.retryLetterTopic")
.stringType()
.noDefaultValue()
- .withDescription(
- "Name of the retry topic where the failing messages will be sent.");
+ .withDescription("Name of the retry topic where the failed messages are sent.");
public static final ConfigOption<String> PULSAR_DEAD_LETTER_TOPIC =
ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "deadLetterPolicy.deadLetterTopic")
.stringType()
.noDefaultValue()
- .withDescription(
- "Name of the dead topic where the failing messages will be sent.");
+ .withDescription("Name of the dead topic where the failed messages are sent.");
public static final ConfigOption<Boolean> PULSAR_RETRY_ENABLE =
ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "retryEnable")
.booleanType()
.defaultValue(false)
- .withDescription("If enabled, the consumer will auto retry messages.");
-
- public static final ConfigOption<Boolean> PULSAR_AUTO_UPDATE_PARTITIONS =
- ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "autoUpdatePartitions")
- .booleanType()
- .defaultValue(true)
- .withDescription(
- Description.builder()
- .text(
- "If %s is enabled, a consumer subscribes to partition increase automatically.",
- code("autoUpdatePartitions"))
- .linebreak()
- .text("Note: this is only for partitioned consumers.\t")
- .build());
+ .withDescription("If enabled, the consumer will automatically retry messages.");
public static final ConfigOption<Integer> PULSAR_AUTO_UPDATE_PARTITIONS_INTERVAL_SECONDS =
ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "autoUpdatePartitionsIntervalSeconds")
.intType()
.defaultValue(60)
.withDescription(
- "Set the interval (in seconds) of updating partitions."
- + " This only works if autoUpdatePartitions is enabled.");
+ "The interval (in seconds) of updating partitions. This only works if autoUpdatePartitions is enabled.");
public static final ConfigOption<Boolean> PULSAR_REPLICATE_SUBSCRIPTION_STATE =
ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "replicateSubscriptionState")
@@ -582,23 +577,11 @@
.booleanType()
.defaultValue(false)
.withDescription(
- "Ack will return receipt but does not mean that the message will not be resent after get receipt.");
+ "Acknowledgement will return a receipt but this does not mean that the message will not be resent after getting the receipt.");
public static final ConfigOption<Boolean> PULSAR_POOL_MESSAGES =
ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "poolMessages")
.booleanType()
.defaultValue(false)
- .withDescription(
- Description.builder()
- .text(
- "Enable pooling of messages and the underlying data buffers.")
- .linebreak()
- .text(
- "When pooling is enabled, the application is responsible for calling"
- + " %s after the handling of every received message. If %s"
- + " is not called on a received message, there will be a memory leak."
- + " If an application attempts to use and already \"released\" message,"
- + " it might experience undefined behavior (eg: memory corruption, deserialization error, etc.).",
- code("Message.release()"), code("release()"))
- .build());
+ .withDescription("Enable pooling of messages and the underlying data buffers.");
}