OMID-239 OMID TLS support (#129)

diff --git a/common/pom.xml b/common/pom.xml
index 86c8a26..c504049 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -104,6 +104,41 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-handler</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>${commons-io.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcprov-jdk15on</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcpkix-jdk15on</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <!--end log4j2 test dependencies -->
 
     </dependencies>
@@ -160,6 +195,18 @@
                     <skip>true</skip>
                 </configuration>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                        <phase>prepare-package</phase>
+                    </execution>
+                </executions>
+            </plugin>
 
         </plugins>
     </build>
diff --git a/common/src/main/java/org/apache/omid/tls/KeyStoreFileType.java b/common/src/main/java/org/apache/omid/tls/KeyStoreFileType.java
new file mode 100644
index 0000000..14f7d60
--- /dev/null
+++ b/common/src/main/java/org/apache/omid/tls/KeyStoreFileType.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.omid.tls;
+
+/**
+ * This enum represents the file type of a KeyStore or TrustStore.
+ * Currently, JKS (Java keystore), PEM, PKCS12, and BCFKS types are supported.
+ */
+public enum KeyStoreFileType {
+    JKS(".jks"),
+    PEM(".pem"),
+    PKCS12(".p12"),
+    BCFKS(".bcfks");
+
+    private final String defaultFileExtension;
+
+    KeyStoreFileType(String defaultFileExtension) {
+        this.defaultFileExtension = defaultFileExtension;
+    }
+
+    /**
+     * The property string that specifies that a key store or trust store
+     * should use this store file type.
+     */
+    public String getPropertyValue() {
+        return this.name();
+    }
+
+    /**
+     * The file extension that is associated with this file type.
+     */
+    public String getDefaultFileExtension() {
+        return defaultFileExtension;
+    }
+
+    /**
+     * Converts a property value to a StoreFileType enum. If the property value
+     * is <code>null</code> or an empty string, returns <code>null</code>.
+     * @param propertyValue the property value.
+     * @return the KeyStoreFileType, or <code>null</code> if
+     *         <code>propertyValue</code> is <code>null</code> or empty.
+     * @throws IllegalArgumentException if <code>propertyValue</code> is not
+     *         one of "JKS", "PEM", "BCFKS", "PKCS12", or empty/null.
+     */
+    public static KeyStoreFileType fromPropertyValue(String propertyValue) {
+        if (propertyValue == null || propertyValue.length() == 0) {
+            return null;
+        }
+        return KeyStoreFileType.valueOf(propertyValue.toUpperCase());
+    }
+
+    /**
+     * Detects the type of KeyStore / TrustStore file from the file extension.
+     * If the file name ends with ".jks", returns <code>StoreFileType.JKS</code>.
+     * If the file name ends with ".pem", returns <code>StoreFileType.PEM</code>.
+     * If the file name ends with ".p12", returns <code>StoreFileType.PKCS12</code>.
+     * If the file name ends with ".bckfs", returns <code>StoreFileType.BCKFS</code>.
+     * Otherwise, throws an IllegalArgumentException.
+     * @param filename the filename of the key store or trust store file.
+     * @return a KeyStoreFileType.
+     * @throws IllegalArgumentException if the filename does not end with
+     *         ".jks", ".pem", "p12" or "bcfks".
+     */
+    public static KeyStoreFileType fromFilename(String filename) {
+        int i = filename.lastIndexOf('.');
+        if (i >= 0) {
+            String extension = filename.substring(i);
+            for (KeyStoreFileType storeFileType : KeyStoreFileType.values()) {
+                if (storeFileType.getDefaultFileExtension().equals(extension)) {
+                    return storeFileType;
+                }
+            }
+        }
+        throw new IllegalArgumentException("Unable to auto-detect store file type from file name: " + filename);
+    }
+
+    /**
+     * If <code>propertyValue</code> is not null or empty, returns the result
+     * of <code>KeyStoreFileType.fromPropertyValue(propertyValue)</code>. Else,
+     * returns the result of <code>KeyStoreFileType.fromFileName(filename)</code>.
+     * @param propertyValue property value describing the KeyStoreFileType, or
+     *                      null/empty to auto-detect the type from the file
+     *                      name.
+     * @param filename file name of the key store file. The file extension is
+     *                 used to auto-detect the KeyStoreFileType when
+     *                 <code>propertyValue</code> is null or empty.
+     * @return a KeyStoreFileType.
+     * @throws IllegalArgumentException if <code>propertyValue</code> is not
+     *         one of "JKS", "PEM", "PKCS12", "BCFKS", or empty/null.
+     * @throws IllegalArgumentException if <code>propertyValue</code>is empty
+     *         or null and the type could not be determined from the file name.
+     */
+    public static KeyStoreFileType fromPropertyValueOrFileName(String propertyValue, String filename) {
+        KeyStoreFileType result = KeyStoreFileType.fromPropertyValue(propertyValue);
+        if (result == null) {
+            result = KeyStoreFileType.fromFilename(filename);
+        }
+        return result;
+    }
+}
diff --git a/common/src/main/java/org/apache/omid/tls/X509Exception.java b/common/src/main/java/org/apache/omid/tls/X509Exception.java
new file mode 100644
index 0000000..e4add3f
--- /dev/null
+++ b/common/src/main/java/org/apache/omid/tls/X509Exception.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.omid.tls;
+
+@SuppressWarnings("serial")
+public class X509Exception extends Exception {
+
+    public X509Exception(String message) {
+        super(message);
+    }
+
+    public X509Exception(Throwable cause) {
+        super(cause);
+    }
+
+    public X509Exception(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public static class KeyManagerException extends X509Exception {
+
+        public KeyManagerException(String message) {
+            super(message);
+        }
+
+        public KeyManagerException(Throwable cause) {
+            super(cause);
+        }
+
+    }
+
+    public static class TrustManagerException extends X509Exception {
+
+        public TrustManagerException(String message) {
+            super(message);
+        }
+
+        public TrustManagerException(Throwable cause) {
+            super(cause);
+        }
+
+    }
+
+    public static class SSLContextException extends X509Exception {
+
+        public SSLContextException(String message) {
+            super(message);
+        }
+
+        public SSLContextException(Throwable cause) {
+            super(cause);
+        }
+
+        public SSLContextException(String message, Throwable cause) {
+            super(message, cause);
+        }
+
+    }
+
+}
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/omid/tls/X509Util.java b/common/src/main/java/org/apache/omid/tls/X509Util.java
new file mode 100644
index 0000000..dffc209
--- /dev/null
+++ b/common/src/main/java/org/apache/omid/tls/X509Util.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.omid.tls;
+
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import org.apache.omid.tls.X509Exception;
+import org.apache.omid.tls.X509Exception.KeyManagerException;
+import org.apache.omid.tls.X509Exception.SSLContextException;
+import org.apache.omid.tls.X509Exception.TrustManagerException;
+import org.apache.phoenix.thirdparty.com.google.common.collect.ObjectArrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.*;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.Security;
+import java.security.cert.PKIXBuilderParameters;
+import java.security.cert.X509CertSelector;
+import java.util.Arrays;
+import java.util.Objects;
+
+
+/**
+ * Utility code for X509 handling Default cipher suites: Performance testing done by Facebook
+ * engineers shows that on Intel x86_64 machines, Java9 performs better with GCM and Java8 performs
+ * better with CBC, so these seem like reasonable defaults.
+ * <p/>
+ * This file has is based on the one in HBase project.
+ * @see <a href=
+ *      "https://github.com/apache/hbase/blob/d2b0074f7ad4c43d31a1a511a0d74feda72451d1/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java">Base
+ *      revision</a>
+ */
+public final class X509Util {
+
+    private static final Logger LOG = LoggerFactory.getLogger(X509Util.class);
+    private static final char[] EMPTY_CHAR_ARRAY = new char[0];
+
+    // Config
+    public static final int DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS = 5000;
+
+    public static final String DEFAULT_PROTOCOL = "TLSv1.2";
+
+    private static String[] getGCMCiphers() {
+        return new String[] { "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
+                "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384",
+                "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" };
+    }
+
+    private static String[] getCBCCiphers() {
+        return new String[] { "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256",
+                "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256", "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA",
+                "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA", "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384",
+                "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384", "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA",
+                "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA" };
+    }
+
+    // On Java 8, prefer CBC ciphers since AES-NI support is lacking and GCM is slower than CBC.
+    private static final String[] DEFAULT_CIPHERS_JAVA8 =
+            ObjectArrays.concat(getCBCCiphers(), getGCMCiphers(), String.class);
+    // On Java 9 and later, prefer GCM ciphers due to improved AES-NI support.
+    // Note that this performance assumption might not hold true for architectures other than x86_64.
+    private static final String[] DEFAULT_CIPHERS_JAVA9 =
+            ObjectArrays.concat(getGCMCiphers(), getCBCCiphers(), String.class);
+
+    private X509Util() {
+        // disabled
+    }
+
+    static String[] getDefaultCipherSuites() {
+        return getDefaultCipherSuitesForJavaVersion(System.getProperty("java.specification.version"));
+    }
+
+    static String[] getDefaultCipherSuitesForJavaVersion(String javaVersion) {
+        Objects.requireNonNull(javaVersion);
+        if (javaVersion.matches("\\d+")) {
+            // Must be Java 9 or later
+            LOG.debug("Using Java9+ optimized cipher suites for Java version {}", javaVersion);
+            return DEFAULT_CIPHERS_JAVA9;
+        } else if (javaVersion.startsWith("1.")) {
+            // Must be Java 1.8 or earlier
+            LOG.debug("Using Java8 optimized cipher suites for Java version {}", javaVersion);
+            return DEFAULT_CIPHERS_JAVA8;
+        } else {
+            LOG.debug("Could not parse java version {}, using Java8 optimized cipher suites",
+                    javaVersion);
+            return DEFAULT_CIPHERS_JAVA8;
+        }
+    }
+
+    public static SslContext createSslContextForClient(String keyStoreLocation, char[] keyStorePassword,
+                                                       String keyStoreType, String trustStoreLocation, char[] trustStorePassword, String trustStoreType,
+                                                       boolean sslCrlEnabled, boolean sslOcspEnabled, String enabledProtocols, String cipherSuites, String tlsConfigProtocols)
+            throws X509Exception, IOException {
+
+        SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
+
+        if (keyStoreLocation.isEmpty()) {
+            LOG.warn("keyStoreLocation is not specified");
+        } else {
+            sslContextBuilder
+                    .keyManager(createKeyManager(keyStoreLocation, keyStorePassword, keyStoreType));
+        }
+
+        if (trustStoreLocation.isEmpty()) {
+            LOG.warn("trustStoreLocation is not specified");
+        } else {
+            sslContextBuilder.trustManager(createTrustManager(trustStoreLocation, trustStorePassword,
+                    trustStoreType, sslCrlEnabled, sslOcspEnabled));
+        }
+
+        sslContextBuilder.enableOcsp(sslOcspEnabled);
+        sslContextBuilder.protocols(getEnabledProtocols(enabledProtocols, tlsConfigProtocols));
+        sslContextBuilder.ciphers(Arrays.asList(getCipherSuites(cipherSuites)));
+
+        return sslContextBuilder.build();
+    }
+
+    public static SslContext createSslContextForServer(String keyStoreLocation, char[] keyStorePassword,
+      String keyStoreType, String trustStoreLocation, char[] trustStorePassword, String trustStoreType,
+      boolean sslCrlEnabled, boolean sslOcspEnabled, String enabledProtocols, String cipherSuites, String tlsConfigProtocols)
+            throws X509Exception, IOException {
+
+        if (keyStoreLocation.isEmpty()) {
+            throw new SSLContextException(
+                    "keyStoreLocation is required for SSL server: ");
+        }
+
+        SslContextBuilder sslContextBuilder;
+
+        sslContextBuilder = SslContextBuilder
+                .forServer(createKeyManager(keyStoreLocation, keyStorePassword, keyStoreType));
+
+        if (trustStoreLocation.isEmpty()) {
+            LOG.warn("trustStoreLocation is not specified");
+        } else {
+            sslContextBuilder.trustManager(createTrustManager(trustStoreLocation, trustStorePassword,
+                    trustStoreType, sslCrlEnabled, sslOcspEnabled));
+        }
+
+        sslContextBuilder.enableOcsp(sslOcspEnabled);
+        sslContextBuilder.protocols(getEnabledProtocols(enabledProtocols, tlsConfigProtocols));
+        sslContextBuilder.ciphers(Arrays.asList(getCipherSuites(cipherSuites)));
+
+        return sslContextBuilder.build();
+    }
+
+    /**
+     * Creates a key manager by loading the key store from the given file of the given type,
+     * optionally decrypting it using the given password.
+     * @param keyStoreLocation the location of the key store file.
+     * @param keyStorePassword optional password to decrypt the key store. If empty, assumes the key
+     *                         store is not encrypted.
+     * @param keyStoreType     must be JKS, PEM, PKCS12, BCFKS or null. If null, attempts to
+     *                         autodetect the key store type from the file extension (e.g. .jks /
+     *                         .pem).
+     * @return the key manager.
+     * @throws KeyManagerException if something goes wrong.
+     */
+    static X509KeyManager createKeyManager(String keyStoreLocation, char[] keyStorePassword,
+                                           String keyStoreType) throws KeyManagerException {
+
+        if (keyStoreType == null) {
+            keyStoreType = "jks";
+        }
+
+        if (keyStorePassword == null) {
+            keyStorePassword = EMPTY_CHAR_ARRAY;
+        }
+
+        try {
+            KeyStore ks = KeyStore.getInstance(keyStoreType);
+            try (InputStream inputStream = Files.newInputStream(new File(keyStoreLocation).toPath())) {
+                ks.load(inputStream, keyStorePassword);
+            }
+
+            KeyManagerFactory kmf = KeyManagerFactory.getInstance("PKIX");
+            kmf.init(ks, keyStorePassword);
+
+            for (KeyManager km : kmf.getKeyManagers()) {
+                if (km instanceof X509KeyManager) {
+                    return (X509KeyManager) km;
+                }
+            }
+            throw new KeyManagerException("Couldn't find X509KeyManager");
+        } catch (IOException | GeneralSecurityException | IllegalArgumentException e) {
+            throw new KeyManagerException(e);
+        }
+    }
+
+    /**
+     * Creates a trust manager by loading the trust store from the given file of the given type,
+     * optionally decrypting it using the given password.
+     * @param trustStoreLocation the location of the trust store file.
+     * @param trustStorePassword optional password to decrypt the trust store (only applies to JKS
+     *                           trust stores). If empty, assumes the trust store is not encrypted.
+     * @param trustStoreType     must be JKS, PEM, PKCS12, BCFKS or null. If null, attempts to
+     *                           autodetect the trust store type from the file extension (e.g. .jks /
+     *                           .pem).
+     * @param crlEnabled         enable CRL (certificate revocation list) checks.
+     * @param ocspEnabled        enable OCSP (online certificate status protocol) checks.
+     * @return the trust manager.
+     * @throws TrustManagerException if something goes wrong.
+     */
+    static X509TrustManager createTrustManager(String trustStoreLocation, char[] trustStorePassword,
+                                               String trustStoreType, boolean crlEnabled, boolean ocspEnabled) throws TrustManagerException {
+
+        if (trustStoreType == null) {
+            trustStoreType = "jks";
+        }
+
+        if (trustStorePassword == null) {
+            trustStorePassword = EMPTY_CHAR_ARRAY;
+        }
+
+        try {
+            KeyStore ts = KeyStore.getInstance(trustStoreType);
+            try (InputStream inputStream = Files.newInputStream(new File(trustStoreLocation).toPath())) {
+                ts.load(inputStream, trustStorePassword);
+            }
+
+            PKIXBuilderParameters pbParams = new PKIXBuilderParameters(ts, new X509CertSelector());
+            if (crlEnabled || ocspEnabled) {
+                pbParams.setRevocationEnabled(true);
+                System.setProperty("com.sun.net.ssl.checkRevocation", "true");
+                if (crlEnabled) {
+                    System.setProperty("com.sun.security.enableCRLDP", "true");
+                }
+                if (ocspEnabled) {
+                    Security.setProperty("ocsp.enable", "true");
+                }
+            } else {
+                pbParams.setRevocationEnabled(false);
+            }
+
+            // Revocation checking is only supported with the PKIX algorithm
+            TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX");
+            tmf.init(new CertPathTrustManagerParameters(pbParams));
+
+            for (final TrustManager tm : tmf.getTrustManagers()) {
+                if (tm instanceof X509ExtendedTrustManager) {
+                    return (X509ExtendedTrustManager) tm;
+                }
+            }
+            throw new TrustManagerException("Couldn't find X509TrustManager");
+        } catch (IOException | GeneralSecurityException | IllegalArgumentException e) {
+            throw new TrustManagerException(e);
+        }
+    }
+
+    private static String[] getEnabledProtocols(String enabledProtocolsInput, String tlsConfigProtocols) {
+        if (enabledProtocolsInput == null) {
+            return new String[] {tlsConfigProtocols};
+        }
+        return enabledProtocolsInput.split(",");
+    }
+
+    private static String[] getCipherSuites(String cipherSuitesInput) {
+        if (cipherSuitesInput == null) {
+            return getDefaultCipherSuites();
+        } else {
+            return cipherSuitesInput.split(",");
+        }
+    }
+}
diff --git a/common/src/test/java/org/apache/omid/tls/BaseX509ParameterizedTestCase.java b/common/src/test/java/org/apache/omid/tls/BaseX509ParameterizedTestCase.java
new file mode 100644
index 0000000..15f5037
--- /dev/null
+++ b/common/src/test/java/org/apache/omid/tls/BaseX509ParameterizedTestCase.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.omid.tls;
+
+import org.apache.commons.io.FileUtils;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.security.Security;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * Base class for parameterized unit tests that use X509TestContext for testing different X509
+ * parameter combinations (CA key type, cert key type, with/without a password, with/without
+ * hostname verification, etc). This base class takes care of setting up / cleaning up the test
+ * environment, and caching the X509TestContext objects used by the tests.
+ * <p/>
+ * This file has is based on the one in HBase project.
+ * @see <a href=
+ *      "https://github.com/apache/hbase/blob/d2b0074f7ad4c43d31a1a511a0d74feda72451d1/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/AbstractTestX509Parameterized.java">Base
+ *      revision</a>
+ */
+public abstract class BaseX509ParameterizedTestCase {
+    protected static final String KEY_NON_EMPTY_PASSWORD = "pa$$w0rd";
+    protected static final String KEY_EMPTY_PASSWORD = "";
+
+    /**
+     * Because key generation and writing / deleting files is kind of expensive, we cache the certs
+     * and on-disk files between test cases. None of the test cases modify any of this data so it's
+     * safe to reuse between tests. This caching makes all test cases after the first one for a given
+     * parameter combination complete almost instantly.
+     */
+    protected static Map<Integer, X509TestContext> cachedTestContexts;
+    protected static File tempDir;
+
+    protected X509TestContext x509TestContext;
+
+    @BeforeClass
+    public static void setUpBaseClass() throws Exception {
+        Security.addProvider(new BouncyCastleProvider());
+        cachedTestContexts = new HashMap<>();
+        tempDir = Files.createTempDirectory("x509Tests").toFile();
+    }
+
+    @AfterClass
+    public static void cleanUpBaseClass() {
+        Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
+        cachedTestContexts.clear();
+        cachedTestContexts = null;
+        try {
+            FileUtils.deleteDirectory(tempDir);
+        } catch (IOException e) {
+            // ignore
+        }
+    }
+
+    /**
+     * Init method. See example usage in {@link TestX509Util}.
+     * @param paramIndex      the index under which the X509TestContext should be cached.
+     * @param contextSupplier a function that creates and returns the X509TestContext for the current
+     *                        index if one is not already cached.
+     */
+    protected void init(Integer paramIndex, Supplier<X509TestContext> contextSupplier) {
+        if (cachedTestContexts.containsKey(paramIndex)) {
+            x509TestContext = cachedTestContexts.get(paramIndex);
+        } else {
+            x509TestContext = contextSupplier.get();
+            cachedTestContexts.put(paramIndex, x509TestContext);
+        }
+    }
+
+    protected void init(final X509KeyType caKeyType, final X509KeyType certKeyType,
+                        final String keyPassword, final Integer paramIndex) throws Exception {
+        init(paramIndex, () -> {
+            try {
+                return X509TestContext.newBuilder().setTempDir(tempDir).setKeyStorePassword(keyPassword)
+                        .setKeyStoreKeyType(certKeyType).setTrustStorePassword(keyPassword)
+                        .setTrustStoreKeyType(caKeyType).build();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+}
+
diff --git a/common/src/test/java/org/apache/omid/tls/TestKeyStoreFileType.java b/common/src/test/java/org/apache/omid/tls/TestKeyStoreFileType.java
new file mode 100644
index 0000000..8092ed3
--- /dev/null
+++ b/common/src/test/java/org/apache/omid/tls/TestKeyStoreFileType.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tls;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * This file has been copied from the Apache ZooKeeper project.
+ * @see <a href=
+ *      "https://github.com/apache/zookeeper/blob/master/zookeeper-server/src/test/java/org/apache/zookeeper/common/KeyStoreFileTypeTest.java">Base
+ *      revision</a>
+ */
+public class TestKeyStoreFileType {
+
+    @Test
+    public void testGetPropertyValue() {
+        Assert.assertEquals("PEM", KeyStoreFileType.PEM.getPropertyValue());
+        Assert.assertEquals("JKS", KeyStoreFileType.JKS.getPropertyValue());
+        Assert.assertEquals("PKCS12", KeyStoreFileType.PKCS12.getPropertyValue());
+        Assert.assertEquals("BCFKS", KeyStoreFileType.BCFKS.getPropertyValue());
+    }
+
+    @Test
+    public void testFromPropertyValue() {
+        Assert.assertEquals(KeyStoreFileType.PEM, KeyStoreFileType.fromPropertyValue("PEM"));
+        Assert.assertEquals(KeyStoreFileType.JKS, KeyStoreFileType.fromPropertyValue("JKS"));
+        Assert.assertEquals(KeyStoreFileType.PKCS12, KeyStoreFileType.fromPropertyValue("PKCS12"));
+        Assert.assertEquals(KeyStoreFileType.BCFKS, KeyStoreFileType.fromPropertyValue("BCFKS"));
+        Assert.assertNull(KeyStoreFileType.fromPropertyValue(""));
+        Assert.assertNull(KeyStoreFileType.fromPropertyValue(null));
+    }
+
+    @Test
+    public void testFromPropertyValueIgnoresCase() {
+        Assert.assertEquals(KeyStoreFileType.PEM, KeyStoreFileType.fromPropertyValue("pem"));
+        Assert.assertEquals(KeyStoreFileType.JKS, KeyStoreFileType.fromPropertyValue("jks"));
+        Assert.assertEquals(KeyStoreFileType.PKCS12, KeyStoreFileType.fromPropertyValue("pkcs12"));
+        Assert.assertEquals(KeyStoreFileType.BCFKS, KeyStoreFileType.fromPropertyValue("bcfks"));
+        Assert.assertNull(KeyStoreFileType.fromPropertyValue(""));
+        Assert.assertNull(KeyStoreFileType.fromPropertyValue(null));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testFromPropertyValueThrowsOnBadPropertyValue() {
+        KeyStoreFileType.fromPropertyValue("foobar");
+    }
+
+    @Test
+    public void testFromFilename() {
+        Assert.assertEquals(KeyStoreFileType.JKS, KeyStoreFileType.fromFilename("mykey.jks"));
+        Assert.assertEquals(KeyStoreFileType.JKS,
+                KeyStoreFileType.fromFilename("/path/to/key/dir/mykey.jks"));
+        Assert.assertEquals(KeyStoreFileType.PEM, KeyStoreFileType.fromFilename("mykey.pem"));
+        Assert.assertEquals(KeyStoreFileType.PEM,
+                KeyStoreFileType.fromFilename("/path/to/key/dir/mykey.pem"));
+        Assert.assertEquals(KeyStoreFileType.PKCS12, KeyStoreFileType.fromFilename("mykey.p12"));
+        Assert.assertEquals(KeyStoreFileType.PKCS12,
+                KeyStoreFileType.fromFilename("/path/to/key/dir/mykey.p12"));
+        Assert.assertEquals(KeyStoreFileType.BCFKS, KeyStoreFileType.fromFilename("mykey.bcfks"));
+        Assert.assertEquals(KeyStoreFileType.BCFKS,
+                KeyStoreFileType.fromFilename("/path/to/key/dir/mykey.bcfks"));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testFromFilenameThrowsOnBadFileExtension() {
+        KeyStoreFileType.fromFilename("prod.key");
+    }
+
+    @Test
+    public void testFromPropertyValueOrFileName() {
+        // Property value takes precedence if provided
+        Assert.assertEquals(KeyStoreFileType.JKS,
+                KeyStoreFileType.fromPropertyValueOrFileName("JKS", "prod.key"));
+        Assert.assertEquals(KeyStoreFileType.PEM,
+                KeyStoreFileType.fromPropertyValueOrFileName("PEM", "prod.key"));
+        Assert.assertEquals(KeyStoreFileType.PKCS12,
+                KeyStoreFileType.fromPropertyValueOrFileName("PKCS12", "prod.key"));
+        Assert.assertEquals(KeyStoreFileType.BCFKS,
+                KeyStoreFileType.fromPropertyValueOrFileName("BCFKS", "prod.key"));
+        // Falls back to filename detection if no property value
+        Assert.assertEquals(KeyStoreFileType.JKS,
+                KeyStoreFileType.fromPropertyValueOrFileName("", "prod.jks"));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testFromPropertyValueOrFileNameThrowsOnBadPropertyValue() {
+        KeyStoreFileType.fromPropertyValueOrFileName("foobar", "prod.jks");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testFromPropertyValueOrFileNameThrowsOnBadFileExtension() {
+        KeyStoreFileType.fromPropertyValueOrFileName("", "prod.key");
+    }
+}
\ No newline at end of file
diff --git a/common/src/test/java/org/apache/omid/tls/TestX509Util.java b/common/src/test/java/org/apache/omid/tls/TestX509Util.java
new file mode 100644
index 0000000..5bb56e1
--- /dev/null
+++ b/common/src/test/java/org/apache/omid/tls/TestX509Util.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.omid.tls;
+
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.handler.ssl.SslContext;
+import org.apache.omid.tls.KeyStoreFileType;
+import org.apache.omid.tls.X509Exception;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.security.Security;
+import java.util.*;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+/**
+ * This file has is based on the one in HBase project.
+ * @see <a href=
+ *      "https://github.com/apache/hbase/blob/d2b0074f7ad4c43d31a1a511a0d74feda72451d1/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/TestX509Util.java">Base
+ *      revision</a>
+ */
+@RunWith(Parameterized.class)
+public class TestX509Util extends BaseX509ParameterizedTestCase {
+
+    @Parameterized.Parameter()
+    public X509KeyType caKeyType;
+
+    @Parameterized.Parameter(value = 1)
+    public X509KeyType certKeyType;
+
+    @Parameterized.Parameter(value = 2)
+    public String keyPassword;
+
+    @Parameterized.Parameter(value = 3)
+    public Integer paramIndex;
+
+    @Parameterized.Parameters(
+            name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}, paramIndex={3}")
+    public static Collection<Object[]> data() {
+        List<Object[]> params = new ArrayList<>();
+        int paramIndex = 0;
+        for (X509KeyType caKeyType : X509KeyType.values()) {
+            for (X509KeyType certKeyType : X509KeyType.values()) {
+                for (String keyPassword : new String[] { KEY_EMPTY_PASSWORD, KEY_NON_EMPTY_PASSWORD }) {
+                    params.add(new Object[] { caKeyType, certKeyType, keyPassword, paramIndex++ });
+                }
+            }
+        }
+        return params;
+    }
+
+    private String tlsConfigKeystoreLocation;
+    private String tlsConfigKeystorePassword;
+    private String tlsConfigKeystoreType;
+
+    private String tlsConfigTrustLocation;
+    private String tlsConfigTrustPassword;
+    private String tlsConfigTrustType;
+
+    @Override
+    public void init(X509KeyType caKeyType, X509KeyType certKeyType, String keyPassword,
+                     Integer paramIndex) throws Exception {
+        super.init(caKeyType, certKeyType, keyPassword, paramIndex);
+        x509TestContext.setSystemProperties(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
+        tlsConfigKeystoreLocation = x509TestContext.getTlsConfigKeystoreLocation();
+        tlsConfigKeystorePassword = x509TestContext.getTlsConfigKeystorePassword();
+        tlsConfigKeystoreType = x509TestContext.getTlsConfigKeystoreType();
+        tlsConfigTrustLocation = x509TestContext.getTlsConfigTrustLocation();
+        tlsConfigTrustPassword = x509TestContext.getTlsConfigTrustPassword();
+        tlsConfigTrustType = x509TestContext.getTlsConfigTrustType();
+    }
+
+    @After
+    public void cleanUp() {
+        x509TestContext.clearSystemProperties();
+        System.clearProperty("com.sun.net.ssl.checkRevocation");
+        System.clearProperty("com.sun.security.enableCRLDP");
+        Security.setProperty("ocsp.enable", Boolean.FALSE.toString());
+        Security.setProperty("com.sun.security.enableCRLDP", Boolean.FALSE.toString());
+    }
+
+    @Test
+    public void testCreateSSLContextWithoutCustomProtocol() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        SslContext sslContext = X509Util.createSslContextForClient(tlsConfigKeystoreLocation, tlsConfigKeystorePassword.toCharArray(), tlsConfigKeystoreType, tlsConfigTrustLocation, tlsConfigTrustPassword.toCharArray(), tlsConfigTrustType, false, false, null, null, X509Util.DEFAULT_PROTOCOL);
+        ByteBufAllocator byteBufAllocatorMock = mock(ByteBufAllocator.class);
+        assertEquals(new String[] { X509Util.DEFAULT_PROTOCOL },
+                sslContext.newEngine(byteBufAllocatorMock).getEnabledProtocols());
+    }
+
+    @Test
+    public void testCreateSSLContextWithCustomProtocol() throws Exception {
+        final String protocol = "TLSv1.1";
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+
+        ByteBufAllocator byteBufAllocatorMock = mock(ByteBufAllocator.class);
+        SslContext sslContext = X509Util.createSslContextForClient(tlsConfigKeystoreLocation, tlsConfigKeystorePassword.toCharArray(), tlsConfigKeystoreType, tlsConfigTrustLocation, tlsConfigTrustPassword.toCharArray(), tlsConfigTrustType, false, false, protocol, null, X509Util.DEFAULT_PROTOCOL);
+        assertEquals(Collections.singletonList(protocol),
+                Arrays.asList(sslContext.newEngine(byteBufAllocatorMock).getEnabledProtocols()));
+    }
+
+    @Test(expected = X509Exception.SSLContextException.class)
+    public void testCreateSSLContextWithoutKeyStoreLocationServer() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        tlsConfigKeystoreLocation = "";
+        SslContext sslContext = X509Util.createSslContextForServer(tlsConfigKeystoreLocation, tlsConfigKeystorePassword.toCharArray(), tlsConfigKeystoreType, tlsConfigTrustLocation, tlsConfigTrustPassword.toCharArray(), tlsConfigTrustType, false, false, null, null, X509Util.DEFAULT_PROTOCOL);
+    }
+
+    @Test
+    public void testCreateSSLContextWithoutKeyStoreLocationClient() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        tlsConfigKeystoreLocation = "";
+        SslContext sslContext = X509Util.createSslContextForClient(tlsConfigKeystoreLocation, tlsConfigKeystorePassword.toCharArray(), tlsConfigKeystoreType, tlsConfigTrustLocation, tlsConfigTrustPassword.toCharArray(), tlsConfigTrustType, false, false, null, null, X509Util.DEFAULT_PROTOCOL);
+    }
+
+    @Test(expected = X509Exception.class)
+    public void testCreateSSLContextWithoutKeyStorePassword() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        if (!x509TestContext.isKeyStoreEncrypted()) {
+            throw new X509Exception.SSLContextException("");
+        }
+        tlsConfigKeystorePassword = "";
+        SslContext sslContext = X509Util.createSslContextForServer(tlsConfigKeystoreLocation, tlsConfigKeystorePassword.toCharArray(), tlsConfigKeystoreType, tlsConfigTrustLocation, tlsConfigTrustPassword.toCharArray(), tlsConfigTrustType, false, false, null, null, X509Util.DEFAULT_PROTOCOL);
+    }
+
+    @Test
+    public void testCreateSSLContextWithoutTrustStoreLocationClient() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        tlsConfigTrustLocation = "";
+        SslContext sslContext = X509Util.createSslContextForClient(tlsConfigKeystoreLocation, tlsConfigKeystorePassword.toCharArray(), tlsConfigKeystoreType, tlsConfigTrustLocation, tlsConfigTrustPassword.toCharArray(), tlsConfigTrustType, false, false, null, null, X509Util.DEFAULT_PROTOCOL);
+    }
+
+    @Test
+    public void testCreateSSLContextWithoutTrustStoreLocationServer() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        tlsConfigTrustLocation = "";
+        SslContext sslContext = X509Util.createSslContextForServer(tlsConfigKeystoreLocation, tlsConfigKeystorePassword.toCharArray(), tlsConfigKeystoreType, tlsConfigTrustLocation, tlsConfigTrustPassword.toCharArray(), tlsConfigTrustType, false, false, null, null, X509Util.DEFAULT_PROTOCOL);
+    }
+
+    // It would be great to test the value of PKIXBuilderParameters#setRevocationEnabled,
+    // but it does not appear to be possible
+    @Test
+    public void testCRLEnabled() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        SslContext sslContext = X509Util.createSslContextForServer(tlsConfigKeystoreLocation, tlsConfigKeystorePassword.toCharArray(), tlsConfigKeystoreType, tlsConfigTrustLocation, tlsConfigTrustPassword.toCharArray(), tlsConfigTrustType, true, false, null, null, X509Util.DEFAULT_PROTOCOL);
+        assertTrue(Boolean.valueOf(System.getProperty("com.sun.net.ssl.checkRevocation")));
+        assertTrue(Boolean.valueOf(System.getProperty("com.sun.security.enableCRLDP")));
+        assertFalse(Boolean.valueOf(Security.getProperty("ocsp.enable")));
+    }
+
+    @Test
+    public void testCRLDisabled() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        SslContext sslContext = X509Util.createSslContextForServer(tlsConfigKeystoreLocation, tlsConfigKeystorePassword.toCharArray(), tlsConfigKeystoreType, tlsConfigTrustLocation, tlsConfigTrustPassword.toCharArray(), tlsConfigTrustType, false, false, null, null, X509Util.DEFAULT_PROTOCOL);
+        assertFalse(Boolean.valueOf(System.getProperty("com.sun.net.ssl.checkRevocation")));
+        assertFalse(Boolean.valueOf(System.getProperty("com.sun.security.enableCRLDP")));
+        assertFalse(Boolean.valueOf(Security.getProperty("ocsp.enable")));
+    }
+
+    @Test
+    public void testLoadJKSKeyStore() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        // Make sure we can instantiate a key manager from the JKS file on disk
+        X509Util.createKeyManager(
+                x509TestContext.getKeyStoreFile(KeyStoreFileType.JKS).getAbsolutePath(),
+                x509TestContext.getKeyStorePassword().toCharArray(), KeyStoreFileType.JKS.getPropertyValue());
+    }
+
+    @Test
+    public void testLoadJKSKeyStoreNullPassword() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        if (!x509TestContext.getKeyStorePassword().isEmpty()) {
+            return;
+        }
+        // Make sure that empty password and null password are treated the same
+        X509Util.createKeyManager(
+                x509TestContext.getKeyStoreFile(KeyStoreFileType.JKS).getAbsolutePath(), null,
+                KeyStoreFileType.JKS.getPropertyValue());
+    }
+
+    @Test
+    public void testLoadJKSKeyStoreFileTypeDefaultToJks() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        // Make sure we can instantiate a key manager from the JKS file on disk
+        X509Util.createKeyManager(
+                x509TestContext.getKeyStoreFile(KeyStoreFileType.JKS).getAbsolutePath(),
+                x509TestContext.getKeyStorePassword().toCharArray(),
+                null /* null StoreFileType means 'autodetect from file extension' */);
+    }
+
+    @Test
+    public void testLoadJKSKeyStoreWithWrongPassword() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        assertThrows(X509Exception.KeyManagerException.class, () -> {
+            // Attempting to load with the wrong key password should fail
+            X509Util.createKeyManager(
+                    x509TestContext.getKeyStoreFile(KeyStoreFileType.JKS).getAbsolutePath(), "wrong password".toCharArray(),
+                    KeyStoreFileType.JKS.getPropertyValue());
+        });
+    }
+
+    @Test
+    public void testLoadJKSTrustStore() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        // Make sure we can instantiate a trust manager from the JKS file on disk
+        X509Util.createTrustManager(
+                x509TestContext.getTrustStoreFile(KeyStoreFileType.JKS).getAbsolutePath(),
+                x509TestContext.getTrustStorePassword().toCharArray(), KeyStoreFileType.JKS.getPropertyValue(), true, true);
+    }
+
+    @Test
+    public void testLoadJKSTrustStoreNullPassword() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        if (!x509TestContext.getTrustStorePassword().isEmpty()) {
+            return;
+        }
+        // Make sure that empty password and null password are treated the same
+        X509Util.createTrustManager(
+                x509TestContext.getTrustStoreFile(KeyStoreFileType.JKS).getAbsolutePath(), null,
+                KeyStoreFileType.JKS.getPropertyValue(), false, false);
+    }
+
+    @Test
+    public void testLoadJKSTrustStoreFileTypeDefaultToJks() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        // Make sure we can instantiate a trust manager from the JKS file on disk
+        X509Util.createTrustManager(
+                x509TestContext.getTrustStoreFile(KeyStoreFileType.JKS).getAbsolutePath(),
+                x509TestContext.getTrustStorePassword().toCharArray(), null, // null StoreFileType means 'autodetect from
+                // file extension'
+                true, true);
+    }
+
+    @Test
+    public void testLoadJKSTrustStoreWithWrongPassword() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        assertThrows(X509Exception.TrustManagerException.class, () -> {
+            // Attempting to load with the wrong key password should fail
+            X509Util.createTrustManager(
+                    x509TestContext.getTrustStoreFile(KeyStoreFileType.JKS).getAbsolutePath(), "wrong password".toCharArray(),
+                    KeyStoreFileType.JKS.getPropertyValue(), true, true);
+        });
+    }
+
+    @Test
+    public void testLoadPKCS12KeyStore() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        // Make sure we can instantiate a key manager from the PKCS12 file on disk
+        X509Util.createKeyManager(
+                x509TestContext.getKeyStoreFile(KeyStoreFileType.PKCS12).getAbsolutePath(),
+                x509TestContext.getKeyStorePassword().toCharArray(), KeyStoreFileType.PKCS12.getPropertyValue());
+    }
+
+    @Test
+    public void testLoadPKCS12KeyStoreNullPassword() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        if (!x509TestContext.getKeyStorePassword().isEmpty()) {
+            return;
+        }
+        // Make sure that empty password and null password are treated the same
+        X509Util.createKeyManager(
+                x509TestContext.getKeyStoreFile(KeyStoreFileType.PKCS12).getAbsolutePath(), null,
+                KeyStoreFileType.PKCS12.getPropertyValue());
+    }
+
+    @Test
+    public void testLoadPKCS12KeyStoreWithWrongPassword() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        assertThrows(X509Exception.KeyManagerException.class, () -> {
+            // Attempting to load with the wrong key password should fail
+            X509Util.createKeyManager(
+                    x509TestContext.getKeyStoreFile(KeyStoreFileType.PKCS12).getAbsolutePath(),
+                    "wrong password".toCharArray(), KeyStoreFileType.PKCS12.getPropertyValue());
+        });
+    }
+
+    @Test
+    public void testLoadPKCS12TrustStore() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        // Make sure we can instantiate a trust manager from the PKCS12 file on disk
+        X509Util.createTrustManager(
+                x509TestContext.getTrustStoreFile(KeyStoreFileType.PKCS12).getAbsolutePath(),
+                x509TestContext.getTrustStorePassword().toCharArray(), KeyStoreFileType.PKCS12.getPropertyValue(), true,
+                true);
+    }
+
+    @Test
+    public void testLoadPKCS12TrustStoreNullPassword() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        if (!x509TestContext.getTrustStorePassword().isEmpty()) {
+            return;
+        }
+        // Make sure that empty password and null password are treated the same
+        X509Util.createTrustManager(
+                x509TestContext.getTrustStoreFile(KeyStoreFileType.PKCS12).getAbsolutePath(), null,
+                KeyStoreFileType.PKCS12.getPropertyValue(), false, false);
+    }
+
+    @Test
+    public void testLoadPKCS12TrustStoreWithWrongPassword() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        assertThrows(X509Exception.TrustManagerException.class, () -> {
+            // Attempting to load with the wrong key password should fail
+            X509Util.createTrustManager(
+                    x509TestContext.getTrustStoreFile(KeyStoreFileType.PKCS12).getAbsolutePath(),
+                    "wrong password".toCharArray(), KeyStoreFileType.PKCS12.getPropertyValue(), true, true);
+        });
+    }
+
+    @Test
+    public void testGetDefaultCipherSuitesJava8() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("1.8");
+        // Java 8 default should have the CBC suites first
+        assertTrue(cipherSuites[0].contains("CBC"));
+    }
+
+    @Test
+    public void testGetDefaultCipherSuitesJava9() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("9");
+        // Java 9+ default should have the GCM suites first
+        assertTrue(cipherSuites[0].contains("GCM"));
+    }
+
+    @Test
+    public void testGetDefaultCipherSuitesJava10() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("10");
+        // Java 9+ default should have the GCM suites first
+        assertTrue(cipherSuites[0].contains("GCM"));
+    }
+
+    @Test
+    public void testGetDefaultCipherSuitesJava11() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("11");
+        // Java 9+ default should have the GCM suites first
+        assertTrue(cipherSuites[0].contains("GCM"));
+    }
+
+    @Test
+    public void testGetDefaultCipherSuitesUnknownVersion() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("notaversion");
+        // If version can't be parsed, use the more conservative Java 8 default
+        assertTrue(cipherSuites[0].contains("CBC"));
+    }
+
+    @Test
+    public void testGetDefaultCipherSuitesNullVersion() throws Exception {
+        init(caKeyType, certKeyType, keyPassword, paramIndex);
+        assertThrows(NullPointerException.class, () -> {
+            X509Util.getDefaultCipherSuitesForJavaVersion(null);
+        });
+    }
+}
diff --git a/common/src/test/java/org/apache/omid/tls/X509KeyType.java b/common/src/test/java/org/apache/omid/tls/X509KeyType.java
new file mode 100644
index 0000000..cf22e10
--- /dev/null
+++ b/common/src/test/java/org/apache/omid/tls/X509KeyType.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tls;
+
+/**
+ * Represents a type of key pair used for X509 certs in tests. The two options are RSA or EC
+ * (elliptic curve).
+ * <p/>
+ * This file has is based on the one in HBase project.
+ * @see <a href=
+ *      "https://github.com/apache/hbase/blob/d2b0074f7ad4c43d31a1a511a0d74feda72451d1/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509KeyType.java">Base
+ *      revision</a>
+ */
+public enum X509KeyType {
+    RSA,
+    EC
+}
diff --git a/common/src/test/java/org/apache/omid/tls/X509TestContext.java b/common/src/test/java/org/apache/omid/tls/X509TestContext.java
new file mode 100644
index 0000000..3e8509a
--- /dev/null
+++ b/common/src/test/java/org/apache/omid/tls/X509TestContext.java
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.omid.tls;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.omid.tls.KeyStoreFileType;
+import org.bouncycastle.asn1.x500.X500NameBuilder;
+import org.bouncycastle.asn1.x500.style.BCStyle;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.operator.OperatorCreationException;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.charset.StandardCharsets;
+import java.security.GeneralSecurityException;
+import java.security.KeyPair;
+import java.security.Security;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This class simplifies the creation of certificates and private keys for SSL/TLS connections.
+ * <p/>
+ * This file has is based on the one in HBase project.
+ * @see <a href=
+ *      "https://github.com/apache/hbase/blob/d2b0074f7ad4c43d31a1a511a0d74feda72451d1/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java">Base
+ *      revision</a>
+ */
+public final class X509TestContext {
+
+    private static final String TRUST_STORE_PREFIX = "hbase_test_ca";
+    private static final String KEY_STORE_PREFIX = "hbase_test_key";
+
+    private final File tempDir;
+    private String tlsConfigKeystoreLocation;
+    private String tlsConfigKeystorePassword;
+    private String tlsConfigKeystoreType;
+
+    private String tlsConfigTrustLocation;
+    private String tlsConfigTrustPassword;
+    private String tlsConfigTrustType;
+
+    private final X509Certificate trustStoreCertificate;
+    private final String trustStorePassword;
+    private File trustStoreJksFile;
+    private File trustStorePemFile;
+    private File trustStorePkcs12File;
+    private File trustStoreBcfksFile;
+
+    private final KeyPair keyStoreKeyPair;
+    private final X509Certificate keyStoreCertificate;
+    private final String keyStorePassword;
+    private File keyStoreJksFile;
+    private File keyStorePemFile;
+    private File keyStorePkcs12File;
+    private File keyStoreBcfksFile;
+
+    /**
+     * Constructor is intentionally private, use the Builder class instead.
+     * @param tempDir            the directory in which key store and trust store temp files will be
+     *                           written.
+     * @param trustStoreKeyPair  the key pair for the trust store.
+     * @param trustStorePassword the password to protect a JKS trust store (ignored for PEM trust
+     *                           stores).
+     * @param keyStoreKeyPair    the key pair for the key store.
+     * @param keyStorePassword   the password to protect the key store private key.
+     */
+    private X509TestContext(File tempDir, KeyPair trustStoreKeyPair, String trustStorePassword,
+                            KeyPair keyStoreKeyPair, String keyStorePassword)
+            throws IOException, GeneralSecurityException, OperatorCreationException {
+        if (Security.getProvider(BouncyCastleProvider.PROVIDER_NAME) == null) {
+            throw new IllegalStateException("BC Security provider was not found");
+        }
+        this.tempDir = requireNonNull(tempDir);
+        if (!tempDir.isDirectory()) {
+            throw new IllegalArgumentException("Not a directory: " + tempDir);
+        }
+        this.trustStorePassword = requireNonNull(trustStorePassword);
+        this.keyStoreKeyPair = requireNonNull(keyStoreKeyPair);
+        this.keyStorePassword = requireNonNull(keyStorePassword);
+
+        X500NameBuilder caNameBuilder = new X500NameBuilder(BCStyle.INSTANCE);
+        caNameBuilder.addRDN(BCStyle.CN,
+                MethodHandles.lookup().lookupClass().getCanonicalName() + " Root CA");
+        trustStoreCertificate =
+                X509TestHelpers.newSelfSignedCACert(caNameBuilder.build(), trustStoreKeyPair);
+
+        X500NameBuilder nameBuilder = new X500NameBuilder(BCStyle.INSTANCE);
+        nameBuilder.addRDN(BCStyle.CN,
+                MethodHandles.lookup().lookupClass().getCanonicalName() + " Zookeeper Test");
+        keyStoreCertificate = X509TestHelpers.newCert(trustStoreCertificate, trustStoreKeyPair,
+                nameBuilder.build(), keyStoreKeyPair.getPublic());
+        trustStorePkcs12File = null;
+        trustStorePemFile = null;
+        trustStoreJksFile = null;
+        keyStorePkcs12File = null;
+        keyStorePemFile = null;
+        keyStoreJksFile = null;
+    }
+
+    public File getTempDir() {
+        return tempDir;
+    }
+
+    public String getTrustStorePassword() {
+        return trustStorePassword;
+    }
+
+    /**
+     * Returns the path to the trust store file in the given format (JKS or PEM). Note that the file
+     * is created lazily, the first time this method is called. The trust store file is temporary and
+     * will be deleted on exit.
+     * @param storeFileType the store file type (JKS or PEM).
+     * @return the path to the trust store file.
+     * @throws IOException if there is an error creating the trust store file.
+     */
+    public File getTrustStoreFile(KeyStoreFileType storeFileType) throws IOException {
+//        todo add bcfks here based on https://github.com/apache/zookeeper/commit/ec1503bb00945471baa248f392eed51064bb48ab
+        switch (storeFileType) {
+            case JKS:
+                return getTrustStoreJksFile();
+            case PEM:
+                return getTrustStorePemFile();
+            case PKCS12:
+                return getTrustStorePkcs12File();
+            case BCFKS:
+                return getTrustStoreBcfksFile();
+            default:
+                throw new IllegalArgumentException("Invalid trust store type: " + storeFileType
+                        + ", must be one of: " + Arrays.toString(KeyStoreFileType.values()));
+        }
+    }
+
+    private File getTrustStoreJksFile() throws IOException {
+        if (trustStoreJksFile == null) {
+            File trustStoreJksFile = File.createTempFile(TRUST_STORE_PREFIX,
+                    KeyStoreFileType.JKS.getDefaultFileExtension(), tempDir);
+            trustStoreJksFile.deleteOnExit();
+            try (
+                    final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStoreJksFile)) {
+                byte[] bytes =
+                        X509TestHelpers.certToJavaTrustStoreBytes(trustStoreCertificate, trustStorePassword);
+                trustStoreOutputStream.write(bytes);
+                trustStoreOutputStream.flush();
+            } catch (GeneralSecurityException e) {
+                throw new IOException(e);
+            }
+            this.trustStoreJksFile = trustStoreJksFile;
+        }
+        return trustStoreJksFile;
+    }
+
+    private File getTrustStorePemFile() throws IOException {
+        if (trustStorePemFile == null) {
+            File trustStorePemFile = File.createTempFile(TRUST_STORE_PREFIX,
+                    KeyStoreFileType.PEM.getDefaultFileExtension(), tempDir);
+            trustStorePemFile.deleteOnExit();
+            FileUtils.writeStringToFile(trustStorePemFile,
+                    X509TestHelpers.pemEncodeX509Certificate(trustStoreCertificate), StandardCharsets.US_ASCII,
+                    false);
+            this.trustStorePemFile = trustStorePemFile;
+        }
+        return trustStorePemFile;
+    }
+
+    private File getTrustStorePkcs12File() throws IOException {
+        if (trustStorePkcs12File == null) {
+            File trustStorePkcs12File = File.createTempFile(TRUST_STORE_PREFIX,
+                    KeyStoreFileType.PKCS12.getDefaultFileExtension(), tempDir);
+            trustStorePkcs12File.deleteOnExit();
+            try (final FileOutputStream trustStoreOutputStream =
+                         new FileOutputStream(trustStorePkcs12File)) {
+                byte[] bytes =
+                        X509TestHelpers.certToPKCS12TrustStoreBytes(trustStoreCertificate, trustStorePassword);
+                trustStoreOutputStream.write(bytes);
+                trustStoreOutputStream.flush();
+            } catch (GeneralSecurityException e) {
+                throw new IOException(e);
+            }
+            this.trustStorePkcs12File = trustStorePkcs12File;
+        }
+        return trustStorePkcs12File;
+    }
+
+    private File getTrustStoreBcfksFile() throws IOException {
+        if (trustStoreBcfksFile == null) {
+            File trustStoreBcfksFile = File.createTempFile(TRUST_STORE_PREFIX,
+                    KeyStoreFileType.BCFKS.getDefaultFileExtension(), tempDir);
+            trustStoreBcfksFile.deleteOnExit();
+            try (
+                    final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStoreBcfksFile)) {
+                byte[] bytes =
+                        X509TestHelpers.certToBCFKSTrustStoreBytes(trustStoreCertificate, trustStorePassword);
+                trustStoreOutputStream.write(bytes);
+                trustStoreOutputStream.flush();
+            } catch (GeneralSecurityException e) {
+                throw new IOException(e);
+            }
+            this.trustStoreBcfksFile = trustStoreBcfksFile;
+        }
+        return trustStoreBcfksFile;
+    }
+
+    public X509Certificate getKeyStoreCertificate() {
+        return keyStoreCertificate;
+    }
+
+    public String getKeyStorePassword() {
+        return keyStorePassword;
+    }
+
+    public boolean isKeyStoreEncrypted() {
+        return keyStorePassword.length() > 0;
+    }
+
+    /**
+     * Returns the path to the key store file in the given format (JKS, PEM, ...). Note that the file
+     * is created lazily, the first time this method is called. The key store file is temporary and
+     * will be deleted on exit.
+     * @param storeFileType the store file type (JKS, PEM, ...).
+     * @return the path to the key store file.
+     * @throws IOException if there is an error creating the key store file.
+     */
+    public File getKeyStoreFile(KeyStoreFileType storeFileType) throws IOException {
+        switch (storeFileType) {
+            case JKS:
+                return getKeyStoreJksFile();
+            case PEM:
+                return getKeyStorePemFile();
+            case PKCS12:
+                return getKeyStorePkcs12File();
+            case BCFKS:
+                return getKeyStoreBcfksFile();
+            default:
+                throw new IllegalArgumentException("Invalid key store type: " + storeFileType
+                        + ", must be one of: " + Arrays.toString(KeyStoreFileType.values()));
+        }
+    }
+
+    private File getKeyStoreJksFile() throws IOException {
+        if (keyStoreJksFile == null) {
+            File keyStoreJksFile = File.createTempFile(KEY_STORE_PREFIX,
+                    KeyStoreFileType.JKS.getDefaultFileExtension(), tempDir);
+            keyStoreJksFile.deleteOnExit();
+            try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStoreJksFile)) {
+                byte[] bytes = X509TestHelpers.certAndPrivateKeyToJavaKeyStoreBytes(keyStoreCertificate,
+                        keyStoreKeyPair.getPrivate(), keyStorePassword);
+                keyStoreOutputStream.write(bytes);
+                keyStoreOutputStream.flush();
+            } catch (GeneralSecurityException e) {
+                throw new IOException(e);
+            }
+            this.keyStoreJksFile = keyStoreJksFile;
+        }
+        return keyStoreJksFile;
+    }
+
+    private File getKeyStorePemFile() throws IOException {
+        if (keyStorePemFile == null) {
+            try {
+                File keyStorePemFile = File.createTempFile(KEY_STORE_PREFIX,
+                        KeyStoreFileType.PEM.getDefaultFileExtension(), tempDir);
+                keyStorePemFile.deleteOnExit();
+                FileUtils.writeStringToFile(keyStorePemFile,
+                        X509TestHelpers.pemEncodeCertAndPrivateKey(keyStoreCertificate,
+                                keyStoreKeyPair.getPrivate(), keyStorePassword),
+                        StandardCharsets.US_ASCII, false);
+                this.keyStorePemFile = keyStorePemFile;
+            } catch (OperatorCreationException e) {
+                throw new IOException(e);
+            }
+        }
+        return keyStorePemFile;
+    }
+
+    private File getKeyStorePkcs12File() throws IOException {
+        if (keyStorePkcs12File == null) {
+            File keyStorePkcs12File = File.createTempFile(KEY_STORE_PREFIX,
+                    KeyStoreFileType.PKCS12.getDefaultFileExtension(), tempDir);
+            keyStorePkcs12File.deleteOnExit();
+            try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStorePkcs12File)) {
+                byte[] bytes = X509TestHelpers.certAndPrivateKeyToPKCS12Bytes(keyStoreCertificate,
+                        keyStoreKeyPair.getPrivate(), keyStorePassword);
+                keyStoreOutputStream.write(bytes);
+                keyStoreOutputStream.flush();
+            } catch (GeneralSecurityException e) {
+                throw new IOException(e);
+            }
+            this.keyStorePkcs12File = keyStorePkcs12File;
+        }
+        return keyStorePkcs12File;
+    }
+
+    private File getKeyStoreBcfksFile() throws IOException {
+        if (keyStoreBcfksFile == null) {
+            File keyStoreBcfksFile = File.createTempFile(KEY_STORE_PREFIX,
+                    KeyStoreFileType.BCFKS.getDefaultFileExtension(), tempDir);
+            keyStoreBcfksFile.deleteOnExit();
+            try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStoreBcfksFile)) {
+                byte[] bytes = X509TestHelpers.certAndPrivateKeyToBCFKSBytes(keyStoreCertificate,
+                        keyStoreKeyPair.getPrivate(), keyStorePassword);
+                keyStoreOutputStream.write(bytes);
+                keyStoreOutputStream.flush();
+            } catch (GeneralSecurityException e) {
+                throw new IOException(e);
+            }
+            this.keyStoreBcfksFile = keyStoreBcfksFile;
+        }
+        return keyStoreBcfksFile;
+    }
+
+    /**
+     * Sets the SSL system properties such that the given X509Util object can be used to create SSL
+     * Contexts that will use the trust store and key store files created by this test context.
+     * Example usage:
+     *
+     * <pre>
+     *     X509TestContext testContext = ...; // create the test context
+     *     X509Util x509Util = new QuorumX509Util();
+     *     testContext.setSystemProperties(x509Util, KeyStoreFileType.JKS, KeyStoreFileType.JKS);
+     *     // The returned context will use the key store and trust store created by the test context.
+     *     SSLContext ctx = x509Util.getDefaultSSLContext();
+     * </pre>
+     *
+     * @param keyStoreFileType   the store file type to use for the key store (JKS, PEM, ...).
+     * @param trustStoreFileType the store file type to use for the trust store (JKS, PEM, ...).
+     * @throws IOException if there is an error creating the key store file or trust store file.
+     */
+    public void setSystemProperties(KeyStoreFileType keyStoreFileType,
+                                    KeyStoreFileType trustStoreFileType) throws IOException {
+        tlsConfigKeystoreLocation = this.getKeyStoreFile(keyStoreFileType).getAbsolutePath();
+
+        tlsConfigKeystorePassword = this.getKeyStorePassword();
+        tlsConfigKeystoreType = keyStoreFileType.getPropertyValue();
+        tlsConfigTrustLocation = this.getTrustStoreFile(trustStoreFileType).getAbsolutePath();
+        tlsConfigTrustPassword = this.getTrustStorePassword();
+        tlsConfigTrustType = trustStoreFileType.getPropertyValue();
+    }
+
+    public void clearSystemProperties() {
+        tlsConfigKeystoreLocation = null;
+        tlsConfigKeystorePassword = null;
+        tlsConfigKeystoreType = null;
+        tlsConfigTrustLocation = null;
+        tlsConfigTrustPassword = null;
+        tlsConfigTrustType = null;
+    }
+
+    public String getTlsConfigKeystoreLocation() {
+        return tlsConfigKeystoreLocation;
+    }
+
+    public void setTlsConfigKeystoreLocation(String tlsConfigKeystoreLocation) {
+        this.tlsConfigKeystoreLocation = tlsConfigKeystoreLocation;
+    }
+
+    public String getTlsConfigKeystorePassword() {
+        return tlsConfigKeystorePassword;
+    }
+
+    public void setTlsConfigKeystorePassword(String tlsConfigKeystorePassword) {
+        this.tlsConfigKeystorePassword = tlsConfigKeystorePassword;
+    }
+
+    public String getTlsConfigKeystoreType() {
+        return tlsConfigKeystoreType;
+    }
+
+    public void setTlsConfigKeystoreType(String tlsConfigKeystoreType) {
+        this.tlsConfigKeystoreType = tlsConfigKeystoreType;
+    }
+
+    public String getTlsConfigTrustLocation() {
+        return tlsConfigTrustLocation;
+    }
+
+    public void setTlsConfigTrustLocation(String tlsConfigTrustLocation) {
+        this.tlsConfigTrustLocation = tlsConfigTrustLocation;
+    }
+
+    public String getTlsConfigTrustPassword() {
+        return tlsConfigTrustPassword;
+    }
+
+    public void setTlsConfigTrustPassword(String tlsConfigTrustPassword) {
+        this.tlsConfigTrustPassword = tlsConfigTrustPassword;
+    }
+
+    public String getTlsConfigTrustType() {
+        return tlsConfigTrustType;
+    }
+
+    public void setTlsConfigTrustType(String tlsConfigTrustType) {
+        this.tlsConfigTrustType = tlsConfigTrustType;
+    }
+
+    /**
+     * Builder class, used for creating new instances of X509TestContext.
+     */
+    public static class Builder {
+
+        private File tempDir;
+        private X509KeyType trustStoreKeyType;
+        private String trustStorePassword;
+        private X509KeyType keyStoreKeyType;
+        private String keyStorePassword;
+
+        /**
+         * Creates an empty builder.
+         */
+        public Builder() {
+            trustStoreKeyType = X509KeyType.EC;
+            trustStorePassword = "";
+            keyStoreKeyType = X509KeyType.EC;
+            keyStorePassword = "";
+        }
+
+        /**
+         * Builds a new X509TestContext from this builder.
+         * @return a new X509TestContext
+         */
+        public X509TestContext build()
+                throws IOException, GeneralSecurityException, OperatorCreationException {
+            KeyPair trustStoreKeyPair = X509TestHelpers.generateKeyPair(trustStoreKeyType);
+            KeyPair keyStoreKeyPair = X509TestHelpers.generateKeyPair(keyStoreKeyType);
+            return new X509TestContext(tempDir, trustStoreKeyPair, trustStorePassword, keyStoreKeyPair,
+                    keyStorePassword);
+        }
+
+        /**
+         * Sets the temporary directory. Certificate and private key files will be created in this
+         * directory.
+         * @param tempDir the temp directory.
+         * @return this Builder.
+         */
+        public Builder setTempDir(File tempDir) {
+            this.tempDir = tempDir;
+            return this;
+        }
+
+        /**
+         * Sets the trust store key type. The CA key generated for the test context will be of this
+         * type.
+         * @param keyType the key type.
+         * @return this Builder.
+         */
+        public Builder setTrustStoreKeyType(X509KeyType keyType) {
+            trustStoreKeyType = keyType;
+            return this;
+        }
+
+        /**
+         * Sets the trust store password. Ignored for PEM trust stores, JKS trust stores will be
+         * encrypted with this password.
+         * @param password the password.
+         * @return this Builder.
+         */
+        public Builder setTrustStorePassword(String password) {
+            trustStorePassword = password;
+            return this;
+        }
+
+        /**
+         * Sets the key store key type. The private key generated for the test context will be of this
+         * type.
+         * @param keyType the key type.
+         * @return this Builder.
+         */
+        public Builder setKeyStoreKeyType(X509KeyType keyType) {
+            keyStoreKeyType = keyType;
+            return this;
+        }
+
+        /**
+         * Sets the key store password. The private key (PEM, JKS) and certificate (JKS only) will be
+         * encrypted with this password.
+         * @param password the password.
+         * @return this Builder.
+         */
+        public Builder setKeyStorePassword(String password) {
+            keyStorePassword = password;
+            return this;
+        }
+    }
+
+    /**
+     * Returns a new default-constructed Builder.
+     * @return a new Builder.
+     */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+}
diff --git a/common/src/test/java/org/apache/omid/tls/X509TestHelpers.java b/common/src/test/java/org/apache/omid/tls/X509TestHelpers.java
new file mode 100644
index 0000000..64cfe4e
--- /dev/null
+++ b/common/src/test/java/org/apache/omid/tls/X509TestHelpers.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tls;
+
+import org.bouncycastle.asn1.DERIA5String;
+import org.bouncycastle.asn1.DEROctetString;
+import org.bouncycastle.asn1.pkcs.PKCSObjectIdentifiers;
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x509.*;
+import org.bouncycastle.cert.X509CertificateHolder;
+import org.bouncycastle.cert.X509v3CertificateBuilder;
+import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
+import org.bouncycastle.crypto.params.AsymmetricKeyParameter;
+import org.bouncycastle.crypto.util.PrivateKeyFactory;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.openssl.jcajce.JcaPEMWriter;
+import org.bouncycastle.openssl.jcajce.JcaPKCS8Generator;
+import org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8EncryptorBuilder;
+import org.bouncycastle.operator.*;
+import org.bouncycastle.operator.bc.BcContentSignerBuilder;
+import org.bouncycastle.operator.bc.BcECContentSignerBuilder;
+import org.bouncycastle.operator.bc.BcRSAContentSignerBuilder;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.security.*;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.security.spec.ECGenParameterSpec;
+import java.security.spec.RSAKeyGenParameterSpec;
+import java.time.LocalDate;
+import java.time.ZoneId;
+
+/**
+ * This class contains helper methods for creating X509 certificates and key pairs, and for
+ * serializing them to JKS, PEM or other keystore type files.
+ * <p/>
+ * This file has is based on the one in HBase project.
+ * @see <a href=
+ *      "https://github.com/apache/hbase/blob/d2b0074f7ad4c43d31a1a511a0d74feda72451d1/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestHelpers.java">Base
+ *      revision</a>
+ */
+final class X509TestHelpers {
+
+    private static final SecureRandom PRNG = new SecureRandom();
+    private static final int DEFAULT_RSA_KEY_SIZE_BITS = 2048;
+    private static final BigInteger DEFAULT_RSA_PUB_EXPONENT = RSAKeyGenParameterSpec.F4; // 65537
+    private static final String DEFAULT_ELLIPTIC_CURVE_NAME = "secp256r1";
+    // Per RFC 5280 section 4.1.2.2, X509 certificates can use up to 20 bytes == 160 bits for serial
+    // numbers.
+    private static final int SERIAL_NUMBER_MAX_BITS = 20 * Byte.SIZE;
+
+    /**
+     * Uses the private key of the given key pair to create a self-signed CA certificate with the
+     * public half of the key pair and the given subject and expiration. The issuer of the new cert
+     * will be equal to the subject. Returns the new certificate. The returned certificate should be
+     * used as the trust store. The private key of the input key pair should be used to sign
+     * certificates that are used by test peers to establish TLS connections to each other.
+     * @param subject the subject of the new certificate being created.
+     * @param keyPair the key pair to use. The public key will be embedded in the new certificate, and
+     *                the private key will be used to self-sign the certificate.
+     * @return a new self-signed CA certificate.
+     */
+    public static X509Certificate newSelfSignedCACert(X500Name subject, KeyPair keyPair)
+            throws IOException, OperatorCreationException, GeneralSecurityException {
+        LocalDate now = LocalDate.now(ZoneId.systemDefault());
+        X509v3CertificateBuilder builder = initCertBuilder(subject, // for self-signed certs,
+                // issuer == subject
+                now, now.plusDays(1), subject, keyPair.getPublic());
+        builder.addExtension(Extension.basicConstraints, true, new BasicConstraints(true)); // is a CA
+        builder.addExtension(Extension.keyUsage, true,
+                new KeyUsage(KeyUsage.digitalSignature | KeyUsage.keyCertSign | KeyUsage.cRLSign));
+        return buildAndSignCertificate(keyPair.getPrivate(), builder);
+    }
+
+    /**
+     * Using the private key of the given CA key pair and the Subject of the given CA cert as the
+     * Issuer, issues a new cert with the given subject and public key. The returned certificate,
+     * combined with the private key half of the <code>certPublicKey</code>, should be used as the key
+     * store.
+     * @param caCert        the certificate of the CA that's doing the signing.
+     * @param caKeyPair     the key pair of the CA. The private key will be used to sign. The public
+     *                      key must match the public key in the <code>caCert</code>.
+     * @param certSubject   the subject field of the new cert being issued.
+     * @param certPublicKey the public key of the new cert being issued.
+     * @return a new certificate signed by the CA's private key.
+     */
+    public static X509Certificate newCert(X509Certificate caCert, KeyPair caKeyPair,
+                                          X500Name certSubject, PublicKey certPublicKey)
+            throws IOException, OperatorCreationException, GeneralSecurityException {
+        if (!caKeyPair.getPublic().equals(caCert.getPublicKey())) {
+            throw new IllegalArgumentException(
+                    "CA private key does not match the public key in " + "the CA cert");
+        }
+        LocalDate now = LocalDate.now(ZoneId.systemDefault());
+        X509v3CertificateBuilder builder = initCertBuilder(new X500Name(caCert.getIssuerDN().getName()),
+                now, now.plusDays(1), certSubject, certPublicKey);
+        builder.addExtension(Extension.basicConstraints, true, new BasicConstraints(false)); // not a CA
+        builder.addExtension(Extension.keyUsage, true,
+                new KeyUsage(KeyUsage.digitalSignature | KeyUsage.keyEncipherment));
+        builder.addExtension(Extension.extendedKeyUsage, true, new ExtendedKeyUsage(
+                new KeyPurposeId[] { KeyPurposeId.id_kp_serverAuth, KeyPurposeId.id_kp_clientAuth }));
+
+        builder.addExtension(Extension.subjectAlternativeName, false, getLocalhostSubjectAltNames());
+        return buildAndSignCertificate(caKeyPair.getPrivate(), builder);
+    }
+
+    /**
+     * Returns subject alternative names for "localhost".
+     * @return the subject alternative names for "localhost".
+     */
+    private static GeneralNames getLocalhostSubjectAltNames() throws UnknownHostException {
+        InetAddress[] localAddresses = InetAddress.getAllByName("localhost");
+        GeneralName[] generalNames = new GeneralName[localAddresses.length + 1];
+        for (int i = 0; i < localAddresses.length; i++) {
+            generalNames[i] =
+                    new GeneralName(GeneralName.iPAddress, new DEROctetString(localAddresses[i].getAddress()));
+        }
+        generalNames[generalNames.length - 1] =
+                new GeneralName(GeneralName.dNSName, new DERIA5String("localhost"));
+        return new GeneralNames(generalNames);
+    }
+
+    /**
+     * Helper method for newSelfSignedCACert() and newCert(). Initializes a X509v3CertificateBuilder
+     * with logic that's common to both methods.
+     * @param issuer           Issuer field of the new cert.
+     * @param notBefore        date before which the new cert is not valid.
+     * @param notAfter         date after which the new cert is not valid.
+     * @param subject          Subject field of the new cert.
+     * @param subjectPublicKey public key to store in the new cert.
+     * @return a X509v3CertificateBuilder that can be further customized to finish creating the new
+     *         cert.
+     */
+    private static X509v3CertificateBuilder initCertBuilder(X500Name issuer, LocalDate notBefore,
+                                                            LocalDate notAfter, X500Name subject, PublicKey subjectPublicKey) {
+        return new X509v3CertificateBuilder(issuer, new BigInteger(SERIAL_NUMBER_MAX_BITS, PRNG),
+                java.sql.Date.valueOf(notBefore), java.sql.Date.valueOf(notAfter), subject,
+                SubjectPublicKeyInfo.getInstance(subjectPublicKey.getEncoded()));
+    }
+
+    /**
+     * Signs the certificate being built by the given builder using the given private key and returns
+     * the certificate.
+     * @param privateKey the private key to sign the certificate with.
+     * @param builder    the cert builder that contains the certificate data.
+     * @return the signed certificate.
+     */
+    private static X509Certificate buildAndSignCertificate(PrivateKey privateKey,
+                                                           X509v3CertificateBuilder builder)
+            throws IOException, OperatorCreationException, CertificateException {
+        BcContentSignerBuilder signerBuilder;
+        if (privateKey.getAlgorithm().contains("RSA")) { // a little hacky way to detect key type, but
+            // it works
+            AlgorithmIdentifier signatureAlgorithm =
+                    new DefaultSignatureAlgorithmIdentifierFinder().find("SHA256WithRSAEncryption");
+            AlgorithmIdentifier digestAlgorithm =
+                    new DefaultDigestAlgorithmIdentifierFinder().find(signatureAlgorithm);
+            signerBuilder = new BcRSAContentSignerBuilder(signatureAlgorithm, digestAlgorithm);
+        } else { // if not RSA, assume EC
+            AlgorithmIdentifier signatureAlgorithm =
+                    new DefaultSignatureAlgorithmIdentifierFinder().find("SHA256withECDSA");
+            AlgorithmIdentifier digestAlgorithm =
+                    new DefaultDigestAlgorithmIdentifierFinder().find(signatureAlgorithm);
+            signerBuilder = new BcECContentSignerBuilder(signatureAlgorithm, digestAlgorithm);
+        }
+        AsymmetricKeyParameter privateKeyParam = PrivateKeyFactory.createKey(privateKey.getEncoded());
+        ContentSigner signer = signerBuilder.build(privateKeyParam);
+        return toX509Cert(builder.build(signer));
+    }
+
+    /**
+     * Generates a new asymmetric key pair of the given type.
+     * @param keyType the type of key pair to generate.
+     * @return the new key pair.
+     * @throws GeneralSecurityException if your java crypto providers are messed up.
+     */
+    public static KeyPair generateKeyPair(X509KeyType keyType) throws GeneralSecurityException {
+        switch (keyType) {
+            case RSA:
+                return generateRSAKeyPair();
+            case EC:
+                return generateECKeyPair();
+            default:
+                throw new IllegalArgumentException("Invalid X509KeyType");
+        }
+    }
+
+    /**
+     * Generates an RSA key pair with a 2048-bit private key and F4 (65537) as the public exponent.
+     * @return the key pair.
+     */
+    public static KeyPair generateRSAKeyPair() throws GeneralSecurityException {
+        KeyPairGenerator keyGen = KeyPairGenerator.getInstance("RSA");
+        RSAKeyGenParameterSpec keyGenSpec =
+                new RSAKeyGenParameterSpec(DEFAULT_RSA_KEY_SIZE_BITS, DEFAULT_RSA_PUB_EXPONENT);
+        keyGen.initialize(keyGenSpec, PRNG);
+        return keyGen.generateKeyPair();
+    }
+
+    /**
+     * Generates an elliptic curve key pair using the "secp256r1" aka "prime256v1" aka "NIST P-256"
+     * curve.
+     * @return the key pair.
+     */
+    public static KeyPair generateECKeyPair() throws GeneralSecurityException {
+        KeyPairGenerator keyGen = KeyPairGenerator.getInstance("EC");
+        keyGen.initialize(new ECGenParameterSpec(DEFAULT_ELLIPTIC_CURVE_NAME), PRNG);
+        return keyGen.generateKeyPair();
+    }
+
+    /**
+     * PEM-encodes the given X509 certificate and private key (compatible with OpenSSL), optionally
+     * protecting the private key with a password. Concatenates them both and returns the result as a
+     * single string. This creates the PEM encoding of a key store.
+     * @param cert        the X509 certificate to PEM-encode.
+     * @param privateKey  the private key to PEM-encode.
+     * @param keyPassword an optional key password. If empty or null, the private key will not be
+     *                    encrypted.
+     * @return a String containing the PEM encodings of the certificate and private key.
+     * @throws IOException               if converting the certificate or private key to PEM format
+     *                                   fails.
+     * @throws OperatorCreationException if constructing the encryptor from the given password fails.
+     */
+    public static String pemEncodeCertAndPrivateKey(X509Certificate cert, PrivateKey privateKey,
+                                                    String keyPassword) throws IOException, OperatorCreationException {
+        return pemEncodeX509Certificate(cert) + "\n" + pemEncodePrivateKey(privateKey, keyPassword);
+    }
+
+    /**
+     * PEM-encodes the given private key (compatible with OpenSSL), optionally protecting it with a
+     * password, and returns the result as a String.
+     * @param key      the private key.
+     * @param password an optional key password. If empty or null, the private key will not be
+     *                 encrypted.
+     * @return a String containing the PEM encoding of the private key.
+     * @throws IOException               if converting the key to PEM format fails.
+     * @throws OperatorCreationException if constructing the encryptor from the given password fails.
+     */
+    public static String pemEncodePrivateKey(PrivateKey key, String password)
+            throws IOException, OperatorCreationException {
+        StringWriter stringWriter = new StringWriter();
+        JcaPEMWriter pemWriter = new JcaPEMWriter(stringWriter);
+        OutputEncryptor encryptor = null;
+        if (password != null && password.length() > 0) {
+            encryptor =
+                    new JceOpenSSLPKCS8EncryptorBuilder(PKCSObjectIdentifiers.pbeWithSHAAnd3_KeyTripleDES_CBC)
+                            .setProvider(BouncyCastleProvider.PROVIDER_NAME).setRandom(PRNG)
+                            .setPasssword(password.toCharArray()).build();
+        }
+        pemWriter.writeObject(new JcaPKCS8Generator(key, encryptor));
+        pemWriter.close();
+        return stringWriter.toString();
+    }
+
+    /**
+     * PEM-encodes the given X509 certificate (compatible with OpenSSL) and returns the result as a
+     * String.
+     * @param cert the certificate.
+     * @return a String containing the PEM encoding of the certificate.
+     * @throws IOException if converting the certificate to PEM format fails.
+     */
+    public static String pemEncodeX509Certificate(X509Certificate cert) throws IOException {
+        StringWriter stringWriter = new StringWriter();
+        JcaPEMWriter pemWriter = new JcaPEMWriter(stringWriter);
+        pemWriter.writeObject(cert);
+        pemWriter.close();
+        return stringWriter.toString();
+    }
+
+    /**
+     * Encodes the given X509Certificate as a JKS TrustStore, optionally protecting the cert with a
+     * password (though it's unclear why one would do this since certificates only contain public
+     * information and do not need to be kept secret). Returns the byte array encoding of the trust
+     * store, which may be written to a file and loaded to instantiate the trust store at a later
+     * point or in another process.
+     * @param cert        the certificate to serialize.
+     * @param keyPassword an optional password to encrypt the trust store. If empty or null, the cert
+     *                    will not be encrypted.
+     * @return the serialized bytes of the JKS trust store.
+     */
+    public static byte[] certToJavaTrustStoreBytes(X509Certificate cert, String keyPassword)
+            throws IOException, GeneralSecurityException {
+        KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+        return certToTrustStoreBytes(cert, keyPassword, trustStore);
+    }
+
+    /**
+     * Encodes the given X509Certificate as a PKCS12 TrustStore, optionally protecting the cert with a
+     * password (though it's unclear why one would do this since certificates only contain public
+     * information and do not need to be kept secret). Returns the byte array encoding of the trust
+     * store, which may be written to a file and loaded to instantiate the trust store at a later
+     * point or in another process.
+     * @param cert        the certificate to serialize.
+     * @param keyPassword an optional password to encrypt the trust store. If empty or null, the cert
+     *                    will not be encrypted.
+     * @return the serialized bytes of the PKCS12 trust store.
+     */
+    public static byte[] certToPKCS12TrustStoreBytes(X509Certificate cert, String keyPassword)
+            throws IOException, GeneralSecurityException {
+        KeyStore trustStore = KeyStore.getInstance("PKCS12");
+        return certToTrustStoreBytes(cert, keyPassword, trustStore);
+    }
+
+
+    /**
+     * Encodes the given X509Certificate as a BCFKS TrustStore, optionally protecting the cert with a
+     * password (though it's unclear why one would do this since certificates only contain public
+     * information and do not need to be kept secret). Returns the byte array encoding of the trust
+     * store, which may be written to a file and loaded to instantiate the trust store at a later
+     * point or in another process.
+     * @param cert        the certificate to serialize.
+     * @param keyPassword an optional password to encrypt the trust store. If empty or null, the cert
+     *                    will not be encrypted.
+     * @return the serialized bytes of the BCFKS trust store. nn
+     */
+    public static byte[] certToBCFKSTrustStoreBytes(X509Certificate cert, String keyPassword)
+            throws IOException, GeneralSecurityException {
+        KeyStore trustStore = KeyStore.getInstance("BCFKS");
+        return certToTrustStoreBytes(cert, keyPassword, trustStore);
+    }
+
+    private static byte[] certToTrustStoreBytes(X509Certificate cert, String keyPassword,
+                                                KeyStore trustStore) throws IOException, GeneralSecurityException {
+        char[] keyPasswordChars = keyPassword == null ? new char[0] : keyPassword.toCharArray();
+        trustStore.load(null, keyPasswordChars);
+        trustStore.setCertificateEntry(cert.getSubjectDN().toString(), cert);
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        trustStore.store(outputStream, keyPasswordChars);
+        outputStream.flush();
+        byte[] result = outputStream.toByteArray();
+        outputStream.close();
+        return result;
+    }
+
+    /**
+     * Encodes the given X509Certificate and private key as a JKS KeyStore, optionally protecting the
+     * private key (and possibly the cert?) with a password. Returns the byte array encoding of the
+     * key store, which may be written to a file and loaded to instantiate the key store at a later
+     * point or in another process.
+     * @param cert        the X509 certificate to serialize.
+     * @param privateKey  the private key to serialize.
+     * @param keyPassword an optional key password. If empty or null, the private key will not be
+     *                    encrypted.
+     * @return the serialized bytes of the JKS key store.
+     */
+    public static byte[] certAndPrivateKeyToJavaKeyStoreBytes(X509Certificate cert,
+                                                              PrivateKey privateKey, String keyPassword) throws IOException, GeneralSecurityException {
+        KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
+        return certAndPrivateKeyToBytes(cert, privateKey, keyPassword, keyStore);
+    }
+
+    /**
+     * Encodes the given X509Certificate and private key as a PKCS12 KeyStore, optionally protecting
+     * the private key (and possibly the cert?) with a password. Returns the byte array encoding of
+     * the key store, which may be written to a file and loaded to instantiate the key store at a
+     * later point or in another process.
+     * @param cert        the X509 certificate to serialize.
+     * @param privateKey  the private key to serialize.
+     * @param keyPassword an optional key password. If empty or null, the private key will not be
+     *                    encrypted.
+     * @return the serialized bytes of the PKCS12 key store.
+     */
+    public static byte[] certAndPrivateKeyToPKCS12Bytes(X509Certificate cert, PrivateKey privateKey,
+                                                        String keyPassword) throws IOException, GeneralSecurityException {
+        KeyStore keyStore = KeyStore.getInstance("PKCS12");
+        return certAndPrivateKeyToBytes(cert, privateKey, keyPassword, keyStore);
+    }
+
+
+    /**
+     * Encodes the given X509Certificate and private key as a BCFKS KeyStore, optionally protecting
+     * the private key (and possibly the cert?) with a password. Returns the byte array encoding of
+     * the key store, which may be written to a file and loaded to instantiate the key store at a
+     * later point or in another process.
+     * @param cert        the X509 certificate to serialize.
+     * @param privateKey  the private key to serialize.
+     * @param keyPassword an optional key password. If empty or null, the private key will not be
+     *                    encrypted.
+     * @return the serialized bytes of the BCFKS key store. nn
+     */
+    public static byte[] certAndPrivateKeyToBCFKSBytes(X509Certificate cert, PrivateKey privateKey,
+                                                       String keyPassword) throws IOException, GeneralSecurityException {
+        KeyStore keyStore = KeyStore.getInstance("BCFKS");
+        return certAndPrivateKeyToBytes(cert, privateKey, keyPassword, keyStore);
+    }
+
+    private static byte[] certAndPrivateKeyToBytes(X509Certificate cert, PrivateKey privateKey,
+                                                   String keyPassword, KeyStore keyStore) throws IOException, GeneralSecurityException {
+        char[] keyPasswordChars = keyPassword == null ? new char[0] : keyPassword.toCharArray();
+        keyStore.load(null, keyPasswordChars);
+        keyStore.setKeyEntry("key", privateKey, keyPasswordChars, new Certificate[] { cert });
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        keyStore.store(outputStream, keyPasswordChars);
+        outputStream.flush();
+        byte[] result = outputStream.toByteArray();
+        outputStream.close();
+        return result;
+    }
+
+    /**
+     * Convenience method to convert a bouncycastle X509CertificateHolder to a java X509Certificate.
+     * @param certHolder a bouncycastle X509CertificateHolder.
+     * @return a java X509Certificate
+     * @throws CertificateException if the conversion fails.
+     */
+    public static X509Certificate toX509Cert(X509CertificateHolder certHolder)
+            throws CertificateException {
+        return new JcaX509CertificateConverter().setProvider(BouncyCastleProvider.PROVIDER_NAME)
+                .getCertificate(certHolder);
+    }
+
+    private X509TestHelpers() {
+        // empty
+    }
+}
diff --git a/pom.xml b/pom.xml
index 3b2a410..6f1aae0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -182,6 +182,7 @@
         <google.findbugs.version>3.0.1</google.findbugs.version>
         <commons-pool2.version>2.4.2</commons-pool2.version>
         <commons-lang3.version>3.12.0</commons-lang3.version>
