[FLINK-23864][docs] Add flink-connector-pulsar module to flink-docs, auto generate the config document.
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java
index 4c33a70..db90bf1 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java
@@ -31,7 +31,6 @@
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import static java.util.Collections.emptyMap;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.flink.configuration.description.TextElement.code;
 import static org.apache.flink.configuration.description.TextElement.text;
@@ -48,6 +47,7 @@
             @ConfigGroup(name = "PulsarClient", keyPrefix = CLIENT_CONFIG_PREFIX),
             @ConfigGroup(name = "PulsarAdmin", keyPrefix = ADMIN_CONFIG_PREFIX)
         })
+@SuppressWarnings("java:S1192")
 public final class PulsarOptions {
 
     // Pulsar client API config prefix.
@@ -112,8 +112,7 @@
                     .noDefaultValue()
                     .withDescription(
                             Description.builder()
-                                    .text(
-                                            "String represents parameters for the authentication plugin.")
+                                    .text("Parameters for the authentication plugin.")
                                     .linebreak()
                                     .linebreak()
                                     .text("Example:")
@@ -124,9 +123,8 @@
     public static final ConfigOption<Map<String, String>> PULSAR_AUTH_PARAM_MAP =
             ConfigOptions.key(CLIENT_CONFIG_PREFIX + "authParamMap")
                     .mapType()
-                    .defaultValue(emptyMap())
-                    .withDescription(
-                            "Map which represents parameters for the authentication plugin.");
+                    .noDefaultValue()
+                    .withDescription("Parameters for the authentication plugin.");
 
     public static final ConfigOption<Integer> PULSAR_OPERATION_TIMEOUT_MS =
             ConfigOptions.key(CLIENT_CONFIG_PREFIX + "operationTimeoutMs")
@@ -134,10 +132,11 @@
                     .defaultValue(30000)
                     .withDescription(
                             Description.builder()
-                                    .text("Operation timeout (in millis).")
+                                    .text("Operation timeout (in ms).")
                                     .text(
-                                            "Producer-create, subscribe and unsubscribe operations will be retried until this interval,"
-                                                    + " after which the operation will be marked as failed.")
+                                            " Operations such as creating producers, subscribing or unsubscribing topics are retried during this interval.")
+                                    .text(
+                                            " If the operation is not completed during this interval, the operation will be marked as failed.")
                                     .build());
 
     public static final ConfigOption<Long> PULSAR_STATS_INTERVAL_SECONDS =
@@ -153,7 +152,7 @@
                                                     "Stats is activated with positive %s",
                                                     code("statsInterval")),
                                             text(
-                                                    "Set %s to 1 second at least",
+                                                    "Set %s to 1 second at least.",
                                                     code("statsIntervalSeconds")))
                                     .build());
 
@@ -173,10 +172,10 @@
                                     .text(
                                             "The number of threads used for handling message listeners.")
                                     .text(
-                                            "The listener thread pool is shared across all the consumers and readers that are using a %s model to get messages.",
+                                            " The listener thread pool is shared across all the consumers and readers that are using a %s model to get messages.",
                                             code("listener"))
                                     .text(
-                                            "For a given consumer, the listener will be always invoked from the same thread, to ensure ordering.")
+                                            " For a given consumer, the listener is always invoked from the same thread to ensure ordering.")
                                     .build());
 
     public static final ConfigOption<Integer> PULSAR_CONNECTIONS_PER_BROKER =
@@ -186,12 +185,12 @@
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Sets the max number of connection that the client library will open to a single broker.")
+                                            "The maximum number of connections that the client library will open to a single broker.")
                                     .linebreak()
                                     .text(
-                                            "By default, the connection pool will use a single connection for all the producers and consumers.")
+                                            " By default, the connection pool will use a single connection for all the producers and consumers.")
                                     .text(
-                                            "Increasing this parameter may improve throughput when using many producers over a high latency connection.")
+                                            " Increasing this parameter may improve throughput when using many producers over a high latency connection.")
                                     .build());
 
     public static final ConfigOption<Boolean> PULSAR_USE_TCP_NO_DELAY =
@@ -201,18 +200,18 @@
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Whether to use TCP no-delay flag on the connection to disable Nagle algorithm.")
+                                            "Whether to use the TCP no-delay flag on the connection to disable Nagle algorithm.")
                                     .linebreak()
                                     .text(
-                                            "No-delay features make sure packets are sent out on the network as soon as possible,")
-                                    .text("and it's critical to achieve low latency publishes.")
+                                            "No-delay features ensures that packets are sent out on the network as soon as possible,")
+                                    .text(" and it is critical to achieve low latency publishes.")
                                     .text(
-                                            "On the other hand, sending out a huge number of small packets might limit the overall throughput,")
+                                            " On the other hand, sending out a huge number of small packets might limit the overall throughput.")
                                     .text(
-                                            "so if latency is not a concern, it's advisable to set the %s flag to false.",
-                                            code("useTcpNoDelay"))
+                                            " Therefore, if latency is not a concern, it is recommended to set the %s flag to %s.",
+                                            code("useTcpNoDelay"), code("false"))
                                     .linebreak()
