Centralize the TLS related configurations
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index a066652..e768c7f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -32,10 +32,11 @@
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
-import org.apache.rocketmq.remoting.common.SslMode;
+import org.apache.rocketmq.remoting.common.TlsMode;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.netty.NettySystemConfig;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.srvutil.ServerUtil;
 import org.apache.rocketmq.store.config.BrokerRole;
@@ -43,6 +44,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
+
 public class BrokerStartup {
     public static Properties properties = null;
     public static CommandLine commandLine = null;
@@ -98,7 +101,9 @@
             final BrokerConfig brokerConfig = new BrokerConfig();
             final NettyServerConfig nettyServerConfig = new NettyServerConfig();
             final NettyClientConfig nettyClientConfig = new NettyClientConfig();
-            nettyClientConfig.setUseTLS(NettySystemConfig.sslMode == SslMode.ENFORCING);
+
+            nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
+                String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
             nettyServerConfig.setListenPort(10911);
             final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 8f255f0..a9eabfe 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -19,6 +19,7 @@
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
 
 /**
  * Client Common configuration
@@ -45,7 +46,7 @@
     private String unitName;
     private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
 
-    private boolean useTLS;
+    private boolean useTLS = TlsSystemConfig.tlsEnable;
 
     public String buildMQClientId() {
         StringBuilder sb = new StringBuilder();
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/SslMode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/TlsMode.java
similarity index 87%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/common/SslMode.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/common/TlsMode.java
index cb1e85a..996ef0d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/SslMode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/TlsMode.java
@@ -25,7 +25,7 @@
  *     <li><strong>enforcing:</strong> SSL is required, aka, non SSL connection will be rejected.</li>
  * </ol>
  */
-public enum SslMode {
+public enum TlsMode {
 
     DISABLED("disabled"),
     PERMISSIVE("permissive"),
@@ -33,14 +33,14 @@
 
     private String name;
 
-    SslMode(String name) {
+    TlsMode(String name) {
         this.name = name;
     }
 
-    public static SslMode parse(String mode) {
-        for (SslMode sslMode: SslMode.values()) {
-            if (sslMode.name.equals(mode)) {
-                return sslMode;
+    public static TlsMode parse(String mode) {
+        for (TlsMode tlsMode : TlsMode.values()) {
+            if (tlsMode.name.equals(mode)) {
+                return tlsMode;
             }
         }
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index a1008a3..dcc80cb 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -131,7 +131,7 @@
 
         if (nettyClientConfig.isUseTLS()) {
             try {
-                sslContext = SslHelper.buildSslContext(true);
+                sslContext = TlsHelper.buildSslContext(true);
                 log.info("SSL enabled for client");
             } catch (IOException e) {
                 log.error("Failed to create SSLContext", e);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 35894a0..cd6ed47 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -54,7 +54,7 @@
 import org.apache.rocketmq.remoting.common.Pair;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
-import org.apache.rocketmq.remoting.common.SslMode;
+import org.apache.rocketmq.remoting.common.TlsMode;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
@@ -139,12 +139,12 @@
             });
         }
 
-        SslMode sslMode = NettySystemConfig.sslMode;
-        log.info("Server is running in TLS {} mode", sslMode.getName());
+        TlsMode tlsMode = TlsSystemConfig.tlsMode;
+        log.info("Server is running in TLS {} mode", tlsMode.getName());
 
-        if (sslMode != SslMode.DISABLED) {
+        if (tlsMode != TlsMode.DISABLED) {
             try {
-                sslContext = SslHelper.buildSslContext(false);
+                sslContext = TlsHelper.buildSslContext(false);
                 log.info("SSLContext created for server");
             } catch (CertificateException e) {
                 log.error("Failed to create SSLContext for server", e);
@@ -189,7 +189,7 @@
                     public void initChannel(SocketChannel ch) throws Exception {
                         ch.pipeline()
                             .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
-                                new HandshakeHandler(NettySystemConfig.sslMode))
+                                new HandshakeHandler(TlsSystemConfig.tlsMode))
                             .addLast(defaultEventExecutorGroup,
                                 new NettyEncoder(),
                                 new NettyDecoder(),
@@ -326,12 +326,12 @@
 
     class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {
 
-        private final SslMode sslMode;
+        private final TlsMode tlsMode;
 
         private static final byte HANDSHAKE_MAGIC_CODE = 0x16;
 
-        HandshakeHandler(SslMode sslMode) {
-            this.sslMode = sslMode;
+        HandshakeHandler(TlsMode tlsMode) {
+            this.tlsMode = tlsMode;
         }
 
         @Override
@@ -344,7 +344,7 @@
             byte b = msg.getByte(0);
 
             if (b == HANDSHAKE_MAGIC_CODE) {
-                switch (sslMode) {
+                switch (tlsMode) {
                     case DISABLED:
                         ctx.close();
                         log.warn("Clients intend to establish a SSL connection while this server is running in SSL disabled mode");
@@ -366,7 +366,7 @@
                         log.warn("Unknown TLS mode");
                         break;
                 }
-            } else if (sslMode == SslMode.ENFORCING) {
+            } else if (tlsMode == TlsMode.ENFORCING) {
                 ctx.close();
                 log.warn("Clients intend to establish an insecure connection while this server is running in SSL enforcing mode");
             }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
index b9c1f3f..6357c03 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
@@ -17,8 +17,6 @@
 
 package org.apache.rocketmq.remoting.netty;
 
-import org.apache.rocketmq.remoting.common.SslMode;
-
 public class NettySystemConfig {
     public static final String COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE =
         "com.rocketmq.remoting.nettyPooledByteBufAllocatorEnable";
@@ -31,12 +29,6 @@
     public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE =
         "com.rocketmq.remoting.clientOnewaySemaphoreValue";
 
-    public static final String ORG_APACHE_ROCKETMQ_REMOTING_SSL_MODE = //
-        "org.apache.rocketmq.remoting.ssl.mode";
-
-    public static final String ORG_APACHE_ROCKETMQ_REMOTING_SSL_CONFIG_FILE = //
-        "org.apache.rocketmq.remoting.ssl.config.file";
-
     public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = //
         Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false"));
     public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = //
@@ -47,18 +39,4 @@
         Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535"));
     public static int socketRcvbufSize =
         Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535"));
-
-    /**
-     * For server, three SSL modes are supported: disabled, permissive and enforcing.
-     * <ol>
-     *     <li><strong>disabled:</strong> SSL is not supported; any incoming SSL handshake will be rejected, causing connection closed.</li>
-     *     <li><strong>permissive:</strong> SSL is optional, aka, server in this mode can serve client connections with or without SSL;</li>
-     *     <li><strong>enforcing:</strong> SSL is required, aka, non SSL connection will be rejected.</li>
-     * </ol>
-     */
-    public static SslMode sslMode = //
-        SslMode.parse(System.getProperty(ORG_APACHE_ROCKETMQ_REMOTING_SSL_MODE, "permissive"));
-
-    public static String sslConfigFile = //
-        System.getProperty(ORG_APACHE_ROCKETMQ_REMOTING_SSL_CONFIG_FILE, "/etc/rocketmq/ssl.properties");
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java
deleted file mode 100644
index 4bf3b52..0000000
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.netty;
-
-import io.netty.handler.ssl.ClientAuth;
-import io.netty.handler.ssl.OpenSsl;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.SslContextBuilder;
-import io.netty.handler.ssl.SslProvider;
-import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
-import io.netty.handler.ssl.util.SelfSignedCertificate;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.security.cert.CertificateException;
-import java.util.Properties;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SslHelper {
-
-    public interface DecryptionStrategy {
-        /**
-         * Decrypt the target encrpted private key file.
-         *
-         * @param privateKeyEncryptPath A pathname string
-         * @param forClient tells whether it's a client-side key file
-         * @return An input stream for a decrypted key file
-         * @throws IOException if an I/O error has occurred
-         */
-        InputStream decryptPrivateKey(String privateKeyEncryptPath, boolean forClient) throws IOException;
-    }
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
-
-    private static DecryptionStrategy decryptionStrategy = new DecryptionStrategy() {
-        @Override
-        public InputStream decryptPrivateKey(final String privateKeyEncryptPath,
-            final boolean forClient) throws IOException {
-            return new FileInputStream(privateKeyEncryptPath);
-        }
-    };
-
-
-    public static void registerDecryptionStrategy(final DecryptionStrategy decryptionStrategy) {
-        SslHelper.decryptionStrategy = decryptionStrategy;
-    }
-
-    public static SslContext buildSslContext(boolean forClient) throws IOException, CertificateException {
-
-        File configFile = new File(NettySystemConfig.sslConfigFile);
-        boolean testMode = !(configFile.exists() && configFile.isFile() && configFile.canRead());
-        Properties properties = null;
-
-        if (!testMode) {
-            properties = new Properties();
-            InputStream inputStream = null;
-            try {
-                inputStream = new FileInputStream(configFile);
-                properties.load(inputStream);
-            } catch (FileNotFoundException ignore) {
-            } catch (IOException ignore) {
-            } finally {
-                if (null != inputStream) {
-                    try {
-                        inputStream.close();
-                    } catch (IOException ignore) {
-                    }
-                }
-            }
-        }
-
-        SslProvider provider = null;
-        if (OpenSsl.isAvailable()) {
-            provider = SslProvider.OPENSSL;
-            LOGGER.info("Using OpenSSL provider");
-        } else {
-            provider = SslProvider.JDK;
-            LOGGER.info("Using JDK SSL provider");
-        }
-
-        if (forClient) {
-            if (testMode) {
-                return SslContextBuilder
-                    .forClient()
-                    .sslProvider(SslProvider.JDK)
-                    .trustManager(InsecureTrustManagerFactory.INSTANCE)
-                    .build();
-            } else {
-                SslContextBuilder sslContextBuilder = SslContextBuilder.forClient().sslProvider(SslProvider.JDK);
-
-                if ("false".equals(properties.getProperty("client.auth.server"))) {
-                    sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
-                } else {
-                    if (properties.containsKey("client.trustManager")) {
-                        sslContextBuilder.trustManager(new File(properties.getProperty("client.trustManager")));
-                    }
-                }
-
-                return sslContextBuilder.keyManager(
-                    properties.containsKey("client.keyCertChainFile") ? new FileInputStream(properties.getProperty("client.keyCertChainFile")) : null,
-                    properties.containsKey("client.keyFile") ? decryptionStrategy.decryptPrivateKey(properties.getProperty("client.keyFile"), true) : null,
-                    properties.containsKey("client.password") ? properties.getProperty("client.password") : null)
-                    .build();
-            }
-        } else {
-
-            if (testMode) {
-                SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
-                return SslContextBuilder
-                    .forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey())
-                    .sslProvider(SslProvider.JDK)
-                    .clientAuth(ClientAuth.OPTIONAL)
-                    .build();
-            } else {
-                return SslContextBuilder.forServer(
-                    properties.containsKey("server.keyCertChainFile") ? new FileInputStream(properties.getProperty("server.keyCertChainFile")) : null,
-                    properties.containsKey("server.keyFile") ? decryptionStrategy.decryptPrivateKey(properties.getProperty("server.keyFile"), false) : null,
-                    properties.containsKey("server.password") ? properties.getProperty("server.password") : null)
-                    .sslProvider(provider)
-                    .trustManager(new File(properties.getProperty("server.trustManager")))
-                    .clientAuth(parseClientAuthMode(properties.getProperty("server.auth.client")))
-                    .build();
-            }
-        }
-    }
-
-    private static ClientAuth parseClientAuthMode(String authMode) {
-        if (null == authMode || authMode.trim().isEmpty()) {
-            return ClientAuth.NONE;
-        }
-
-        if ("optional".equalsIgnoreCase(authMode)) {
-            return ClientAuth.OPTIONAL;
-        }
-
-        return ClientAuth.REQUIRE;
-    }
-}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/TlsHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/TlsHelper.java
new file mode 100644
index 0000000..afcf118
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/TlsHelper.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.netty;
+
+import io.netty.handler.ssl.ClientAuth;
+import io.netty.handler.ssl.OpenSsl;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.netty.handler.ssl.util.SelfSignedCertificate;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.cert.CertificateException;
+import java.util.Properties;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_AUTHSERVER;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_CERTPATH;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_KEYPASSWORD;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_KEYPATH;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_CLIENT_TRUSTCERTPATH;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_SERVER_AUTHCLIENT;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_SERVER_CERTPATH;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_SERVER_KEYPASSWORD;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_SERVER_KEYPATH;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_SERVER_NEED_CLIENT_AUTH;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_SERVER_TRUSTCERTPATH;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_TEST_MODE_ENABLE;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsClientAuthServer;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsClientCertPath;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsClientKeyPassword;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsClientKeyPath;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsClientTrustCertPath;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerAuthClient;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerCertPath;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerKeyPassword;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerKeyPath;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerNeedClientAuth;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerTrustCertPath;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsTestModeEnable;
+
+public class TlsHelper {
+
+    public interface DecryptionStrategy {
+        /**
+         * Decrypt the target encrpted private key file.
+         *
+         * @param privateKeyEncryptPath A pathname string
+         * @param forClient tells whether it's a client-side key file
+         * @return An input stream for a decrypted key file
+         * @throws IOException if an I/O error has occurred
+         */
+        InputStream decryptPrivateKey(String privateKeyEncryptPath, boolean forClient) throws IOException;
+    }
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+    private static DecryptionStrategy decryptionStrategy = new DecryptionStrategy() {
+        @Override
+        public InputStream decryptPrivateKey(final String privateKeyEncryptPath,
+            final boolean forClient) throws IOException {
+            return new FileInputStream(privateKeyEncryptPath);
+        }
+    };
+
+
+    public static void registerDecryptionStrategy(final DecryptionStrategy decryptionStrategy) {
+        TlsHelper.decryptionStrategy = decryptionStrategy;
+    }
+
+    public static SslContext buildSslContext(boolean forClient) throws IOException, CertificateException {
+        File configFile = new File(TlsSystemConfig.tlsConfigFile);
+        extractTlsConfigFromFile(configFile);
+        logTheFinalUsedTlsConfig();
+
+        SslProvider provider;
+        if (OpenSsl.isAvailable()) {
+            provider = SslProvider.OPENSSL;
+            LOGGER.info("Using OpenSSL provider");
+        } else {
+            provider = SslProvider.JDK;
+            LOGGER.info("Using JDK SSL provider");
+        }
+
+        if (forClient) {
+            if (tlsTestModeEnable) {
+                return SslContextBuilder
+                    .forClient()
+                    .sslProvider(SslProvider.JDK)
+                    .trustManager(InsecureTrustManagerFactory.INSTANCE)
+                    .build();
+            } else {
+                SslContextBuilder sslContextBuilder = SslContextBuilder.forClient().sslProvider(SslProvider.JDK);
+
+
+                if (!tlsClientAuthServer) {
+                    sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
+                } else {
+                    if (!isNullOrEmpty(tlsClientTrustCertPath)) {
+                        sslContextBuilder.trustManager(new File(tlsClientTrustCertPath));
+                    }
+                }
+
+                return sslContextBuilder.keyManager(
+                    !isNullOrEmpty(tlsClientCertPath) ? new FileInputStream(tlsClientCertPath) : null,
+                    !isNullOrEmpty(tlsClientKeyPath) ? decryptionStrategy.decryptPrivateKey(tlsClientKeyPath, true) : null,
+                    !isNullOrEmpty(tlsClientKeyPassword) ? tlsClientKeyPassword : null)
+                    .build();
+            }
+        } else {
+
+            if (tlsTestModeEnable) {
+                SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
+                return SslContextBuilder
+                    .forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey())
+                    .sslProvider(SslProvider.JDK)
+                    .clientAuth(ClientAuth.OPTIONAL)
+                    .build();
+            } else {
+                SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(
+                    !isNullOrEmpty(tlsServerCertPath) ? new FileInputStream(tlsServerCertPath) : null,
+                    !isNullOrEmpty(tlsServerKeyPath) ? decryptionStrategy.decryptPrivateKey(tlsServerKeyPath, false) : null,
+                    !isNullOrEmpty(tlsServerKeyPassword) ? tlsServerKeyPassword : null)
+                    .sslProvider(provider);
+
+                if (!tlsServerAuthClient) {
+                    sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
+                } else {
+                    if (!isNullOrEmpty(tlsServerTrustCertPath)) {
+                        sslContextBuilder.trustManager(new File(tlsServerTrustCertPath));
+                    }
+                }
+
+                sslContextBuilder.clientAuth(parseClientAuthMode(tlsServerNeedClientAuth));
+                return sslContextBuilder.build();
+            }
+        }
+    }
+
+    private static void extractTlsConfigFromFile(final File configFile) {
+        if (!(configFile.exists() && configFile.isFile() && configFile.canRead())) {
+            LOGGER.info("Tls config file doesn't exist, skip it");
+        }
+
+        Properties properties;
+        properties = new Properties();
+        InputStream inputStream = null;
+        try {
+            inputStream = new FileInputStream(configFile);
+            properties.load(inputStream);
+        } catch (IOException ignore) {
+        } finally {
+            if (null != inputStream) {
+                try {
+                    inputStream.close();
+                } catch (IOException ignore) {
+                }
+            }
+        }
+
+        tlsTestModeEnable = Boolean.parseBoolean(properties.getProperty(TLS_TEST_MODE_ENABLE, String.valueOf(tlsTestModeEnable)));
+        tlsServerNeedClientAuth = properties.getProperty(TLS_SERVER_NEED_CLIENT_AUTH, tlsServerNeedClientAuth);
+        tlsServerKeyPath = properties.getProperty(TLS_SERVER_KEYPATH, tlsServerKeyPath);
+        tlsServerKeyPassword = properties.getProperty(TLS_SERVER_KEYPASSWORD, tlsServerKeyPassword);
+        tlsServerCertPath = properties.getProperty(TLS_SERVER_CERTPATH, tlsServerCertPath);
+        tlsServerAuthClient = Boolean.parseBoolean(properties.getProperty(TLS_SERVER_AUTHCLIENT, String.valueOf(tlsServerAuthClient)));
+        tlsServerTrustCertPath = properties.getProperty(TLS_SERVER_TRUSTCERTPATH, tlsServerTrustCertPath);
+
+        tlsClientKeyPath = properties.getProperty(TLS_CLIENT_KEYPATH, tlsClientKeyPath);
+        tlsClientKeyPassword = properties.getProperty(TLS_CLIENT_KEYPASSWORD, tlsClientKeyPassword);
+        tlsClientCertPath = properties.getProperty(TLS_CLIENT_CERTPATH, tlsClientCertPath);
+        tlsClientAuthServer = Boolean.parseBoolean(properties.getProperty(TLS_CLIENT_AUTHSERVER, String.valueOf(tlsClientAuthServer)));
+        tlsClientTrustCertPath = properties.getProperty(TLS_CLIENT_TRUSTCERTPATH, tlsClientTrustCertPath);
+    }
+
+    private static void logTheFinalUsedTlsConfig() {
+        LOGGER.info("Log the final used tls related configuration");
+        LOGGER.info("{} = {}", TLS_TEST_MODE_ENABLE, tlsTestModeEnable);
+        LOGGER.info("{} = {}", TLS_SERVER_NEED_CLIENT_AUTH, tlsServerNeedClientAuth);
+        LOGGER.info("{} = {}", TLS_SERVER_KEYPATH, tlsServerKeyPath);
+        LOGGER.info("{} = {}", TLS_SERVER_KEYPASSWORD, tlsServerKeyPassword);
+        LOGGER.info("{} = {}", TLS_SERVER_CERTPATH, tlsServerCertPath);
+        LOGGER.info("{} = {}", TLS_SERVER_AUTHCLIENT, tlsServerAuthClient);
+        LOGGER.info("{} = {}", TLS_SERVER_TRUSTCERTPATH, tlsServerTrustCertPath);
+
+        LOGGER.info("{} = {}", TLS_CLIENT_KEYPATH, tlsClientKeyPath);
+        LOGGER.info("{} = {}", TLS_CLIENT_KEYPASSWORD, tlsClientKeyPassword);
+        LOGGER.info("{} = {}", TLS_CLIENT_CERTPATH, tlsClientCertPath);
+        LOGGER.info("{} = {}", TLS_CLIENT_AUTHSERVER, tlsClientAuthServer);
+        LOGGER.info("{} = {}", TLS_CLIENT_TRUSTCERTPATH, tlsClientTrustCertPath);
+    }
+
+    private static ClientAuth parseClientAuthMode(String authMode) {
+        if (null == authMode || authMode.trim().isEmpty()) {
+            return ClientAuth.NONE;
+        }
+
+        for (ClientAuth clientAuth : ClientAuth.values()) {
+            if (clientAuth.name().equals(authMode.toUpperCase())) {
+                return clientAuth;
+            }
+        }
+
+        return ClientAuth.NONE;
+    }
+
+    /**
+     * Determine if a string is {@code null} or {@link String#isEmpty()} returns {@code true}.
+     */
+    private static boolean isNullOrEmpty(String s) {
+        return s == null || s.isEmpty();
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/TlsSystemConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/TlsSystemConfig.java
new file mode 100644
index 0000000..f681188
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/TlsSystemConfig.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.netty;
+
+import io.netty.handler.ssl.SslContext;
+import org.apache.rocketmq.remoting.common.TlsMode;
+
+public class TlsSystemConfig {
+    public static final String TLS_SERVER_MODE = "tls.server.mode";
+    public static final String TLS_ENABLE = "tls.enable";
+    public static final String TLS_CONFIG_FILE = "tls.config.file";
+    public static final String TLS_TEST_MODE_ENABLE = "tls.test.mode.enable";
+
+    public static final String TLS_SERVER_NEED_CLIENT_AUTH = "tls.server.need.client.auth";
+    public static final String TLS_SERVER_KEYPATH = "tls.server.keyPath";
+    public static final String TLS_SERVER_KEYPASSWORD = "tls.server.keyPassword";
+    public static final String TLS_SERVER_CERTPATH = "tls.server.certPath";
+    public static final String TLS_SERVER_AUTHCLIENT = "tls.server.authClient";
+    public static final String TLS_SERVER_TRUSTCERTPATH = "tls.server.trustCertPath";
+
+    public static final String TLS_CLIENT_KEYPATH = "tls.client.keyPath";
+    public static final String TLS_CLIENT_KEYPASSWORD = "tls.client.keyPassword";
+    public static final String TLS_CLIENT_CERTPATH = "tls.client.certPath";
+    public static final String TLS_CLIENT_AUTHSERVER = "tls.client.authServer";
+    public static final String TLS_CLIENT_TRUSTCERTPATH = "tls.client.trustCertPath";
+
+
+    /**
+     * To determine whether use SSL in client-side, include SDK client and BrokerOuterAPI
+     */
+    public static boolean tlsEnable = Boolean.parseBoolean(System.getProperty(TLS_ENABLE, "false"));
+
+    /**
+     * To determine whether use test mode when initialize TLS context
+     */
+    public static boolean tlsTestModeEnable = Boolean.parseBoolean(System.getProperty(TLS_TEST_MODE_ENABLE, "false"));
+
+    /**
+     * Indicates the state of the {@link javax.net.ssl.SSLEngine} with respect to client authentication.
+     * This configuration item really only applies when building the server-side {@link SslContext},
+     * and can be set to none, require or optional.
+     */
+    public static String tlsServerNeedClientAuth = System.getProperty(TLS_SERVER_NEED_CLIENT_AUTH, "none");
+    /**
+     * The store path of server-side private key
+     */
+    public static String tlsServerKeyPath = System.getProperty(TLS_SERVER_KEYPATH, null);
+
+    /**
+     * The  password of the server-side private key
+     */
+    public static String tlsServerKeyPassword = System.getProperty(TLS_SERVER_KEYPASSWORD, null);
+
+    /**
+     * The store path of server-side X.509 certificate chain in PEM format
+     */
+    public static String tlsServerCertPath = System.getProperty(TLS_SERVER_CERTPATH, null);
+
+    /**
+     * To determine whether verify the client endpoint's certificate strictly
+     */
+    public static boolean tlsServerAuthClient = Boolean.parseBoolean(System.getProperty(TLS_SERVER_AUTHCLIENT, "false"));
+
+    /**
+     * The store path of trusted certificates for verifying the client endpoint's certificate
+     */
+    public static String tlsServerTrustCertPath = System.getProperty(TLS_SERVER_TRUSTCERTPATH, null);
+
+    /**
+     * The store path of client-side private key
+     */
+    public static String tlsClientKeyPath = System.getProperty(TLS_CLIENT_KEYPATH, null);
+
+    /**
+     * The  password of the client-side private key
+     */
+    public static String tlsClientKeyPassword = System.getProperty(TLS_CLIENT_KEYPASSWORD, null);
+
+    /**
+     * The store path of client-side X.509 certificate chain in PEM format
+     */
+    public static String tlsClientCertPath = System.getProperty(TLS_CLIENT_CERTPATH, null);
+
+    /**
+     * To determine whether verify the server endpoint's certificate strictly
+     */
+    public static boolean tlsClientAuthServer = Boolean.parseBoolean(System.getProperty(TLS_CLIENT_AUTHSERVER, "false"));
+
+    /**
+     * The store path of trusted certificates for verifying the server endpoint's certificate
+     */
+    public static String tlsClientTrustCertPath = System.getProperty(TLS_CLIENT_TRUSTCERTPATH, null);
+
+    /**
+     * For server, three SSL modes are supported: disabled, permissive and enforcing.
+     * For client, use {@link TlsSystemConfig#tlsEnable} to determine whether use SSL.
+     * <ol>
+     *     <li><strong>disabled:</strong> SSL is not supported; any incoming SSL handshake will be rejected, causing connection closed.</li>
+     *     <li><strong>permissive:</strong> SSL is optional, aka, server in this mode can serve client connections with or without SSL;</li>
+     *     <li><strong>enforcing:</strong> SSL is required, aka, non SSL connection will be rejected.</li>
+     * </ol>
+     */
+    public static TlsMode tlsMode = TlsMode.parse(System.getProperty(TLS_SERVER_MODE, "permissive"));
+
+    /**
+     * A config file to store the above TLS related configurations,
+     * except {@link TlsSystemConfig#tlsMode} and {@link TlsSystemConfig#tlsEnable}
+     */
+    public static String tlsConfigFile = System.getProperty(TLS_CONFIG_FILE, "/etc/rocketmq/tls.properties");
+}