+        <bouncycastle.version>1.60</bouncycastle.version>
 
         <!-- Maven Plugin Versioning -->
         <maven-assembly-plugin-version>3.1.1</maven-assembly-plugin-version>
@@ -1070,7 +1071,7 @@
                 <artifactId>netty-transport-native-unix-common</artifactId>
                 <version>${netty4.version}</version>
             </dependency>
-            
+
             <!-- set version for snakeyaml transitive dependencies -->
             <dependency>
                 <groupId>org.yaml</groupId>
@@ -1138,6 +1139,20 @@
                 <scope>test</scope>
             </dependency>
 
+            <dependency>
+                <groupId>org.bouncycastle</groupId>
+                <artifactId>bcpkix-jdk15on</artifactId>
+                <version>${bouncycastle.version}</version>
+                <scope>test</scope>
+            </dependency>
+
+            <dependency>
+                <groupId>org.bouncycastle</groupId>
+                <artifactId>bcprov-jdk15on</artifactId>
+                <version>${bouncycastle.version}</version>
+                <scope>test</scope>
+            </dependency>
+
         </dependencies>
 
     </dependencyManagement>
diff --git a/transaction-client/pom.xml b/transaction-client/pom.xml
index 6c82ac4..13fc668 100644
--- a/transaction-client/pom.xml
+++ b/transaction-client/pom.xml
@@ -35,6 +35,7 @@
             <artifactId>omid-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+
         <dependency>
             <groupId>org.apache.omid</groupId>
             <artifactId>omid-statemachine</artifactId>