-                                    .text("Default value is true.")
+                                    .text("By default, it is set to %s.", code("true"))
                                     .build());
 
     public static final ConfigOption<String> PULSAR_TLS_TRUST_CERTS_FILE_PATH =
@@ -226,7 +225,7 @@
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
-                            "Whether the Pulsar client accepts untrusted TLS certificate from broker.");
+                            "Whether the Pulsar client accepts untrusted TLS certificate from the broker.");
 
     public static final ConfigOption<Boolean> PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE =
             ConfigOptions.key(CLIENT_CONFIG_PREFIX + "tlsHostnameVerificationEnable")
@@ -236,11 +235,11 @@
                             Description.builder()
                                     .text("Whether to enable TLS hostname verification.")
                                     .text(
-                                            "It allows to validate hostname verification when client connects to broker over tls.")
+                                            " It allows to validate hostname verification when a client connects to the broker over TLS.")
                                     .text(
-                                            "It validates incoming x509 certificate and matches provided hostname(CN/SAN) with expected broker's host name.")
+                                            " It validates incoming x509 certificate and matches provided hostname (CN/SAN) with the expected broker's host name.")
                                     .text(
-                                            "It follows RFC 2818, 3.1. Server Identity hostname verification.")
+                                            " It follows RFC 2818, 3.1. Server Identity hostname verification.")
                                     .build());
 
     public static final ConfigOption<Integer> PULSAR_CONCURRENT_LOOKUP_REQUEST =
@@ -250,10 +249,9 @@
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "The number of concurrent lookup requests allowed to send on each broker connection to prevent overload on broker.")
+                                            "The number of concurrent lookup requests allowed to send on each broker connection to prevent overload on the broker.")
                                     .text(
-                                            "It should be configured with higher value only in case of it requires"
-                                                    + " to produce/subscribe on thousands of topic using created %s",
+                                            " It should be configured with a higher value only in case of it requires to produce or subscribe on thousands of topic using a created %s",
                                             code("PulsarClient"))
                                     .build());
 
@@ -264,12 +262,17 @@
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "The maximum number of lookup requests allowed on each broker connection to prevent overload on broker.")
-                                    .text("It should be bigger than maxConcurrentLookupRequests.")
+                                            "The maximum number of lookup requests allowed on each broker connection to prevent overload on the broker.")
                                     .text(
-                                            "Requests that inside maxConcurrentLookupRequests already send to broker,")
+                                            " It should be greater than %s.",
+                                            code("maxConcurrentLookupRequests"))
                                     .text(
-                                            "and requests beyond maxConcurrentLookupRequests and under maxLookupRequests will wait in each client cnx.")
+                                            " Requests that inside %s are already sent to broker,",
+                                            code("maxConcurrentLookupRequests"))
+                                    .text(
+                                            " and requests beyond %s and under %s will wait in each client cnx.",
+                                            code("maxConcurrentLookupRequests"),
+                                            code("maxLookupRequests"))
                                     .build());
 
     public static final ConfigOption<Integer> PULSAR_MAX_LOOKUP_REDIRECTS =
@@ -277,23 +280,26 @@
                     .intType()
                     .defaultValue(20)
                     .withDescription(
-                            "Set the maximum number of times a lookup-request to a broker will be redirected.");
+                            "The maximum number of times a lookup-request redirections to a broker.");
 
     public static final ConfigOption<Integer> PULSAR_MAX_NUMBER_OF_REJECTED_REQUEST_PER_CONNECTION =
             ConfigOptions.key(CLIENT_CONFIG_PREFIX + "maxNumberOfRejectedRequestPerConnection")
                     .intType()
                     .defaultValue(50)
                     .withDescription(
-                            "The maximum number of rejected requests of a broker in a certain time"
-                                    + " frame (30 seconds) after the current connection is closed and"
-                                    + " the client creates a new connection to connect to a different broker.");
+                            Description.builder()
+                                    .text(
+                                            "The maximum number of rejected requests of a broker in a certain period (30s) after the current connection is closed")
+                                    .text(
+                                            " and the client creates a new connection to connect to a different broker.")
+                                    .build());
 
     public static final ConfigOption<Integer> PULSAR_KEEP_ALIVE_INTERVAL_SECONDS =
             ConfigOptions.key(CLIENT_CONFIG_PREFIX + "keepAliveIntervalSeconds")
                     .intType()
                     .defaultValue(30)
                     .withDescription(
-                            "Seconds of keeping alive interval for each client broker connection.");
+                            "Interval (in seconds) for keeping connection between the Pulsar client and broker alive.");
 
     public static final ConfigOption<Integer> PULSAR_CONNECTION_TIMEOUT_MS =
             ConfigOptions.key(CLIENT_CONFIG_PREFIX + "connectionTimeoutMs")
@@ -302,18 +308,18 @@
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Duration (in millis) of waiting for a connection to a broker to be established.")
+                                            "Duration (in ms) of waiting for a connection to a broker to be established.")
                                     .linebreak()
                                     .text(
                                             "If the duration passes without a response from a broker, the connection attempt is dropped.")
                                     .build());
 
-    // TODO This option would be exposed by Pulsar's ClientBuilder in next release.
+    // TODO This option would be exposed by Pulsar's ClientBuilder in the next Pulsar release.
     public static final ConfigOption<Integer> PULSAR_REQUEST_TIMEOUT_MS =
             ConfigOptions.key(CLIENT_CONFIG_PREFIX + "requestTimeoutMs")
                     .intType()
                     .defaultValue(60000)
-                    .withDescription("Maximum duration (in millis) for completing a request.");
+                    .withDescription("Maximum duration (in ms) for completing a request.");
 
     public static final ConfigOption<Long> PULSAR_INITIAL_BACKOFF_INTERVAL_NANOS =
             ConfigOptions.key(CLIENT_CONFIG_PREFIX + "initialBackoffIntervalNanos")
@@ -325,7 +331,8 @@
             ConfigOptions.key(CLIENT_CONFIG_PREFIX + "maxBackoffIntervalNanos")
                     .longType()
                     .defaultValue(SECONDS.toNanos(60))
-                    .withDescription("Maximum duration (in nanoseconds) for a backoff interval.");
+                    .withDescription(
+                            "The maximum duration (in nanoseconds) for a backoff interval.");
 
     public static final ConfigOption<Boolean> PULSAR_ENABLE_BUSY_WAIT =
             ConfigOptions.key(CLIENT_CONFIG_PREFIX + "enableBusyWait")
@@ -338,9 +345,9 @@
                                     .text(
                                             "This option will enable spin-waiting on executors and IO threads in order to reduce latency during context switches.")
                                     .text(
-                                            "The spinning will consume 100% CPU even when the broker is not doing any work.")
+                                            " The spinning will consume 100% CPU even when the broker is not doing any work.")
                                     .text(
-                                            "It is recommended to reduce the number of IO threads and BK client threads to only have few CPU cores busy.")
+                                            " It is recommended to reduce the number of IO threads and BookKeeper client threads to only have fewer CPU cores busy.")
                                     .build());
 
     public static final ConfigOption<String> PULSAR_LISTENER_NAME =
@@ -350,8 +357,8 @@
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Configure the listenerName that the broker will return the corresponding %s.",
-                                            code("advertisedListener"))
+                                            "Configure the %s that the broker will return the corresponding %s.",
+                                            code("listenerName"), code("advertisedListener"))
                                     .build());
 
     public static final ConfigOption<Boolean> PULSAR_USE_KEY_STORE_TLS =
@@ -361,8 +368,10 @@
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "If Tls is enabled, whether use KeyStore type as tls configuration parameter.")
-                                    .text("False means use default pem type configuration.")
+                                            "If TLS is enabled, whether use the KeyStore type as the TLS configuration parameter.")
+                                    .text(
+                                            " If it is set to %s, it means to use the default pem type configuration.",
+                                            code("false"))
                                     .build());
 
     public static final ConfigOption<String> PULSAR_SSL_PROVIDER =
@@ -374,7 +383,7 @@
                                     .text(
                                             "The name of the security provider used for SSL connections.")
                                     .text(
-                                            "Default value is the default security provider of the JVM.")
+                                            " The default value is the default security provider of the JVM.")
                                     .build());
 
     public static final ConfigOption<String> PULSAR_TLS_TRUST_STORE_TYPE =
@@ -405,11 +414,11 @@
                             Description.builder()
                                     .text("A list of cipher suites.")
                                     .text(
-                                            "This is a named combination of authentication, encryption,"
-                                                    + " MAC and key exchange algorithm used to negotiate the security"
-                                                    + " settings for a network connection using TLS or SSL network protocol.")
+                                            " This is a named combination of authentication, encryption,")
                                     .text(
-                                            "By default all the available cipher suites are supported.")
+                                            " MAC and the key exchange algorithm used to negotiate the security settings for a network connection using the TLS or SSL network protocol.")
+                                    .text(
+                                            " By default all the available cipher suites are supported.")
                                     .build());
 
     public static final ConfigOption<List<String>> PULSAR_TLS_PROTOCOLS =
@@ -420,9 +429,10 @@
                     .withDescription(
                             Description.builder()
                                     .text("The SSL protocol used to generate the SSLContext.")
-                                    .text("Default setting is TLS, which is fine for most cases.")
                                     .text(
-                                            "Allowed values in recent JVMs are TLS, TLSv1.3, TLSv1.2 and TLSv1.1.")
+                                            " By default, it is set TLS, which is fine for most cases.")
+                                    .text(
+                                            " Allowed values in recent JVMs are TLS, TLSv1.3, TLSv1.2 and TLSv1.1.")
                                     .build());
 
     public static final ConfigOption<Long> PULSAR_MEMORY_LIMIT_BYTES =
@@ -432,11 +442,11 @@
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Configure a limit on the amount of direct memory that will be allocated by this client instance. Its unit is byte.")
+                                            "The limit (in bytes) on the amount of direct memory that will be allocated by this client instance.")
                                     .linebreak()
                                     .text(
                                             "Note: at this moment this is only limiting the memory for producers.")
-                                    .text("Setting this to 0 will disable the limit.")
+                                    .text(" Setting this to %s will disable the limit.", code("0"))
                                     .build());
 
     public static final ConfigOption<String> PULSAR_PROXY_SERVICE_URL =