@@ -106,6 +107,11 @@
         <!-- end logging -->
 
         <!-- testing -->
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
 
         <!-- log4j2 test dependencies -->
         <dependency>
@@ -135,6 +141,25 @@
             <artifactId>testng</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.omid</groupId>
+            <artifactId>omid-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcprov-jdk15on</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcpkix-jdk15on</artifactId>
+            <scope>test</scope>
+        </dependency>
         <!-- end testing -->
 
     </dependencies>
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/OmidClientConfiguration.java b/transaction-client/src/main/java/org/apache/omid/tso/client/OmidClientConfiguration.java
index 6bc6481..d68a63f 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/OmidClientConfiguration.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/OmidClientConfiguration.java
@@ -20,6 +20,10 @@
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
 import org.apache.omid.YAMLUtils;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+import static org.apache.omid.tls.X509Util.DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS;
+import static org.apache.omid.tls.X509Util.DEFAULT_PROTOCOL;
 
 /**
  * Configuration for Omid client side
@@ -55,6 +59,32 @@
     private PostCommitMode postCommitMode = PostCommitMode.SYNC;
     private ConflictDetectionLevel conflictAnalysisLevel = ConflictDetectionLevel.CELL;
 
+    private boolean tlsEnabled = false;
+
+    private int clientNettyTlsHandshakeTimeout = DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS;
+
+    private String keyStoreLocation = "";
+
+    private String keyStorePassword = "";
+
+    private String keyStoreType = "";
+
+    private String trustStoreLocation = "";
+
+    private String trustStorePassword = "";
+
+    private String trustStoreType = "";
+
+    private boolean sslCrlEnabled = false;
+
+    private boolean sslOcspEnabled = false;
+
+    private String enabledProtocols;
+
+    private String cipherSuites;
+
+    private String tlsConfigProtocols = DEFAULT_PROTOCOL;
+
     // ----------------------------------------------------------------------------------------------------------------
     // Instantiation
     // ----------------------------------------------------------------------------------------------------------------
@@ -63,6 +93,11 @@
         new YAMLUtils().loadSettings(DEFAULT_CONFIG_FILE_NAME, this);
     }
 
+    @VisibleForTesting
+    public OmidClientConfiguration(String configFileName) {
+        new YAMLUtils().loadSettings(configFileName, DEFAULT_CONFIG_FILE_NAME, this);
+    }
+
     // ----------------------------------------------------------------------------------------------------------------
     // Getters and setters for config params
     // ----------------------------------------------------------------------------------------------------------------
@@ -186,4 +221,138 @@
     public void setConflictAnalysisLevel(ConflictDetectionLevel conflictAnalysisLevel) {
         this.conflictAnalysisLevel = conflictAnalysisLevel;
     }
+
+    public boolean getTlsEnabled() {
+        return tlsEnabled;
+    }
+
+    @Inject(optional = true)
+    @Named("omid.client.tlsEnabled")
+    public void setTlsEnabled(boolean tlsEnabled) {
+        this.tlsEnabled = tlsEnabled;
+    }
+
+    public int getClientNettyTlsHandshakeTimeout() {
+        return clientNettyTlsHandshakeTimeout;
+    }
+
+    @Inject(optional = true)
+    @Named("omid.client.clientNettyTlsHandshakeTimeout")
+    public void setClientNettyTlsHandshakeTimeout(int clientNettyTlsHandshakeTimeout) {
+        this.clientNettyTlsHandshakeTimeout = clientNettyTlsHandshakeTimeout;
+    }
+
+    public String getKeyStoreLocation() {
+        return keyStoreLocation;
+    }
+
+    @Inject(optional = true)
+    @Named("omid.client.keyStoreLocation")
+    public void setKeyStoreLocation(String keyStoreLocation) {
+        this.keyStoreLocation = keyStoreLocation;
+    }
+
+    public String getKeyStorePassword() {
+        return keyStorePassword;
+    }
+
+    @Inject(optional = true)
+    @Named("omid.client.keyStorePassword")
+    public void setKeyStorePassword(String keyStorePassword) {
+        this.keyStorePassword = keyStorePassword;
+    }
+
+    public String getKeyStoreType() {
+        return keyStoreType;
+    }
+
+    @Inject(optional = true)
+    @Named("omid.client.keyStoreType")
+
+    public void setKeyStoreType(String keyStoreType) {
+        this.keyStoreType = keyStoreType;
+    }
+
+    public String getTrustStoreLocation() {
+        return trustStoreLocation;
+    }
+
+    @Inject(optional = true)
+    @Named("omid.client.trustStoreLocation")
+    public void setTrustStoreLocation(String trustStoreLocation) {
+        this.trustStoreLocation = trustStoreLocation;
+    }
+
+    public String getTrustStorePassword() {
+        return trustStorePassword;
+    }
+
+    @Inject(optional = true)
+    @Named("omid.client.trustStorePassword")
+    public void setTrustStorePassword(String trustStorePassword) {
+        this.trustStorePassword = trustStorePassword;
+    }
+
+    public String getTrustStoreType() {
+        return trustStoreType;
+    }
+
+    @Inject(optional = true)
+    @Named("omid.client.trustStoreType")
+    public void setTrustStoreType(String trustStoreType) {
+        this.trustStoreType = trustStoreType;
+    }
+
+    public boolean getSslCrlEnabled() {
+        return sslCrlEnabled;
+    }
+
+    @Inject(optional = true)
+    @Named("omid.client.sslCrlEnabled")
+    public void setSslCrlEnabled(boolean sslCrlEnabled) {
+        this.sslCrlEnabled = sslCrlEnabled;
+    }
+
+    public boolean getSslOcspEnabled() {
+        return sslOcspEnabled;
+    }
+
+    @Inject(optional = true)
+    @Named("omid.client.sslCrlEnabled")
+    public void setSslOcspEnabled(boolean sslOcspEnabled) {
+        this.sslOcspEnabled = sslOcspEnabled;
+    }
+
+    public String getEnabledProtocols() {
+        return enabledProtocols;
+    }
+
+    @Inject(optional = true)
+    @Named("omid.client.enabledProtocols")
+    public void setEnabledProtocols(String enabledProtocols) {
+        this.enabledProtocols = enabledProtocols;
+    }
+
+    public String getCipherSuites() {
+        return cipherSuites;
+    }
+
+    @Inject(optional = true)
+    @Named("omid.client.cipherSuites")
+    public void setCipherSuites(String cipherSuites) {
+        this.cipherSuites = cipherSuites;
+    }
+
+    public String getTsConfigProtocols() {
+        return tlsConfigProtocols;
+    }
+
+    @Inject(optional = true)
+    @Named("omid.client.tlsConfigProtocols")
+    public void setTsConfigProtocols(String tlsConfigProtocols) {
+        this.tlsConfigProtocols = tlsConfigProtocols;
+    }
+
+
+
 }
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
index 172d5eb..1d82508 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
@@ -47,9 +47,15 @@
 import io.netty.handler.codec.LengthFieldPrepender;
 import io.netty.handler.codec.protobuf.ProtobufDecoder;
 import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslHandler;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
+import org.apache.omid.tls.X509Util;
+
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.omid.tls.X509Exception;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +73,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 
 /**
@@ -104,6 +111,8 @@
     // Conflict detection level of the entire system. Can either be Row or Cell level.
     private ConflictDetectionLevel conflictDetectionLevel;
 
+    private final AtomicReference<io.netty.handler.ssl.SslContext> sslContextForClient = new AtomicReference<>();
+
 
     // ----------------------------------------------------------------------------------------------------------------
     // Construction
@@ -162,11 +171,20 @@
             @Override
             public void initChannel(SocketChannel channel) throws Exception {
                 ChannelPipeline pipeline = channel.pipeline();
+                if (omidConf.getTlsEnabled()){
+                    SslContext sslContext = getSslContext(omidConf);
+                    SslHandler sslHandler = sslContext.newHandler(channel.alloc(), hp.getHost(), hp.getPort());
+                    sslHandler.setHandshakeTimeoutMillis(omidConf.getClientNettyTlsHandshakeTimeout());
+                    channel.pipeline().addFirst(sslHandler);
+                    LOG.info("SSL handler added with handshake timeout {} ms",
+                            sslHandler.getHandshakeTimeoutMillis());
+                }
                 pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
                 pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
                 pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
                 pipeline.addLast("protobufencoder", new ProtobufEncoder());
                 pipeline.addLast("inboundHandler", new Handler(fsm));
+
             }
         });
         bootstrap.option(ChannelOption.TCP_NODELAY, true);
@@ -179,6 +197,38 @@
 
     }
 
+    @VisibleForTesting
+    SslContext getSslContext(OmidClientConfiguration omidConf) throws X509Exception, IOException {
+        SslContext result = sslContextForClient.get();
+        if (result == null) {
+
+            String keyStoreLocation = omidConf.getKeyStoreLocation();
+            char[] keyStorePassword = omidConf.getKeyStorePassword().toCharArray();
+            String keyStoreType = omidConf.getKeyStoreType();
+
+            String trustStoreLocation = omidConf.getTrustStoreLocation();
+            char[] truststorePassword = omidConf.getTrustStorePassword().toCharArray();
+            String truststoreType = omidConf.getTrustStoreType();
+
+            boolean sslCrlEnabled = omidConf.getSslCrlEnabled();
+            boolean sslOcspEnabled = omidConf.getSslOcspEnabled();
+
+            String enabledProtocols = omidConf.getEnabledProtocols();
+            String cipherSuites =  omidConf.getCipherSuites();
+
+            String tlsConfigProtocols = omidConf.getTsConfigProtocols();
+
+            result = X509Util.createSslContextForClient(keyStoreLocation, keyStorePassword,
+                    keyStoreType, trustStoreLocation, truststorePassword, truststoreType, sslCrlEnabled,
+                    sslOcspEnabled, enabledProtocols, cipherSuites, tlsConfigProtocols);
+            if (!sslContextForClient.compareAndSet(null, result)) {
+                // lost the race, another thread already set the value
+                result = sslContextForClient.get();
+            }
+        }
+        return result;
+    }
+
     // ----------------------------------------------------------------------------------------------------------------
     // TSOProtocol interface
     // ----------------------------------------------------------------------------------------------------------------
diff --git a/transaction-client/src/main/resources/omid-client-config.yml b/transaction-client/src/main/resources/omid-client-config.yml
index 478bd48..1cc3ee9 100644
--- a/transaction-client/src/main/resources/omid-client-config.yml
+++ b/transaction-client/src/main/resources/omid-client-config.yml
@@ -41,3 +41,9 @@
 # Conflict analysis level
 # Can either be cell level or row level. Default is cell level
 conflictDetectionLevel: !!org.apache.omid.tso.client.OmidClientConfiguration$ConflictDetectionLevel CELL
+
+
+# ---------------------------------------------------------------------------------------------------------------------
+#  TLS parameters
+# ---------------------------------------------------------------------------------------------------------------------
+tlsEnabled: false
diff --git a/transaction-client/src/test/java/org/apache/omid/tso/client/TestGetSslContext.java b/transaction-client/src/test/java/org/apache/omid/tso/client/TestGetSslContext.java
new file mode 100644
index 0000000..24de8da
--- /dev/null
+++ b/transaction-client/src/test/java/org/apache/omid/tso/client/TestGetSslContext.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso.client;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.handler.ssl.SslContext;
+import org.apache.omid.tls.X509KeyType;
+import org.apache.omid.tls.X509TestContext;
+import org.apache.omid.tls.X509Util;
+import org.apache.omid.tls.KeyStoreFileType;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.security.Security;
+
+import static org.mockito.Mockito.mock;
+
+public class TestGetSslContext {
+    private static final String CURRENT_TSO_PATH = "/current_tso_path";
+
+    @Test(timeOut = 10_000)
+    public void testYamlReading() throws Exception {
+        OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
+        tsoClientConf.setConnectionString("localhost:" + 1234);
+        tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
+        tsoClientConf.setTlsEnabled(true);
+
+        Security.addProvider(new BouncyCastleProvider());
+        File tempDir = Files.createTempDirectory("x509Tests").toFile();
+        String keyPassword = "pa$$w0rd";
+        X509KeyType certKeyType = X509KeyType.RSA;
+        X509KeyType caKeyType = X509KeyType.RSA;
+
+        X509TestContext x509TestContext = X509TestContext.newBuilder().setTempDir(tempDir).setKeyStorePassword(keyPassword)
+                .setKeyStoreKeyType(certKeyType).setTrustStorePassword(keyPassword)
+                .setTrustStoreKeyType(caKeyType).build();
+
+        x509TestContext.setSystemProperties(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
+
+        tsoClientConf.setKeyStoreLocation(x509TestContext.getTlsConfigKeystoreLocation());;
+        tsoClientConf.setKeyStorePassword(x509TestContext.getTlsConfigKeystorePassword());
+        tsoClientConf.setKeyStoreType(x509TestContext.getTlsConfigKeystoreType());
+        tsoClientConf.setTrustStoreLocation(x509TestContext.getTlsConfigTrustLocation());
+        tsoClientConf.setTrustStorePassword(x509TestContext.getTlsConfigTrustPassword());
+        tsoClientConf.setTrustStoreType(x509TestContext.getTlsConfigTrustType());
+        String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA";
+        tsoClientConf.setCipherSuites(cipherSuite);
+
+        TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
+        SslContext sslContext = tsoClient.getSslContext(tsoClientConf);
+
+        ByteBufAllocator byteBufAllocatorMock = mock(ByteBufAllocator.class);
+        Assert.assertEquals(new String[] { X509Util.DEFAULT_PROTOCOL },
+                sslContext.newEngine(byteBufAllocatorMock).getEnabledProtocols());
+
+        Assert.assertEquals(new String[] { cipherSuite },
+                sslContext.newEngine(byteBufAllocatorMock).getEnabledCipherSuites());
+
+        Assert.assertFalse(Boolean.valueOf(System.getProperty("com.sun.net.ssl.checkRevocation")));
+        Assert.assertFalse(Boolean.valueOf(System.getProperty("com.sun.security.enableCRLDP")));
+        Assert.assertFalse(Boolean.valueOf(Security.getProperty("ocsp.enable")));
+
+    }
+
+}
\ No newline at end of file
diff --git a/transaction-client/src/test/java/org/apache/omid/tso/client/TestOmidClientConfiguration.java b/transaction-client/src/test/java/org/apache/omid/tso/client/TestOmidClientConfiguration.java
index c8ac7a6..6fa83e8 100644
--- a/transaction-client/src/test/java/org/apache/omid/tso/client/TestOmidClientConfiguration.java
+++ b/transaction-client/src/test/java/org/apache/omid/tso/client/TestOmidClientConfiguration.java
@@ -27,6 +27,33 @@
         OmidClientConfiguration configuration = new OmidClientConfiguration();
         Assert.assertNotNull(configuration.getConnectionString());
         Assert.assertNotNull(configuration.getConnectionType());
+        Assert.assertEquals(configuration.getEnabledProtocols(), null);
+        Assert.assertEquals(configuration.getTsConfigProtocols(), "TLSv1.2");
+        Assert.assertEquals(configuration.getTlsEnabled(), false);
+        Assert.assertEquals(configuration.getKeyStoreLocation(), "");
+        Assert.assertEquals(configuration.getKeyStorePassword(), "");
+        Assert.assertEquals(configuration.getTrustStoreLocation(), "");
+        Assert.assertEquals(configuration.getTrustStorePassword(), "");
+        Assert.assertEquals(configuration.getKeyStoreType(), "");
+        Assert.assertEquals(configuration.getKeyStoreType(), "");
+    }
+
+    @Test(timeOut = 10_000)
+    public void testCustomYamlReading() {
+        OmidClientConfiguration configuration = new OmidClientConfiguration("tlstest-omid-client-config.yml");
+        Assert.assertNotNull(configuration.getConnectionString());
+        Assert.assertNotNull(configuration.getConnectionType());
+        Assert.assertEquals(configuration.getEnabledProtocols(), "TLSv1.2");
+        Assert.assertEquals(configuration.getTsConfigProtocols(), "TLSv1.2");
+        Assert.assertEquals(configuration.getTlsEnabled(), true);
+        Assert.assertEquals(configuration.getKeyStoreLocation(), "/asd");
+        Assert.assertEquals(configuration.getKeyStorePassword(), "pass");
+
+        Assert.assertEquals(configuration.getKeyStoreType(), "JKS");
+        Assert.assertEquals(configuration.getTrustStoreLocation(), "/tasd");
+        Assert.assertEquals(configuration.getTrustStorePassword(), "tpas$w0rd1");
+        Assert.assertEquals(configuration.getKeyStoreType(), "JKS");
+        Assert.assertEquals(configuration.getCipherSuites(), "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA");
     }
 
 }
\ No newline at end of file
diff --git a/transaction-client/src/test/resources/tlstest-omid-client-config.yml b/transaction-client/src/test/resources/tlstest-omid-client-config.yml
new file mode 100644
index 0000000..88ca124
--- /dev/null
+++ b/transaction-client/src/test/resources/tlstest-omid-client-config.yml
@@ -0,0 +1,60 @@
+# =====================================================================================================================
+# Omid Client Configuration
+# =====================================================================================================================
+
+# ---------------------------------------------------------------------------------------------------------------------
+# Basic connection parameters to TSO Server
+# ---------------------------------------------------------------------------------------------------------------------
+
+# Direct connection to host:port
+connectionType: !!org.apache.omid.tso.client.OmidClientConfiguration$ConnType DIRECT
+connectionString: "localhost:54758"
+
+# When Omid is working in High Availability mode, two or more replicas of the TSO server are running in primary/backup
+# mode. When a TSO server replica is elected as master, it publishes its address through ZK. In order to configure
+# the Omid client to access the TSO server in HA mode:
+#     1) set 'connectionType' to !!org.apache.omid.tso.client.OmidClientConfiguration$ConnType HA
+#     2) set 'connectionString' to the ZK cluster connection string where the server is publishing its address
+zkConnectionTimeoutInSecs: 10
+# In HA mode, make sure that the next settings match same settings on the TSO server side
+zkNamespace: "omid"
+zkCurrentTsoPath: "/current-tso"
+
+# ---------------------------------------------------------------------------------------------------------------------
+# Communication protocol parameters
+# ---------------------------------------------------------------------------------------------------------------------
+# TODO: describe these parameters
+requestMaxRetries: 5
+requestTimeoutInMs: 5000
+reconnectionDelayInSecs: 10
+retryDelayInMs: 1000
+executorThreads: 3
+
+# ---------------------------------------------------------------------------------------------------------------------
+# Transaction Manager parameters
+# ---------------------------------------------------------------------------------------------------------------------
+
+# Configure whether the TM performs the post-commit actions for a tx (update shadow cells and clean commit table entry)
+# before returning to the control to the client (SYNC) or in parallel (ASYNC)
+postCommitMode: !!org.apache.omid.tso.client.OmidClientConfiguration$PostCommitMode SYNC
+
+# Conflict analysis level
+# Can either be cell level or row level. Default is cell level
+conflictDetectionLevel: !!org.apache.omid.tso.client.OmidClientConfiguration$ConflictDetectionLevel CELL
+
+# ---------------------------------------------------------------------------------------------------------------------
+#  TLS parameters
+# ---------------------------------------------------------------------------------------------------------------------
+tlsEnabled: true
+keyStoreLocation: "/asd"
+keyStorePassword: "pass"
+keyStoreType: "JKS"
+enabledProtocols: "TLSv1.2"
+trustStoreLocation: "/tasd"
+trustStorePassword: "tpas$w0rd1"
+trustStoreType: "JKS"
+cipherSuites: "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA"
+
+# sslCrlEnabled: true
+# sslOcspEnabled: true
+# tlsConfigProtocols: "TLSv1.2"
\ No newline at end of file
diff --git a/tso-server/pom.xml b/tso-server/pom.xml
index aa31375..c00ddd0 100644
--- a/tso-server/pom.xml
+++ b/tso-server/pom.xml
@@ -45,6 +45,25 @@
 
         <dependency>
             <groupId>org.apache.omid</groupId>
+            <artifactId>omid-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcprov-jdk15on</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcpkix-jdk15on</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.omid</groupId>
             <artifactId>omid-hbase-commit-table</artifactId>
             <version>${project.version}</version>
         </dependency>
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java b/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
index 815ce55..7e38d5a 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
@@ -49,8 +49,13 @@
 import io.netty.handler.codec.LengthFieldPrepender;
 import io.netty.handler.codec.protobuf.ProtobufDecoder;
 import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.ssl.OptionalSslHandler;
+import io.netty.handler.ssl.SslContext;
 import io.netty.util.AttributeKey;
 import io.netty.util.concurrent.GlobalEventExecutor;
+import org.apache.omid.tls.X509Util;
+import org.apache.omid.tls.X509Exception;
+
 
 /**
  * ChannelHandler for the TSO Server.
@@ -103,6 +108,10 @@
                 // Max packet length is 10MB. Transactions with so many cells
                 // that the packet is rejected will receive a ServiceUnavailableException.
                 // 10MB is enough for 2 million cells in a transaction though.
+                if (config.getTlsEnabled())
+                {
+                    initSSL(pipeline, config.getSupportPlainText());
+                }
                 pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(10 * 1024 * 1024, 0, 4, 0, 4));
                 pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
                 pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Request.getDefaultInstance()));
@@ -112,6 +121,39 @@
         });
     }
 
+    private void initSSL(ChannelPipeline p, boolean supportPlaintext)
+            throws X509Exception, IOException {
+        String keyStoreLocation = config.getKeyStoreLocation();
+        char[] keyStorePassword = config.getKeyStorePassword().toCharArray();
+        String keyStoreType = config.getKeyStoreType();
+
+        String trustStoreLocation = config.getTrustStoreLocation();
+        char[] truststorePassword = config.getTrustStorePassword().toCharArray();
+        String truststoreType = config.getTrustStoreType();
+
+        boolean sslCrlEnabled = config.getSslCrlEnabled();
+        boolean sslOcspEnabled = config.getSslOcspEnabled();
+
+        String enabledProtocols = config.getEnabledProtocols();
+        String cipherSuites =  config.getCipherSuites();
+
+        String tlsConfigProtocols = config.getTsConfigProtocols();
+
+        SslContext nettySslContext = X509Util.createSslContextForServer(keyStoreLocation, keyStorePassword,
+                keyStoreType, trustStoreLocation, truststorePassword, truststoreType, sslCrlEnabled,
+                sslOcspEnabled, enabledProtocols, cipherSuites, tlsConfigProtocols);
+
+
+        if (supportPlaintext) {
+            p.addLast("ssl", new OptionalSslHandler(nettySslContext));
+            LOG.info("Dual mode SSL handler added for channel: {}", p.channel());
+        } else {
+            p.addLast("ssl", nettySslContext.newHandler(p.channel().alloc()));
+            LOG.info("SSL handler added for channel: {}", p.channel());
+        }
+    }
+
+
     /**
      * Allows to create and connect the communication channel closing the previous one if existed
      */
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java b/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
index 8907945..53d8c01 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
@@ -27,6 +27,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.omid.tls.X509Util.DEFAULT_PROTOCOL;
+
 /**
  * Reads the configuration parameters of a TSO server instance from CONFIG_FILE_NAME.
  * If file CONFIG_FILE_NAME is missing defaults to DEFAULT_CONFIG_FILE_NAME
@@ -93,6 +95,31 @@
 
     public boolean monitorContext;
 
+    private boolean tlsEnabled = false;
+    private boolean supportPlainText = true;
+
+    private String keyStoreLocation = "";
+
+    private String keyStorePassword = "";
+
+    private String keyStoreType = "";
+
+    private String trustStoreLocation = "";
+
+    private String trustStorePassword = "";
+
+    private String trustStoreType = "";
+
+    private boolean sslCrlEnabled = false;
+
+    private boolean sslOcspEnabled = false;
+
+    private String enabledProtocols;
+
+    private String cipherSuites;
+
+    private String tlsConfigProtocols = DEFAULT_PROTOCOL;
+
     public boolean getMonitorContext() {
         return monitorContext;
     }
@@ -161,6 +188,58 @@
         return timestampType;
     }
 
+    public boolean getTlsEnabled() {
+        return tlsEnabled;
+    }
+
+    public boolean getSupportPlainText() {
+        return supportPlainText;
+    }
+
+    public String getKeyStoreLocation() {
+        return keyStoreLocation;
+    }
+
+    public String getKeyStorePassword() {
+        return keyStorePassword;
+    }
+
+    public String getKeyStoreType() {
+        return keyStoreType;
+    }
+
+    public String getTrustStoreLocation() {
+        return trustStoreLocation;
+    }
+
+    public String getTrustStorePassword() {
+        return trustStorePassword;
+    }
+
+    public String getTrustStoreType() {
+        return trustStoreType;
+    }
+
+    public boolean getSslCrlEnabled() {
+        return sslCrlEnabled;
+    }
+
+    public boolean getSslOcspEnabled() {
+        return sslOcspEnabled;
+    }
+
+    public String getEnabledProtocols() {
+        return enabledProtocols;
+    }
+
+    public String getCipherSuites() {
+        return cipherSuites;
+    }
+
+    public String getTsConfigProtocols() {
+        return tlsConfigProtocols;
+    }
+
     public void setTimestampType(String type) {
         this.timestampType = type;
     }
@@ -212,4 +291,56 @@
     public void setWaitStrategy(String waitStrategy) {
         this.waitStrategy = waitStrategy;
     }
+
+    public void setTlsEnabled(boolean tlsEnabled) {
+        this.tlsEnabled = tlsEnabled;
+    }
+
+    public void setSupportPlainText(boolean supportPlainText) {
+        this.supportPlainText = supportPlainText;
+    }
+
+    public void setKeyStoreLocation(String keyStoreLocation) {
+        this.keyStoreLocation = keyStoreLocation;
+    }
+
+    public void setKeyStorePassword(String keyStorePassword) {
+        this.keyStorePassword = keyStorePassword;
+    }
+
+    public void setKeyStoreType(String keyStoreType) {
+        this.keyStoreType = keyStoreType;
+    }
+
+    public void setTrustStoreLocation(String trustStoreLocation) {
+        this.trustStoreLocation = trustStoreLocation;
+    }
+
+    public void setTrustStorePassword(String trustStorePassword) {
+        this.trustStorePassword = trustStorePassword;
+    }
+
+    public void setTrustStoreType(String trustStoreType) {
+        this.trustStoreType = trustStoreType;
+    }
+
+    public void setSslCrlEnabled(boolean sslCrlEnabled) {
+        this.sslCrlEnabled = sslCrlEnabled;
+    }
+
+    public void setSslOcspEnabled(boolean sslOcspEnabled) {
+        this.sslOcspEnabled = sslOcspEnabled;
+    }
+
+    public void setEnabledProtocols(String enabledProtocols) {
+        this.enabledProtocols = enabledProtocols;
+    }
+
+    public void setCipherSuites(String cipherSuites) {
+        this.cipherSuites = cipherSuites;
+    }
+
+    public void setTsConfigProtocols(String tlsConfigProtocols) {
+        this.tlsConfigProtocols = tlsConfigProtocols;
+    }
 }
diff --git a/tso-server/src/main/resources/default-omid-server-configuration.yml b/tso-server/src/main/resources/default-omid-server-configuration.yml
index b4bd203..ca25c9c 100644
--- a/tso-server/src/main/resources/default-omid-server-configuration.yml
+++ b/tso-server/src/main/resources/default-omid-server-configuration.yml
@@ -55,6 +55,11 @@
 
 monitorContext: false
 
+#  TLS parameters
+tlsEnabled: false
+
+supportPlainText: true
+
 # ---------------------------------------------------------------------------------------------------------------------
 # Timestamp storage configuration options
 # ---------------------------------------------------------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TSOServerConfigTest.java b/tso-server/src/test/java/org/apache/omid/tso/TSOServerConfigTest.java
index 0b1fe00..4359d9b 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TSOServerConfigTest.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TSOServerConfigTest.java
@@ -17,13 +17,44 @@
  */
 package org.apache.omid.tso;
 
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TSOServerConfigTest {
 
     @Test(timeOut = 10_000)
     public void testParsesOK() throws Exception {
-        new TSOServerConfig("test-omid.yml");
+        TSOServerConfig tsoServerConfig = new TSOServerConfig("test-omid.yml");
+        Assert.assertEquals(tsoServerConfig.getTlsEnabled(), false);
+        Assert.assertEquals(tsoServerConfig.getSupportPlainText(), true);
+        Assert.assertEquals(tsoServerConfig.getEnabledProtocols(), null);
+        Assert.assertEquals(tsoServerConfig.getTsConfigProtocols(), "TLSv1.2");
+        Assert.assertEquals(tsoServerConfig.getKeyStoreLocation(), "");
+        Assert.assertEquals(tsoServerConfig.getKeyStorePassword(), "");
+        Assert.assertEquals(tsoServerConfig.getKeyStoreType(), "");
+        Assert.assertEquals(tsoServerConfig.getTrustStoreLocation(), "");
+        Assert.assertEquals(tsoServerConfig.getTrustStorePassword(), "");
+        Assert.assertEquals(tsoServerConfig.getKeyStoreType(), "");
     }
 
+    @Test(timeOut = 10_000)
+    public void testCustomParseK() throws Exception {
+        TSOServerConfig tsoServerConfig = new TSOServerConfig("tlstest-omid-server-config.yml");
+        Assert.assertEquals(tsoServerConfig.getCipherSuites(), "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA");
+        Assert.assertEquals(tsoServerConfig.getTlsEnabled(), true);
+        Assert.assertEquals(tsoServerConfig.getSupportPlainText(), false);
+
+        Assert.assertEquals(tsoServerConfig.getEnabledProtocols(), "TLSv1.2");
+        Assert.assertEquals(tsoServerConfig.getTsConfigProtocols(), "TLSv1.2");
+
+        Assert.assertEquals(tsoServerConfig.getKeyStoreLocation(), "/asd");
+        Assert.assertEquals(tsoServerConfig.getKeyStorePassword(), "pass");
+        Assert.assertEquals(tsoServerConfig.getKeyStoreType(), "JKS");
+        Assert.assertEquals(tsoServerConfig.getTrustStoreLocation(), "/tasd");
+        Assert.assertEquals(tsoServerConfig.getTrustStorePassword(), "tpas$w0rd1");
+        Assert.assertEquals(tsoServerConfig.getKeyStoreType(), "JKS");
+
+    }
+
+
 }