@@ -444,8 +454,11 @@
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Proxy-service url when client would like to connect to broker via proxy."
-                                    + " Client can choose type of proxy-routing.");
+                            Description.builder()
+                                    .text(
+                                            "Proxy-service URL when a client connects to the broker via the proxy.")
+                                    .text(" The client can choose the type of proxy-routing.")
+                                    .build());
 
     public static final ConfigOption<ProxyProtocol> PULSAR_PROXY_PROTOCOL =
             ConfigOptions.key(CLIENT_CONFIG_PREFIX + "proxyProtocol")
@@ -454,8 +467,8 @@
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Protocol type to determine type of proxy routing when client connects to proxy using %s.",
-                                            code(CLIENT_CONFIG_PREFIX + "proxyServiceUrl"))
+                                            "Protocol type to determine the type of proxy routing when a client connects to the proxy using %s.",
+                                            code("pulsar.client.proxyServiceUrl"))
                                     .build());
 
     public static final ConfigOption<Boolean> PULSAR_ENABLE_TRANSACTION =
@@ -463,7 +476,12 @@
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
-                            "If enable transaction, start the transactionCoordinatorClient with pulsar client.");
+                            Description.builder()
+                                    .text(
+                                            "If transaction is enabled, start the %s with %s.",
+                                            code("transactionCoordinatorClient"),
+                                            code("PulsarClient"))
+                                    .build());
 
     ///////////////////////////////////////////////////////////////////////////////
     //
@@ -479,7 +497,7 @@
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Set the Pulsar service HTTP URL for the admin endpoint. eg. %s, or %s for TLS.",
+                                            "The Pulsar service HTTP URL for the admin endpoint. For example, %s, or %s for TLS.",
                                             code("http://my-broker.example.com:8080"),
                                             code("https://my-broker.example.com:8443"))
                                     .build());
@@ -488,27 +506,26 @@
             ConfigOptions.key(ADMIN_CONFIG_PREFIX + "connectTimeout")
                     .intType()
                     .defaultValue(60000)
-                    .withDescription(
-                            "This sets the connection time out (in millis) for the pulsar admin client.");
+                    .withDescription("The connection time out (in ms) for the PulsarAdmin client.");
 
     public static final ConfigOption<Integer> PULSAR_READ_TIMEOUT =
             ConfigOptions.key(ADMIN_CONFIG_PREFIX + "readTimeout")
                     .intType()
                     .defaultValue(60000)
                     .withDescription(
-                            "This sets the server response read time out (in millis) for the pulsar admin client for any request.");
+                            "The server response read timeout (in ms) for the PulsarAdmin client for any request.");
 
     public static final ConfigOption<Integer> PULSAR_REQUEST_TIMEOUT =
             ConfigOptions.key(ADMIN_CONFIG_PREFIX + "requestTimeout")
                     .intType()
                     .defaultValue(300000)
                     .withDescription(
-                            "This sets the server request time out (in millis) for the pulsar admin client for any request.");
+                            "The server request timeout (in ms) for the PulsarAdmin client for any request.");
 
     public static final ConfigOption<Integer> PULSAR_AUTO_CERT_REFRESH_TIME =
             ConfigOptions.key(ADMIN_CONFIG_PREFIX + "autoCertRefreshTime")
                     .intType()
                     .defaultValue(300000)
                     .withDescription(
-                            "This sets auto cert refresh time (in millis) if Pulsar admin uses tls authentication.");
+                            "The auto cert refresh time (in ms) if Pulsar admin supports TLS authentication.");
 }
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
index 2884f5e..c319915 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
@@ -55,6 +55,7 @@
             @ConfigGroup(name = "PulsarSource", keyPrefix = SOURCE_CONFIG_PREFIX),
             @ConfigGroup(name = "PulsarConsumer", keyPrefix = CONSUMER_CONFIG_PREFIX)
         })
+@SuppressWarnings("java:S1192")
 public final class PulsarSourceOptions {
 
     // Pulsar source connector config prefix.
@@ -78,8 +79,11 @@
                     .longType()
                     .defaultValue(Duration.ofSeconds(30).toMillis())
                     .withDescription(
-                            "The interval in milliseconds for the Pulsar source to discover "
-                                    + "the new partitions. A non-positive value disables the partition discovery.");
+                            Description.builder()
+                                    .text(
+                                            "The interval (in ms) for the Pulsar source to discover the new partitions.")
+                                    .text(" A non-positive value disables the partition discovery.")
+                                    .build());
 
     public static final ConfigOption<Boolean> PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE =
             ConfigOptions.key(SOURCE_CONFIG_PREFIX + "enableAutoAcknowledgeMessage")
@@ -89,18 +93,21 @@
                             Description.builder()
                                     .text(
                                             "Flink commits the consuming position with pulsar transactions on checkpoint.")
-                                    .linebreak()
                                     .text(
-                                            "However, if you have disabled the flink checkpoint or your pulsar cluster disabled the transaction,"
-                                                    + " make sure you have set this option to %s.",
+                                            " However, if you have disabled the Flink checkpoint or disabled transaction for your Pulsar cluster,")
+                                    .text(
+                                            " ensure that you have set this option to %s.",
                                             code("true"))
+                                    .linebreak()
                                     .text(
                                             "The source would use pulsar client's internal mechanism and commit cursor in two ways.")
                                     .list(
                                             text(
-                                                    "For Key_Shared and Shared subscription: the cursor would be committed once the message is consumed."),
+                                                    "For %s and %s subscription, the cursor would be committed once the message is consumed.",
+                                                    code("Key_Shared"), code("Shared")),
                                             text(
-                                                    "For Exclusive and Failover subscription: the cursor would be committed in a fixed interval."))
+                                                    "For %s and %s subscription, the cursor would be committed in a given interval.",
+                                                    code("Exclusive"), code("Failover")))
                                     .build());
 
     public static final ConfigOption<Long> PULSAR_AUTO_COMMIT_CURSOR_INTERVAL =
@@ -110,9 +117,9 @@
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "This option is used only when user disabled checkpoint and using Exclusive or Failover subscription.")
+                                            "This option is used only when the user disables the checkpoint and uses Exclusive or Failover subscription.")
                                     .text(
-                                            "We would automatically commit the cursor using the given period (in millis).")
+                                            " We would automatically commit the cursor using the given period (in ms).")
                                     .build());
 
     public static final ConfigOption<Long> PULSAR_TRANSACTION_TIMEOUT_MILLIS =
@@ -122,13 +129,14 @@
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "This option is used for when using Shared or Key_Shared subscription."
-                                                    + " You should set this option when you didn't enable the %s option.",
+                                            "This option is used in %s or %s subscription.",
+                                            code("Shared"), code("Key_Shared"))
+                                    .text(
+                                            " You should configure this option when you do not enable the %s option.",
                                             code("pulsar.source.enableAutoAcknowledgeMessage"))
                                     .linebreak()
                                     .text(
-                                            "This value should be greater than the checkpoint interval.")
-                                    .text("It uses milliseconds as the unit of time.")
+                                            "The value (in ms) should be greater than the checkpoint interval.")
                                     .build());
 
     public static final ConfigOption<Long> PULSAR_MAX_FETCH_TIME =
@@ -137,10 +145,10 @@
                     .defaultValue(Duration.ofSeconds(10).toMillis())
                     .withDescription(
                             Description.builder()
+                                    .text("The maximum time (in ms) to wait when fetching records.")
+                                    .text(" A longer time increases throughput but also latency.")
                                     .text(
-                                            "The max time (in millis) to wait when fetching records. "
-                                                    + "A longer time increases throughput but also latency. "
-                                                    + "A fetch batch might be finished earlier because of %s.",
+                                            " A fetch batch might be finished earlier because of %s.",
                                             code("pulsar.source.maxFetchRecords"))
                                     .build());
 
@@ -151,9 +159,10 @@
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "The max number of records to fetch to wait when polling. "
-                                                    + "A longer time increases throughput but also latency."
-                                                    + "A fetch batch might be finished earlier because of %s.",
+                                            "The maximum number of records to fetch to wait when polling.")
+                                    .text(" A longer time increases throughput but also latency.")
+                                    .text(
+                                            " A fetch batch might be finished earlier because of %s.",
                                             code("pulsar.source.maxFetchTime"))
                                     .build());
 
@@ -162,9 +171,14 @@
                     .enumType(CursorVerification.class)
                     .defaultValue(CursorVerification.WARN_ON_MISMATCH)
                     .withDescription(
-                            "Upon (re)starting the source checks whether the expected message can be read. "
-                                    + "If failure is enabled the application fails, else it logs a warning. "
-                                    + "A possible solution is to adjust the retention settings in pulsar or ignoring the check result.");
+                            Description.builder()
+                                    .text(
+                                            "Upon (re)starting the source, check whether the expected message can be read.")
+                                    .text(
+                                            " If failure is enabled, the application fails. Otherwise, it logs a warning.")
+                                    .text(
+                                            " A possible solution is to adjust the retention settings in Pulsar or ignoring the check result.")
+                                    .build());
 
     ///////////////////////////////////////////////////////////////////////////////
     //
@@ -178,8 +192,11 @@
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Specify the subscription name for this consumer."
-                                    + " This argument is required when constructing the consumer.");
+                            Description.builder()
+                                    .text("Specify the subscription name for this consumer.")
+                                    .text(
+                                            " This argument is required when constructing the consumer.")
+                                    .build());
 
     public static final ConfigOption<SubscriptionType> PULSAR_SUBSCRIPTION_TYPE =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "subscriptionType")
@@ -238,16 +255,15 @@
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Group a consumer acknowledgment for a specified time (in microseconds).")
-                                    .linebreak()
+                                            "Group a consumer acknowledgment for a specified time (in μs).")
                                     .text(
-                                            "By default, a consumer uses 100ms grouping time to send out acknowledgments to a broker.")
-                                    .linebreak()
+                                            " By default, a consumer uses %s grouping time to send out acknowledgments to a broker.",
+                                            code("100μs"))
                                     .text(
-                                            "Setting a group time of 0 sends out acknowledgments immediately.")
-                                    .linebreak()
+                                            " If the group time is set to %s, acknowledgments are sent out immediately.",
+                                            code("0"))
                                     .text(
-                                            "A longer ack group time is more efficient at the expense of a slight increase in message re-deliveries after a failure.")
+                                            " A longer ack group time is more efficient at the expense of a slight increase in message re-deliveries after a failure.")
                                     .build());
 
     public static final ConfigOption<Long> PULSAR_NEGATIVE_ACK_REDELIVERY_DELAY_MICROS =