\ No newline at end of file
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientConnectionToTSO.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientConnectionToTSO.java
index fa4f4ee..776fd7a 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientConnectionToTSO.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientConnectionToTSO.java
@@ -25,6 +25,8 @@
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.omid.TestUtils;
 import org.apache.omid.committable.CommitTable;
+import org.apache.omid.tls.X509KeyType;
+import org.apache.omid.tls.X509TestContext;
 import org.apache.omid.tso.HALeaseManagementModule;
 import org.apache.omid.tso.TSOMockModule;
 import org.apache.omid.tso.TSOServer;
@@ -33,6 +35,8 @@
 import org.apache.omid.tso.TSOServerConfig.TIMESTAMP_TYPE;
 import org.apache.statemachine.StateMachine.FsmImpl;
 import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.omid.tls.KeyStoreFileType;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +44,9 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.io.File;
+import java.nio.file.Files;
+import java.security.Security;
 import java.util.concurrent.ExecutionException;
 
 import static org.testng.Assert.assertEquals;
@@ -67,6 +74,8 @@
     private CuratorFramework zkClient;
     private TSOServer tsoServer;
 
+    protected X509TestContext x509TestContext;
+
     @BeforeMethod
     public void beforeMethod() throws Exception {
 
@@ -155,6 +164,425 @@
     }
 
     @Test(timeOut = 30_000)