@@ -257,11 +273,11 @@
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Delay (in microseconds) to wait before redelivering messages that failed to be processed.")
+                                            "Delay (in μs) to wait before redelivering messages that failed to be processed.")
                                     .linebreak()
                                     .text(
                                             "When an application uses %s, failed messages are redelivered after a fixed timeout.",
-                                            code("Consumer#negativeAcknowledge(Message)"))
+                                            code("Consumer.negativeAcknowledge(Message)"))
                                     .build());
 
     public static final ConfigOption<Integer>
@@ -274,7 +290,7 @@
                             .withDescription(
                                     Description.builder()
                                             .text(
-                                                    "The max total receiver queue size across partitions.")
+                                                    "The maximum total receiver queue size across partitions.")
                                             .linebreak()
                                             .text(
                                                     "This setting reduces the receiver queue size for individual partitions if the total receiver queue size exceeds this value.")
@@ -285,7 +301,7 @@
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Consumer name is informative and it can be used to identify a particular consumer instance from the topic stats.");
+                            "The consumer name is informative and it can be used to identify a particular consumer instance from the topic stats.");
 
     public static final ConfigOption<Long> PULSAR_ACK_TIMEOUT_MILLIS =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "ackTimeoutMillis")
@@ -294,17 +310,13 @@
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Set the timeout (in millis) for unacknowledged messages, truncated to the nearest millisecond."
-                                                    + " The timeout needs to be greater than 1 second.")
+                                            "The timeout (in ms) for unacknowledged messages, truncated to the nearest millisecond. The timeout needs to be greater than 1 second.")
                                     .linebreak()
                                     .text(
-                                            "By default, the acknowledge timeout is disabled and that means that messages delivered to a"
-                                                    + " consumer will not be re-delivered unless the consumer crashes.")
+                                            "By default, the acknowledge timeout is disabled and that means that messages delivered to a consumer will not be re-delivered unless the consumer crashes.")
                                     .linebreak()
                                     .text(
-                                            "When enabling ack timeout, if a message is not acknowledged within the specified timeout"
-                                                    + " it will be re-delivered to the consumer"
-                                                    + " (possibly to a different consumer in case of a shared subscription).")
+                                            "When acknowledgement timeout being enabled, if a message is not acknowledged within the specified timeout it will be re-delivered to the consumer (possibly to a different consumer in case of a shared subscription).")
                                     .build());
 
     public static final ConfigOption<Long> PULSAR_TICK_DURATION_MILLIS =
@@ -313,10 +325,10 @@
                     .defaultValue(1000L)
                     .withDescription(
                             Description.builder()
-                                    .text("Granularity (in millis) of the ack-timeout redelivery.")
+                                    .text("Granularity (in ms) of the ack-timeout redelivery.")
                                     .linebreak()
                                     .text(
-                                            "Using an higher %s reduces the memory overhead to track messages when setting ack-timeout to a bigger value (for example, 1 hour).",
+                                            "A greater (for example, 1 hour) %s reduces the memory overhead to track messages.",
                                             code("tickDurationMillis"))
                                     .build());
 
@@ -327,19 +339,21 @@
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Priority level for a consumer to which a broker gives more priority while dispatching messages in the shared subscription mode.")
+                                            "Priority level for a consumer to which a broker gives more priorities while dispatching messages in the shared subscription type.")
                                     .linebreak()
                                     .text(
                                             "The broker follows descending priorities. For example, 0=max-priority, 1, 2,...")
                                     .linebreak()
                                     .text(
-                                            "In shared subscription mode, the broker first dispatches messages to the max priority level consumers if they have permits. Otherwise, the broker considers next priority level consumers.")
+                                            "In shared subscription mode, the broker first dispatches messages to the consumers on the highest priority level if they have permits.")
+                                    .text(
+                                            " Otherwise, the broker considers consumers on the next priority level.")
                                     .linebreak()
                                     .linebreak()
                                     .text("Example 1")
                                     .linebreak()
                                     .text(
-                                            "If a subscription has consumerA with %s 0 and consumerB with %s 1, then the broker only dispatches messages to consumerA until it runs out permits and then starts dispatching messages to consumerB.",
+                                            "If a subscription has consumer A with %s 0 and consumer B with %s 1, then the broker only dispatches messages to consumer A until it runs out permits and then starts dispatching messages to consumer B.",
                                             code("priorityLevel"), code("priorityLevel"))
                                     .linebreak()
                                     .text("Example 2")
@@ -353,7 +367,7 @@
                                                     + "C5, 1, 1\n")
                                     .linebreak()
                                     .text(
-                                            "Order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4.")
+                                            "The order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4.")
                                     .build());
 
     public static final ConfigOption<Integer> PULSAR_MAX_PENDING_CHUNKED_MESSAGE =
@@ -363,32 +377,28 @@
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Consumer buffers chunk messages into memory until it receives all the chunks of the original message.")
+                                            "The consumer buffers chunk messages into memory until it receives all the chunks of the original message.")
                                     .text(
-                                            "While consuming chunk-messages, chunks from same message might not be contiguous"
-                                                    + " in the stream and they might be mixed with other messages' chunks.")
+                                            " While consuming chunk-messages, chunks from the same message might not be contiguous in the stream and they might be mixed with other messages' chunks.")
                                     .text(
-                                            "So, consumer has to maintain multiple buffers to manage chunks coming from different messages.")
+                                            " So, consumer has to maintain multiple buffers to manage chunks coming from different messages.")
                                     .text(
-                                            "This mainly happens when multiple publishers are publishing messages on the topic"
-                                                    + " concurrently or publisher failed to publish all chunks of the messages.")
-                                    .linebreak()
-                                    .text("eg: M1-C1, M2-C1, M1-C2, M2-C2")
-                                    .text(
-                                            "Messages M1-C1 and M1-C2 belong to original message M1, M2-C1 and M2-C2 messages belong to M2 message.")
+                                            " This mainly happens when multiple publishers are publishing messages on the topic concurrently or publishers failed to publish all chunks of the messages.")
                                     .linebreak()
                                     .text(
-                                            "Buffering large number of outstanding uncompleted chunked messages can create memory"
-                                                    + " pressure and it can be guarded by providing this %s threshold."
-                                                    + " Once, consumer reaches this threshold, it drops the outstanding unchunked-messages"
-                                                    + " by silently acking or asking broker to redeliver later by marking it unacked."
-                                                    + " This behavior can be controlled by configuration %s",
+                                            "For example, there are M1-C1, M2-C1, M1-C2, M2-C2 messages.")
+                                    .text(
+                                            "Messages M1-C1 and M1-C2 belong to the M1 original message while M2-C1 and M2-C2 belong to the M2 message.")
+                                    .linebreak()
+                                    .text(
+                                            "Buffering a large number of outstanding uncompleted chunked messages can bring memory pressure and it can be guarded by providing this %s threshold.",
+                                            code("pulsar.consumer.maxPendingChunkedMessage"))
+                                    .text(
+                                            " Once, a consumer reaches this threshold, it drops the outstanding unchunked messages by silently acknowledging or asking the broker to redeliver messages later by marking it unacknowledged.")
+                                    .text(
+                                            " This behavior can be controlled by the %s option.",
                                             code(
-                                                    CONSUMER_CONFIG_PREFIX
-                                                            + "maxPendingChunkedMessage"),
-                                            code(
-                                                    CONSUMER_CONFIG_PREFIX
-                                                            + "autoAckOldestChunkedMessageOnQueueFull"))
+                                                    "pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull"))
                                     .build());
 
     public static final ConfigOption<Boolean> PULSAR_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL =
@@ -398,16 +408,12 @@
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Buffering large number of outstanding uncompleted chunked messages can create memory pressure"
-                                                    + " and it can be guarded by providing this %s threshold."
-                                                    + " Once, consumer reaches this threshold, it drops the outstanding unchunked-messages"
-                                                    + " by silently acking if %s is true else it marks them for redelivery.",
+                                            "Buffering a large number of outstanding uncompleted chunked messages can bring memory pressure and it can be guarded by providing this %s threshold.",
+                                            code("pulsar.consumer.maxPendingChunkedMessage"))
+                                    .text(
+                                            " Once a consumer reaches this threshold, it drops the outstanding unchunked-messages by silently acknowledging if %s is true. Otherwise, it marks them for redelivery.",
                                             code(
-                                                    CONSUMER_CONFIG_PREFIX
-                                                            + "maxPendingChunkedMessage"),
-                                            code(
-                                                    CONSUMER_CONFIG_PREFIX
-                                                            + "autoAckOldestChunkedMessageOnQueueFull"))
+                                                    "pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull"))
                                     .build());
 
     public static final ConfigOption<Long> PULSAR_EXPIRE_TIME_OF_INCOMPLETE_CHUNKED_MESSAGE_MILLIS =
@@ -415,10 +421,12 @@
                     .longType()
                     .defaultValue(60 * 1000L)
                     .withDescription(
-                            "If producer fails to publish all the chunks of a message then consumer"
-                                    + " can expire incomplete chunks if consumer won't be able to"
-                                    + " receive all chunks in expire times (default 1 hour)."
-                                    + " It uses milliseconds as the unit of time.");
+                            Description.builder()
+                                    .text(
+                                            "If a producer fails to publish all the chunks of a message,")
+                                    .text(
+                                            " the consumer can expire incomplete chunks if the consumer cannot receive all chunks in expire times (default 1 hour, in ms).")
+                                    .build());
 
     public static final ConfigOption<ConsumerCryptoFailureAction> PULSAR_CRYPTO_FAILURE_ACTION =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "cryptoFailureAction")