+    public void testSuccessfulConnectionToTSOWithHostAndPortSSLServerside() throws Exception {
+
+        // Launch a TSO WITHOUT publishing the address in HA...
+        TSOServerConfig tsoConfig = new TSOServerConfig();
+        tsoConfig.setConflictMapSize(1000);
+        tsoConfig.setPort(tsoPortForTest);
+        tsoConfig.setTimestampType(TIMESTAMP_TYPE.INCREMENTAL.toString());
+        tsoConfig.setLeaseModule(new VoidLeaseManagementModule());
+        tsoConfig.setTlsEnabled(true);
+        tsoConfig.setSupportPlainText(true);
+
+        Security.addProvider(new BouncyCastleProvider());
+        File tempDir = Files.createTempDirectory("x509Tests").toFile();
+        String keyPassword = "pa$$w0rd";
+        X509KeyType certKeyType = X509KeyType.RSA;
+        X509KeyType caKeyType = X509KeyType.RSA;
+
+        x509TestContext = X509TestContext.newBuilder().setTempDir(tempDir).setKeyStorePassword(keyPassword)
+                .setKeyStoreKeyType(certKeyType).setTrustStorePassword(keyPassword)
+                .setTrustStoreKeyType(caKeyType).build();
+
+        x509TestContext.setSystemProperties(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
+
+        LOG.info("trustStoreLocation :{}", x509TestContext.getTlsConfigKeystoreLocation());
+
+
+        tsoConfig.setKeyStoreLocation(x509TestContext.getTlsConfigKeystoreLocation());;
+        tsoConfig.setKeyStorePassword(x509TestContext.getTlsConfigKeystorePassword());
+        tsoConfig.setKeyStoreType(x509TestContext.getTlsConfigKeystoreType());
+        tsoConfig.setTrustStoreLocation(x509TestContext.getTlsConfigTrustLocation());
+        tsoConfig.setTrustStorePassword(x509TestContext.getTlsConfigTrustPassword());
+        tsoConfig.setTrustStoreType(x509TestContext.getTlsConfigTrustType());
+
+        injector = Guice.createInjector(new TSOMockModule(tsoConfig));
+        LOG.info("Starting TSO");
+        tsoServer = injector.getInstance(TSOServer.class);
+        tsoServer.startAsync();
+        tsoServer.awaitRunning();
+        TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 300);
+        LOG.info("Finished loading TSO");
+
+        // When no HA node for TSOServer is found we should get a connection
+        // to the TSO through the host:port configured...
+        OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
+        tsoClientConf.setConnectionString("localhost:" + tsoPortForTest);
+        tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
+        tsoClientConf.setTlsEnabled(false);
+        tsoClientConf.setKeyStoreLocation(x509TestContext.getTlsConfigKeystoreLocation());;
+        tsoClientConf.setKeyStorePassword(x509TestContext.getTlsConfigKeystorePassword());
+        tsoClientConf.setKeyStoreType(x509TestContext.getTlsConfigKeystoreType());
+        tsoClientConf.setTrustStoreLocation(x509TestContext.getTlsConfigTrustLocation());
+        tsoClientConf.setTrustStorePassword(x509TestContext.getTlsConfigTrustPassword());
+        tsoClientConf.setTrustStoreType(x509TestContext.getTlsConfigTrustType());
+        TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
+
+        // ... so we should get responses from the methods
+        Long startTS = tsoClient.getNewStartTimestamp().get();
+        LOG.info("Start TS {} ", startTS);
+        assertEquals(startTS.longValue(), CommitTable.MAX_CHECKPOINTS_PER_TXN);
+
+        // Close the tsoClient connection and stop the TSO Server
+        tsoClient.close().get();
+        tsoServer.stopAsync();
+        tsoServer.awaitTerminated();
+        tsoServer = null;
+        TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
+        LOG.info("TSO Server Stopped");
+    }
+
+
+    @Test(timeOut = 30_000)
+    public void testSuccessfulConnectionToTSOWithHostAndPortSSLBoth() throws Exception {
+
+        // Launch a TSO WITHOUT publishing the address in HA...
+        TSOServerConfig tsoConfig = new TSOServerConfig();
+        tsoConfig.setConflictMapSize(1000);
+        tsoConfig.setPort(tsoPortForTest);
+        tsoConfig.setTimestampType(TIMESTAMP_TYPE.INCREMENTAL.toString());
+        tsoConfig.setLeaseModule(new VoidLeaseManagementModule());
+        tsoConfig.setTlsEnabled(true);
+        tsoConfig.setSupportPlainText(false);
+
+        Security.addProvider(new BouncyCastleProvider());
+        File tempDir = Files.createTempDirectory("x509Tests").toFile();
+        String keyPassword = "pa$$w0rd";
+        X509KeyType certKeyType = X509KeyType.RSA;
+        X509KeyType caKeyType = X509KeyType.RSA;
+
+        x509TestContext = X509TestContext.newBuilder().setTempDir(tempDir).setKeyStorePassword(keyPassword)
+                .setKeyStoreKeyType(certKeyType).setTrustStorePassword(keyPassword)
+                .setTrustStoreKeyType(caKeyType).build();
+
+        x509TestContext.setSystemProperties(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
+
+        LOG.info("trustStoreLocation :{}", x509TestContext.getTlsConfigKeystoreLocation());
+
+
+        tsoConfig.setKeyStoreLocation(x509TestContext.getTlsConfigKeystoreLocation());;
+        tsoConfig.setKeyStorePassword(x509TestContext.getTlsConfigKeystorePassword());
+        tsoConfig.setKeyStoreType(x509TestContext.getTlsConfigKeystoreType());
+        tsoConfig.setSslCrlEnabled(true);
+
+        injector = Guice.createInjector(new TSOMockModule(tsoConfig));
+        LOG.info("Starting TSO");
+        tsoServer = injector.getInstance(TSOServer.class);
+        tsoServer.startAsync();
+        tsoServer.awaitRunning();
+        TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 300);
+        LOG.info("Finished loading TSO");
+
+        // When no HA node for TSOServer is found we should get a connection
+        // to the TSO through the host:port configured...
+        OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
+        tsoClientConf.setConnectionString("localhost:" + tsoPortForTest);
+        tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
+        tsoClientConf.setTlsEnabled(true);
+        tsoClientConf.setTrustStoreLocation(x509TestContext.getTlsConfigTrustLocation());
+        tsoClientConf.setTrustStorePassword(x509TestContext.getTlsConfigTrustPassword());
+        tsoClientConf.setTrustStoreType(x509TestContext.getTlsConfigTrustType());
+
+        TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
+
+        // ... so we should get responses from the methods
+        Long startTS = tsoClient.getNewStartTimestamp().get();
+        LOG.info("Start TS {} ", startTS);
+        assertEquals(startTS.longValue(), CommitTable.MAX_CHECKPOINTS_PER_TXN);
+
+        // Close the tsoClient connection and stop the TSO Server
+        tsoClient.close().get();
+        tsoServer.stopAsync();
+        tsoServer.awaitTerminated();
+        tsoServer = null;
+        TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
+        LOG.info("TSO Server Stopped");
+
+    }
+
+    @Test(timeOut = 30_000)
+    public void testSuccessfulConnectionToTSOWithHostAndPortBCFKS() throws Exception {
+
+        // Launch a TSO WITHOUT publishing the address in HA...
+        TSOServerConfig tsoConfig = new TSOServerConfig();
+        tsoConfig.setConflictMapSize(1000);
+        tsoConfig.setPort(tsoPortForTest);
+        tsoConfig.setTimestampType(TIMESTAMP_TYPE.INCREMENTAL.toString());
+        tsoConfig.setLeaseModule(new VoidLeaseManagementModule());
+        tsoConfig.setTlsEnabled(true);
+        tsoConfig.setSupportPlainText(false);
+
+        Security.addProvider(new BouncyCastleProvider());
+        File tempDir = Files.createTempDirectory("x509Tests").toFile();
+        String keyPassword = "pa$$w0rd";
+        X509KeyType certKeyType = X509KeyType.RSA;
+        X509KeyType caKeyType = X509KeyType.RSA;
+
+        x509TestContext = X509TestContext.newBuilder().setTempDir(tempDir).setKeyStorePassword(keyPassword)
+                .setKeyStoreKeyType(certKeyType).setTrustStorePassword(keyPassword)
+                .setTrustStoreKeyType(caKeyType).build();
+
+        x509TestContext.setSystemProperties(KeyStoreFileType.BCFKS, KeyStoreFileType.BCFKS);
+
+        LOG.info("trustStoreLocation :{}", x509TestContext.getTlsConfigKeystoreLocation());
+
+
+        tsoConfig.setKeyStoreLocation(x509TestContext.getTlsConfigKeystoreLocation());;
+        tsoConfig.setKeyStorePassword(x509TestContext.getTlsConfigKeystorePassword());
+        tsoConfig.setKeyStoreType(x509TestContext.getTlsConfigKeystoreType());
+        tsoConfig.setSslCrlEnabled(true);
+
+        injector = Guice.createInjector(new TSOMockModule(tsoConfig));
+        LOG.info("Starting TSO");
+        tsoServer = injector.getInstance(TSOServer.class);
+        tsoServer.startAsync();
+        tsoServer.awaitRunning();
+        TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 300);
+        LOG.info("Finished loading TSO");
+
+        // When no HA node for TSOServer is found we should get a connection
+        // to the TSO through the host:port configured...
+        OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
+        tsoClientConf.setConnectionString("localhost:" + tsoPortForTest);
+        tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
+        tsoClientConf.setTlsEnabled(true);
+        tsoClientConf.setTrustStoreLocation(x509TestContext.getTlsConfigTrustLocation());
+        tsoClientConf.setTrustStorePassword(x509TestContext.getTlsConfigTrustPassword());
+        tsoClientConf.setTrustStoreType(x509TestContext.getTlsConfigTrustType());
+
+        TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
+
+        // ... so we should get responses from the methods
+        Long startTS = tsoClient.getNewStartTimestamp().get();
+        LOG.info("Start TS {} ", startTS);
+        assertEquals(startTS.longValue(), CommitTable.MAX_CHECKPOINTS_PER_TXN);
+
+        // Close the tsoClient connection and stop the TSO Server
+        tsoClient.close().get();
+        tsoServer.stopAsync();
+        tsoServer.awaitTerminated();
+        tsoServer = null;
+        TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
+        LOG.info("TSO Server Stopped");
+
+    }
+
+    @Test(timeOut = 30_000)
+    public void testSuccessfulConnectionToTSOWithHostAndPortSSLBothMutual() throws Exception {
+
+        // Launch a TSO WITHOUT publishing the address in HA...
+        TSOServerConfig tsoConfig = new TSOServerConfig();
+        tsoConfig.setConflictMapSize(1000);
+        tsoConfig.setPort(tsoPortForTest);
+        tsoConfig.setTimestampType(TIMESTAMP_TYPE.INCREMENTAL.toString());
+        tsoConfig.setLeaseModule(new VoidLeaseManagementModule());
+        tsoConfig.setTlsEnabled(true);
+        tsoConfig.setSupportPlainText(false);
+
+        Security.addProvider(new BouncyCastleProvider());
+        File tempDir = Files.createTempDirectory("x509Tests").toFile();
+        String keyPassword = "pa$$w0rd";
+        X509KeyType certKeyType = X509KeyType.RSA;
+        X509KeyType caKeyType = X509KeyType.RSA;
+
+        x509TestContext = X509TestContext.newBuilder().setTempDir(tempDir).setKeyStorePassword(keyPassword)
+                .setKeyStoreKeyType(certKeyType).setTrustStorePassword(keyPassword)
+                .setTrustStoreKeyType(caKeyType).build();
+
+        x509TestContext.setSystemProperties(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
+
+        LOG.info("trustStoreLocation :{}", x509TestContext.getTlsConfigKeystoreLocation());
+
+
+        tsoConfig.setKeyStoreLocation(x509TestContext.getTlsConfigKeystoreLocation());;
+        tsoConfig.setKeyStorePassword(x509TestContext.getTlsConfigKeystorePassword());
+        tsoConfig.setKeyStoreType(x509TestContext.getTlsConfigKeystoreType());
+        tsoConfig.setTrustStoreLocation(x509TestContext.getTlsConfigTrustLocation());
+        tsoConfig.setTrustStorePassword(x509TestContext.getTlsConfigTrustPassword());
+        tsoConfig.setTrustStoreType(x509TestContext.getTlsConfigTrustType());
+        tsoConfig.setSslCrlEnabled(true);
+
+
+        injector = Guice.createInjector(new TSOMockModule(tsoConfig));
+        LOG.info("Starting TSO");
+        tsoServer = injector.getInstance(TSOServer.class);
+        tsoServer.startAsync();
+        tsoServer.awaitRunning();
+        TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 300);
+        LOG.info("Finished loading TSO");
+
+        // When no HA node for TSOServer is found we should get a connection
+        // to the TSO through the host:port configured...
+        OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
+        tsoClientConf.setConnectionString("localhost:" + tsoPortForTest);
+        tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
+        tsoClientConf.setTlsEnabled(true);
+        tsoClientConf.setKeyStoreLocation(x509TestContext.getTlsConfigKeystoreLocation());;
+        tsoClientConf.setKeyStorePassword(x509TestContext.getTlsConfigKeystorePassword());
+        tsoClientConf.setKeyStoreType(x509TestContext.getTlsConfigKeystoreType());
+        tsoClientConf.setTrustStoreLocation(x509TestContext.getTlsConfigTrustLocation());
+        tsoClientConf.setTrustStorePassword(x509TestContext.getTlsConfigTrustPassword());
+        tsoClientConf.setTrustStoreType(x509TestContext.getTlsConfigTrustType());
+
+        TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
+
+        // ... so we should get responses from the methods
+        Long startTS = tsoClient.getNewStartTimestamp().get();
+        LOG.info("Start TS {} ", startTS);
+        assertEquals(startTS.longValue(), CommitTable.MAX_CHECKPOINTS_PER_TXN);
+
+        // Close the tsoClient connection and stop the TSO Server
+        tsoClient.close().get();
+        tsoServer.stopAsync();
+        tsoServer.awaitTerminated();
+        tsoServer = null;
+        TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
+        LOG.info("TSO Server Stopped");
+
+    }
+
+    @Test(timeOut = 30_000)
+    public void testSuccessfulConnectionToTSOWithHostAndPortSSLCipherSuite() throws Exception {
+
+        // Launch a TSO WITHOUT publishing the address in HA...
+        TSOServerConfig tsoConfig = new TSOServerConfig();
+        tsoConfig.setConflictMapSize(1000);
+        tsoConfig.setPort(tsoPortForTest);
+        tsoConfig.setTimestampType(TIMESTAMP_TYPE.INCREMENTAL.toString());
+        tsoConfig.setLeaseModule(new VoidLeaseManagementModule());
+        tsoConfig.setTlsEnabled(true);
+        tsoConfig.setSupportPlainText(false);
+
+        Security.addProvider(new BouncyCastleProvider());
+        File tempDir = Files.createTempDirectory("x509Tests").toFile();
+        String keyPassword = "pa$$w0rd";
+        X509KeyType certKeyType = X509KeyType.RSA;
+        X509KeyType caKeyType = X509KeyType.RSA;
+
+        x509TestContext = X509TestContext.newBuilder().setTempDir(tempDir).setKeyStorePassword(keyPassword)
+                .setKeyStoreKeyType(certKeyType).setTrustStorePassword(keyPassword)
+                .setTrustStoreKeyType(caKeyType).build();
+
+        x509TestContext.setSystemProperties(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
+
+        LOG.info("trustStoreLocation :{}", x509TestContext.getTlsConfigKeystoreLocation());
+
+
+        tsoConfig.setKeyStoreLocation(x509TestContext.getTlsConfigKeystoreLocation());;
+        tsoConfig.setKeyStorePassword(x509TestContext.getTlsConfigKeystorePassword());
+        tsoConfig.setKeyStoreType(x509TestContext.getTlsConfigKeystoreType());
+
+        tsoConfig.setCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384");
+
+
+        injector = Guice.createInjector(new TSOMockModule(tsoConfig));
+        LOG.info("Starting TSO");
+        tsoServer = injector.getInstance(TSOServer.class);
+        tsoServer.startAsync();
+        tsoServer.awaitRunning();
+        TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 300);
+        LOG.info("Finished loading TSO");
+
+        // When no HA node for TSOServer is found we should get a connection
+        // to the TSO through the host:port configured...
+        OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
+        tsoClientConf.setConnectionString("localhost:" + tsoPortForTest);
+        tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
+        tsoClientConf.setTlsEnabled(true);
+        tsoClientConf.setTrustStoreLocation(x509TestContext.getTlsConfigTrustLocation());
+        tsoClientConf.setTrustStorePassword(x509TestContext.getTlsConfigTrustPassword());
+        tsoClientConf.setTrustStoreType(x509TestContext.getTlsConfigTrustType());
+        tsoClientConf.setCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384");
+
+        TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
+
+        // ... so we should get responses from the methods
+        Long startTS = tsoClient.getNewStartTimestamp().get();
+        LOG.info("Start TS {} ", startTS);
+        assertEquals(startTS.longValue(), CommitTable.MAX_CHECKPOINTS_PER_TXN);
+
+        // Close the tsoClient connection and stop the TSO Server
+        tsoClient.close().get();
+        tsoServer.stopAsync();
+        tsoServer.awaitTerminated();
+        tsoServer = null;
+        TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
+        LOG.info("TSO Server Stopped");
+
+    }
+
+    @Test(timeOut = 30_000)
+    public void testSuccessfulConnectionToTSOWithHostAndPortSSLOneCipherSuite2() throws Exception {
+
+        // Launch a TSO WITHOUT publishing the address in HA...
+        TSOServerConfig tsoConfig = new TSOServerConfig();
+        tsoConfig.setConflictMapSize(1000);
+        tsoConfig.setPort(tsoPortForTest);
+        tsoConfig.setTimestampType(TIMESTAMP_TYPE.INCREMENTAL.toString());
+        tsoConfig.setLeaseModule(new VoidLeaseManagementModule());
+        tsoConfig.setTlsEnabled(true);
+        tsoConfig.setSupportPlainText(false);
+
+        Security.addProvider(new BouncyCastleProvider());
+        File tempDir = Files.createTempDirectory("x509Tests").toFile();
+        String keyPassword = "pa$$w0rd";
+        X509KeyType certKeyType = X509KeyType.RSA;
+        X509KeyType caKeyType = X509KeyType.RSA;
+
+        x509TestContext = X509TestContext.newBuilder().setTempDir(tempDir).setKeyStorePassword(keyPassword)
+                .setKeyStoreKeyType(certKeyType).setTrustStorePassword(keyPassword)
+                .setTrustStoreKeyType(caKeyType).build();
+
+        x509TestContext.setSystemProperties(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
+
+        LOG.info("trustStoreLocation :{}", x509TestContext.getTlsConfigKeystoreLocation());
+
+
+        tsoConfig.setKeyStoreLocation(x509TestContext.getTlsConfigKeystoreLocation());;
+        tsoConfig.setKeyStorePassword(x509TestContext.getTlsConfigKeystorePassword());
+        tsoConfig.setKeyStoreType(x509TestContext.getTlsConfigKeystoreType());
+
+        tsoConfig.setCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA");
+
+
+        injector = Guice.createInjector(new TSOMockModule(tsoConfig));
+        LOG.info("Starting TSO");
+        tsoServer = injector.getInstance(TSOServer.class);
+        tsoServer.startAsync();
+        tsoServer.awaitRunning();
+        TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 300);
+        LOG.info("Finished loading TSO");
+
+        // When no HA node for TSOServer is found we should get a connection
+        // to the TSO through the host:port configured...
+        OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
+        tsoClientConf.setConnectionString("localhost:" + tsoPortForTest);
+        tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH);
+        tsoClientConf.setTlsEnabled(true);
+        tsoClientConf.setTrustStoreLocation(x509TestContext.getTlsConfigTrustLocation());
+        tsoClientConf.setTrustStorePassword(x509TestContext.getTlsConfigTrustPassword());
+        tsoClientConf.setTrustStoreType(x509TestContext.getTlsConfigTrustType());
+        tsoClientConf.setCipherSuites("TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA");
+
+        TSOClient tsoClient = TSOClient.newInstance(tsoClientConf);
+
+        // ... so we should get responses from the methods
+        Long startTS = tsoClient.getNewStartTimestamp().get();
+        LOG.info("Start TS {} ", startTS);
+        assertEquals(startTS.longValue(), CommitTable.MAX_CHECKPOINTS_PER_TXN);
+
+        // Close the tsoClient connection and stop the TSO Server
+        tsoClient.close().get();
+        tsoServer.stopAsync();
+        tsoServer.awaitTerminated();
+        tsoServer = null;
+        TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000);
+        LOG.info("TSO Server Stopped");
+
+    }
+
+    @Test(timeOut = 30_000)
     public void testSuccessfulConnectionToTSOThroughZK() throws Exception {
 
         // Launch a TSO publishing the address in HA...
diff --git a/tso-server/src/test/resources/tlstest-omid-server-config.yml b/tso-server/src/test/resources/tlstest-omid-server-config.yml
new file mode 100644
index 0000000..accba3a
--- /dev/null
+++ b/tso-server/src/test/resources/tlstest-omid-server-config.yml
@@ -0,0 +1,186 @@
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+# =====================================================================================================================
+# Omid TSO Server Configuration (Default parameters)
+# =====================================================================================================================
+
+# ---------------------------------------------------------------------------------------------------------------------
+# Basic configuration parameters
+# ---------------------------------------------------------------------------------------------------------------------
+
+# Network interface for TSO server communication. Uncomment the following line to use a specific interface
+# networkIfaceName: eth0
+# If a network interface in the configuration, the TSO will attempt to guess default network interface.
+# See org.apache.omid.tso.TSOServerConfig.getDefaultNetworkInterface for more information.
+
+# Port reserved by the Status Oracle
+port: 54758
+# Wait strategy for the Disruptor processors in TSO pipeline. Options:
+# 1) HIGH_THROUGHPUT - [Default] Use this in production deployments for maximum performance
+# 2) LOW_CPU - Use this option when testing or in deployments where saving CPU cycles is more important than throughput
+waitStrategy: HIGH_THROUGHPUT
+# The number of elements reserved in the conflict map to perform conflict resolution
+conflictMapSize: 100000000
+# The number of Commit Table writers that persist data concurrently to the datastore. It has to be at least 2.
+numConcurrentCTWriters: 2
+# The size of the batch of operations that each Commit Table writes has. The maximum number of operations that can be
+# batched in the system at a certain point in time is: numConcurrentCTWriters * batchSizePerCTWriter
+batchSizePerCTWriter: 25
+# When this timeout expires, the contents of the batch are flushed to the datastore
+batchPersistTimeoutInMs: 10
+# Timestamp generation strategy
+# INCREMENTAL - regular counter
+# WORLD_TIME - [Default] world time based counter
+timestampType: WORLD_TIME
+lowLatency: false
+# Default module configuration (No TSO High Availability & in-memory storage for timestamp and commit tables)
+timestampStoreModule: !!org.apache.omid.tso.InMemoryTimestampStorageModule [ ]
+commitTableStoreModule: !!org.apache.omid.tso.InMemoryCommitTableStorageModule [ ]
+leaseModule: !!org.apache.omid.tso.VoidLeaseManagementModule [ ]
+
+# Default stats/metrics configuration
+metrics: !!org.apache.omid.metrics.NullMetricsProvider [ ]
+
+monitorContext: false
+
+#  TLS parameters
+tlsEnabled: true
+
+supportPlainText: false
+keyStoreLocation: "/asd"
+keyStorePassword: "pass"
+keyStoreType: "JKS"
+enabledProtocols: "TLSv1.2"
+trustStoreLocation: "/tasd"
+trustStorePassword: "tpas$w0rd1"
+trustStoreType: "JKS"
+cipherSuites: "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA"
+
+# sslCrlEnabled: true
+# sslOcspEnabled: true
+# tlsConfigProtocols: "TLSv1.2"
+
+# ---------------------------------------------------------------------------------------------------------------------
+# Timestamp storage configuration options
+# ---------------------------------------------------------------------------------------------------------------------
+# Could be any guava module that binds org.apache.omid.timestamp.storage.TimestampStorage
+# Current available Timestamp stores:
+#     org.apache.omid.tso.InMemoryTimestampStorageModule
+#     org.apache.omid.timestamp.storage.HBaseTimestampStorageModule
+#     org.apache.omid.timestamp.storage.ZKTimestampStorageModule
+
+# ---------------------------------------------------------------------------------------------------------------------
+# Commit Table storage configuration options
+# ---------------------------------------------------------------------------------------------------------------------
+# Could be any guava module that binds org.apache.omid.committable.CommitTable
+# Available CommitTable stores:
+#     org.apache.omid.committable.hbase.HBaseCommitTableStorageModule
+#     org.apache.omid.tso.InMemoryCommitTableStorageModule
+
+# ---------------------------------------------------------------------------------------------------------------------
+# Metrics configuration options
+# ---------------------------------------------------------------------------------------------------------------------
+# Metrics could be anything that is org.apache.omid.metrics.MetricsRegistry
+# There are 4 built-in reporters: CSV, SLF4J, GRAPHITE, CONSOLE
+# Please see org.apache.omid.metrics.CodahaleMetricsConfig for details.
+
+# Example configuration for reporting statistics to the console every minute:
+#
+# metrics: !!org.apache.omid.metrics.CodahaleMetricsProvider [
+#     !!org.apache.omid.metrics.CodahaleMetricsConfig {
+#         outputFreqInSecs: 60,
+#         reporters: !!set {
+#             !!org.apache.omid.metrics.CodahaleMetricsConfig$Reporter CONSOLE
+#         },
+#     }
+# ]
+
+# Example of multiple reporter configuration (to CSV files and console)
+#
+# metrics: !!org.apache.omid.metrics.CodahaleMetricsProvider [
+#     !!org.apache.omid.metrics.CodahaleMetricsConfig {
+#         outputFreqInSecs: 60,
+#         reporters: !!set {
+#             !!org.apache.omid.metrics.CodahaleMetricsConfig$Reporter CSV,
+#             !!org.apache.omid.metrics.CodahaleMetricsConfig$Reporter CONSOLE
+#         },
+#         csvDir: "csvMetrics",
+#         prefix: "somePrefix",
+#     }
+# ]
+
+# =====================================================================================================================
+# Some example configurations
+# =====================================================================================================================
+
+# ---------------------------------------------------------------------------------------------------------------------
+# Configuration WITHOUT High Availability using HBase for all required storage & reporting metrics to CSV files
+# ---------------------------------------------------------------------------------------------------------------------
+#
+# commitTableStoreModule: !!org.apache.omid.committable.hbase.DefaultHBaseCommitTableStorageModule [ ]
+#     See optional params
+#         - tableName
+#         - familyName
+#         - principal
+#         - keytab
+# timestampStoreModule: !!org.apache.omid.timestamp.storage.DefaultHBaseTimestampStorageModule [ ]
+#     See optional params
+#         - tableName
+#         - familyName
+#         - principal
+#         - keytab
+# leaseModule: !!org.apache.omid.tso.VoidLeaseManagementModule [ ]
+# metrics: !!org.apache.omid.metrics.CodahaleMetricsProvider [
+#     !!org.apache.omid.metrics.CodahaleMetricsConfig {
+#         reporters: !!set {
+#             !!org.apache.omid.metrics.CodahaleMetricsConfig$Reporter CSV
+#         },
+#         csvDir: "myCSVMetricsDir",
+#         prefix: "myAppPrefixForMetrics",
+#     }
+# ]
+
+# ---------------------------------------------------------------------------------------------------------------------
+# Configuration WITHOUT High Availability using ZK to store the timestamps & reporting metrics to console every 30 secs
+# ---------------------------------------------------------------------------------------------------------------------
+#
+# commitTableStoreModule: !!org.apache.omid.committable.hbase.DefaultHBaseCommitTableStorageModule [ ]
+# timestampStoreModule: !!org.apache.omid.timestamp.storage.DefaultZKTimestampStorageModule
+#         zkCluster: "localhost:2181"
+#         namespace: "omid"
+# leaseModule: !!org.apache.omid.tso.VoidLeaseManagementModule [ ]
+# metrics: !!org.apache.omid.metrics.CodahaleMetricsProvider [
+#     !!org.apache.omid.metrics.CodahaleMetricsConfig {
+#         outputFreqInSecs: 30,
+#         reporters: !!set {
+#             !!org.apache.omid.metrics.CodahaleMetricsConfig$Reporter CONSOLE
+#         },
+#     }
+# ]
+
+
+# ---------------------------------------------------------------------------------------------------------------------
+# Configuration WITH High Availability using HBase for all required storage and no metrics reports
+# ---------------------------------------------------------------------------------------------------------------------
+#
+# commitTableStoreModule: !!org.apache.omid.committable.hbase.DefaultHBaseCommitTableStorageModule [ ]
+# timestampStoreModule: !!org.apache.omid.timestamp.storage.DefaultHBaseTimestampStorageModule [ ]
+# leaseModule: !!org.apache.omid.tso.HALeaseManagementModule
+#     leasePeriodInMs: 10000
+#     tsoLeasePath: "/tso-lease"
+#     currentTsoPath: "/current-tso"
+#     zkCluster: "localhost:2181"
+#     zkNamespace: "omid"
+# metrics: !!org.apache.omid.metrics.NullMetricsProvider [ ]