@@ -427,23 +435,28 @@
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Consumer should take action when it receives a message that can not be decrypted.")
+                                            "The consumer should take action when it receives a message that can not be decrypted.")
                                     .list(
                                             text(
-                                                    "FAIL: this is the default option to fail messages until crypto succeeds."),
+                                                    "%s: this is the default option to fail messages until crypto succeeds.",
+                                                    code("FAIL")),
                                             text(
-                                                    "DISCARD: silently acknowledge and not deliver message to an application."),
+                                                    "%s: silently acknowledge but do not deliver messages to an application.",
+                                                    code("DISCARD")),
                                             text(
-                                                    "CONSUME: deliver encrypted messages to applications. It is the application's responsibility to decrypt the message."))
+                                                    "%s: deliver encrypted messages to applications. It is the application's responsibility to decrypt the message.",
+                                                    code("CONSUME")))
                                     .linebreak()
-                                    .text("The decompression of message fails.")
+                                    .text("Fail to decompress the messages.")
                                     .linebreak()
                                     .text(
                                             "If messages contain batch messages, a client is not be able to retrieve individual messages in batch.")
                                     .linebreak()
                                     .text(
-                                            "Delivered encrypted message contains %s which contains encryption and compression information in it using which application can decrypt consumed message payload.",
+                                            "The delivered encrypted message contains %s which contains encryption and compression information in.",
                                             code("EncryptionContext"))
+                                    .text(
+                                            " You can use an application to decrypt the consumed message payload.")
                                     .build());
 
     public static final ConfigOption<Map<String, String>> PULSAR_CONSUMER_PROPERTIES =
@@ -453,17 +466,15 @@
                     .withDescription(
                             Description.builder()
                                     .text("A name or value property of this consumer.")
-                                    .linebreak()
                                     .text(
-                                            "%s is application defined metadata attached to a consumer.",
+                                            " %s is application defined metadata attached to a consumer.",
                                             code("properties"))
-                                    .linebreak()
                                     .text(
-                                            "When getting a topic stats, associate this metadata with the consumer stats for easier identification.")
+                                            " When getting a topic stats, associate this metadata with the consumer stats for easier identification.")
                                     .build());
 
     public static final ConfigOption<Boolean> PULSAR_READ_COMPACTED =
-            ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "readCompacted") // NOSONAR
+            ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "readCompacted")
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
@@ -524,47 +535,31 @@
                     .intType()
                     .defaultValue(0)
                     .withDescription(
-                            "Maximum number of times that a message will be redelivered before being sent to the dead letter queue.");
+                            "The maximum number of times that a message are redelivered before being sent to the dead letter queue.");
 
     public static final ConfigOption<String> PULSAR_RETRY_LETTER_TOPIC =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "deadLetterPolicy.retryLetterTopic")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription(
-                            "Name of the retry topic where the failing messages will be sent.");
+                    .withDescription("Name of the retry topic where the failed messages are sent.");
     public static final ConfigOption<String> PULSAR_DEAD_LETTER_TOPIC =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "deadLetterPolicy.deadLetterTopic")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription(
-                            "Name of the dead topic where the failing messages will be sent.");
+                    .withDescription("Name of the dead topic where the failed messages are sent.");
 
     public static final ConfigOption<Boolean> PULSAR_RETRY_ENABLE =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "retryEnable")
                     .booleanType()
                     .defaultValue(false)
-                    .withDescription("If enabled, the consumer will auto retry messages.");
-
-    public static final ConfigOption<Boolean> PULSAR_AUTO_UPDATE_PARTITIONS =
-            ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "autoUpdatePartitions")
-                    .booleanType()
-                    .defaultValue(true)
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "If %s is enabled, a consumer subscribes to partition increase automatically.",
-                                            code("autoUpdatePartitions"))
-                                    .linebreak()
-                                    .text("Note: this is only for partitioned consumers.\t")
-                                    .build());
+                    .withDescription("If enabled, the consumer will automatically retry messages.");
 
     public static final ConfigOption<Integer> PULSAR_AUTO_UPDATE_PARTITIONS_INTERVAL_SECONDS =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "autoUpdatePartitionsIntervalSeconds")
                     .intType()
                     .defaultValue(60)
                     .withDescription(
-                            "Set the interval (in seconds) of updating partitions."
-                                    + " This only works if autoUpdatePartitions is enabled.");
+                            "The interval (in seconds) of updating partitions. This only works if autoUpdatePartitions is enabled.");
 
     public static final ConfigOption<Boolean> PULSAR_REPLICATE_SUBSCRIPTION_STATE =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "replicateSubscriptionState")
@@ -582,23 +577,11 @@
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
-                            "Ack will return receipt but does not mean that the message will not be resent after get receipt.");
+                            "Acknowledgement will return a receipt but this does not mean that the message will not be resent after getting the receipt.");
 
     public static final ConfigOption<Boolean> PULSAR_POOL_MESSAGES =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "poolMessages")
                     .booleanType()
                     .defaultValue(false)
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "Enable pooling of messages and the underlying data buffers.")
-                                    .linebreak()
-                                    .text(
-                                            "When pooling is enabled, the application is responsible for calling"
-                                                    + " %s after the handling of every received message. If %s"
-                                                    + " is not called on a received message, there will be a memory leak."
-                                                    + " If an application attempts to use and already \"released\" message,"
-                                                    + " it might experience undefined behavior (eg: memory corruption, deserialization error, etc.).",
-                                            code("Message.release()"), code("release()"))
-                                    .build());
+                    .withDescription("Enable pooling of messages and the underlying data buffers.");
 }