Support for persisting TracingConfiguration into Registry Center (#1771)

* Refactor TracingConfiguration

* Add testcases for refactored Tracing module

* Adjust API in Tracing module
diff --git a/elasticjob-cloud/elasticjob-cloud-scheduler/pom.xml b/elasticjob-cloud/elasticjob-cloud-scheduler/pom.xml
index d170be5..c63e52a 100755
--- a/elasticjob-cloud/elasticjob-cloud-scheduler/pom.xml
+++ b/elasticjob-cloud/elasticjob-cloud-scheduler/pom.xml
@@ -110,6 +110,10 @@
             <groupId>org.mockito</groupId>
             <artifactId>mockito-inline</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/console/controller/CloudJobController.java b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/console/controller/CloudJobController.java
index d5548f9..eb39fbe 100755
--- a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/console/controller/CloudJobController.java
+++ b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/console/controller/CloudJobController.java
@@ -100,7 +100,7 @@
         CloudJobController.regCenter = regCenter;
         CloudJobController.producerManager = producerManager;
         Optional<TracingConfiguration<?>> tracingConfiguration = BootstrapEnvironment.getINSTANCE().getTracingConfiguration();
-        jobEventRdbSearch = tracingConfiguration.map(tracingConfiguration1 -> new JobEventRdbSearch((DataSource) tracingConfiguration1.getStorage())).orElse(null);
+        jobEventRdbSearch = tracingConfiguration.map(tracingConfiguration1 -> new JobEventRdbSearch((DataSource) tracingConfiguration1.getTracingStorageConfiguration().getStorage())).orElse(null);
     }
     
     /**
diff --git a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/statistics/StatisticManager.java b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/statistics/StatisticManager.java
index 6cb0cb1..18d7086 100755
--- a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/statistics/StatisticManager.java
+++ b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/statistics/StatisticManager.java
@@ -104,7 +104,7 @@
     private static void init() {
         if (null != instance.tracingConfiguration) {
             try {
-                instance.rdbRepository = new StatisticRdbRepository((DataSource) instance.tracingConfiguration.getStorage());
+                instance.rdbRepository = new StatisticRdbRepository((DataSource) instance.tracingConfiguration.getTracingStorageConfiguration().getStorage());
             } catch (final SQLException ex) {
                 log.error("Init StatisticRdbRepository error:", ex);
             }
diff --git a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/env/BootstrapEnvironmentTest.java b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/env/BootstrapEnvironmentTest.java
index 5749e47..4a33c50 100755
--- a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/env/BootstrapEnvironmentTest.java
+++ b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/env/BootstrapEnvironmentTest.java
@@ -74,7 +74,7 @@
         properties.setProperty(BootstrapEnvironment.EnvironmentArgument.EVENT_TRACE_RDB_USERNAME.getKey(), "sa");
         properties.setProperty(BootstrapEnvironment.EnvironmentArgument.EVENT_TRACE_RDB_PASSWORD.getKey(), "password");
         ReflectionUtils.setFieldValue(bootstrapEnvironment, "properties", properties);
-        bootstrapEnvironment.getTracingConfiguration().ifPresent(tracingConfig -> assertThat(tracingConfig.getStorage(), instanceOf(BasicDataSource.class)));
+        bootstrapEnvironment.getTracingConfiguration().ifPresent(tracingConfig -> assertThat(tracingConfig.getTracingStorageConfiguration().getStorage(), instanceOf(BasicDataSource.class)));
     }
     
     @Test
diff --git a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/api/TracingConfiguration.java b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/api/TracingConfiguration.java
index e31c45b..6a8cf76 100644
--- a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/api/TracingConfiguration.java
+++ b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/api/TracingConfiguration.java
@@ -17,20 +17,33 @@
 
 package org.apache.shardingsphere.elasticjob.tracing.api;
 
+import lombok.AllArgsConstructor;
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
 import org.apache.shardingsphere.elasticjob.api.JobExtraConfiguration;
+import org.apache.shardingsphere.elasticjob.tracing.exception.TracingStorageConverterNotFoundException;
+import org.apache.shardingsphere.elasticjob.tracing.storage.TracingStorageConverterFactory;
 
 /**
  * Tracing configuration.
- * 
+ *
  * @param <T> type of tracing storage
  */
-@RequiredArgsConstructor
 @Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
 public final class TracingConfiguration<T> implements JobExtraConfiguration {
     
-    private final String type;
+    private String type;
     
-    private final T storage;
+    private TracingStorageConfiguration<T> tracingStorageConfiguration;
+    
+    @SuppressWarnings("unchecked")
+    public TracingConfiguration(final String type, final T storage) {
+        this.type = type;
+        this.tracingStorageConfiguration = TracingStorageConverterFactory.findConverter((Class<T>) storage.getClass())
+                .orElseThrow(() -> new TracingStorageConverterNotFoundException(storage.getClass())).convertObjectToConfiguration(storage);
+    }
 }
diff --git a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/api/TracingStorageConfiguration.java b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/api/TracingStorageConfiguration.java
new file mode 100644
index 0000000..c051dd5
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/api/TracingStorageConfiguration.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.tracing.api;
+
+/**
+ * Tracing storage configuration.
+ *
+ * @param <T> storage type
+ */
+public interface TracingStorageConfiguration<T> {
+    
+    /**
+     * Create storage.
+     *
+     * @return storage
+     */
+    T getStorage();
+}
diff --git a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/exception/TracingStorageConverterNotFoundException.java b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/exception/TracingStorageConverterNotFoundException.java
new file mode 100644
index 0000000..cecc617
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/exception/TracingStorageConverterNotFoundException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.tracing.exception;
+
+import org.apache.shardingsphere.elasticjob.tracing.storage.TracingStorageConverter;
+
+/**
+ * {@link TracingStorageConverter} not found exception.
+ */
+public final class TracingStorageConverterNotFoundException extends RuntimeException {
+    
+    private static final long serialVersionUID = -995858641205565452L;
+    
+    public TracingStorageConverterNotFoundException(final Class<?> storageType) {
+        super(String.format("No TracingConfigurationConverter found for [%s]", storageType.getName()));
+    }
+}
diff --git a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/exception/TracingStorageUnavailableException.java b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/exception/TracingStorageUnavailableException.java
new file mode 100644
index 0000000..f3fac7f
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/exception/TracingStorageUnavailableException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.tracing.exception;
+
+/**
+ * Tracing storage unavailable exception.
+ */
+public final class TracingStorageUnavailableException extends RuntimeException {
+    
+    private static final long serialVersionUID = 7364942870490687255L;
+    
+    public TracingStorageUnavailableException(final Throwable cause) {
+        super(cause);
+    }
+}
diff --git a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/listener/TracingListenerFactory.java b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/listener/TracingListenerFactory.java
index b6d3d90..d088728 100644
--- a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/listener/TracingListenerFactory.java
+++ b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/listener/TracingListenerFactory.java
@@ -50,9 +50,9 @@
      */
     @SuppressWarnings("unchecked")
     public static TracingListener getListener(final TracingConfiguration tracingConfig) throws TracingConfigurationException {
-        if (Strings.isNullOrEmpty(tracingConfig.getType()) || !LISTENER_CONFIGS.containsKey(tracingConfig.getType())) {
+        if (null == tracingConfig.getTracingStorageConfiguration() || Strings.isNullOrEmpty(tracingConfig.getType()) || !LISTENER_CONFIGS.containsKey(tracingConfig.getType())) {
             throw new TracingConfigurationException(String.format("Can not find executor service handler type '%s'.", tracingConfig.getType()));
         }
-        return LISTENER_CONFIGS.get(tracingConfig.getType()).createTracingListener(tracingConfig.getStorage());
+        return LISTENER_CONFIGS.get(tracingConfig.getType()).createTracingListener(tracingConfig.getTracingStorageConfiguration().getStorage());
     }
 }
diff --git a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/storage/TracingStorageConverter.java b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/storage/TracingStorageConverter.java
new file mode 100644
index 0000000..d7686cb
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/storage/TracingStorageConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.tracing.storage;
+
+import org.apache.shardingsphere.elasticjob.tracing.api.TracingStorageConfiguration;
+
+/**
+ * Tracing storage converter.
+ *
+ * @param <T> storage type
+ */
+public interface TracingStorageConverter<T> {
+    
+    /**
+     * Convert storage to {@link TracingStorageConfiguration}.
+     *
+     * @param storage storage instance
+     * @return instance of {@link TracingStorageConfiguration}
+     */
+    TracingStorageConfiguration<T> convertObjectToConfiguration(T storage);
+    
+    /**
+     * Storage type.
+     *
+     * @return class of storage
+     */
+    Class<T> storageType();
+}
diff --git a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/storage/TracingStorageConverterFactory.java b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/storage/TracingStorageConverterFactory.java
new file mode 100644
index 0000000..30c1004
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/main/java/org/apache/shardingsphere/elasticjob/tracing/storage/TracingStorageConverterFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.tracing.storage;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.ServiceLoader;
+
+/**
+ * Factory for {@link TracingStorageConverter}.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class TracingStorageConverterFactory {
+    
+    private static final List<TracingStorageConverter<?>> CONVERTERS = new LinkedList<>();
+    
+    static {
+        ServiceLoader.load(TracingStorageConverter.class).forEach(CONVERTERS::add);
+    }
+    
+    /**
+     * Find {@link TracingStorageConverter} for specific storage type.
+     *
+     * @param storageType storage type
+     * @param <T>         storage type
+     * @return instance of {@link TracingStorageConverter}
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> Optional<TracingStorageConverter<T>> findConverter(final Class<T> storageType) {
+        return CONVERTERS.stream().filter(each -> each.storageType().isAssignableFrom(storageType)).map(each -> (TracingStorageConverter<T>) each).findFirst();
+    }
+}
diff --git a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/fixture/JobEventCallerTracingStorageConfiguration.java b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/fixture/JobEventCallerTracingStorageConfiguration.java
new file mode 100644
index 0000000..2042bc0
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/fixture/JobEventCallerTracingStorageConfiguration.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.tracing.fixture;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.elasticjob.tracing.api.TracingStorageConfiguration;
+
+/**
+ * {@link TracingStorageConfiguration} for {@link JobEventCaller}.
+ */
+@RequiredArgsConstructor
+public final class JobEventCallerTracingStorageConfiguration implements TracingStorageConfiguration<JobEventCaller> {
+    
+    private final JobEventCaller jobEventCaller;
+    
+    @Override
+    public JobEventCaller getStorage() {
+        return jobEventCaller;
+    }
+}
diff --git a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/fixture/JobEventCallerTracingStorageConverter.java b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/fixture/JobEventCallerTracingStorageConverter.java
new file mode 100644
index 0000000..b094230
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/fixture/JobEventCallerTracingStorageConverter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.tracing.fixture;
+
+import org.apache.shardingsphere.elasticjob.tracing.api.TracingStorageConfiguration;
+import org.apache.shardingsphere.elasticjob.tracing.storage.TracingStorageConverter;
+
+/**
+ * {@link TracingStorageConverter} for {@link JobEventCaller}.
+ */
+public final class JobEventCallerTracingStorageConverter implements TracingStorageConverter<JobEventCaller> {
+    
+    @Override
+    public TracingStorageConfiguration<JobEventCaller> convertObjectToConfiguration(final JobEventCaller storage) {
+        return new JobEventCallerTracingStorageConfiguration(storage);
+    }
+    
+    @Override
+    public Class<JobEventCaller> storageType() {
+        return JobEventCaller.class;
+    }
+}
diff --git a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/listener/TracingListenerFactoryTest.java b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/listener/TracingListenerFactoryTest.java
index facc403..0d5bd3b 100644
--- a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/listener/TracingListenerFactoryTest.java
+++ b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/listener/TracingListenerFactoryTest.java
@@ -19,6 +19,7 @@
 
 import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration;
 import org.apache.shardingsphere.elasticjob.tracing.exception.TracingConfigurationException;
+import org.apache.shardingsphere.elasticjob.tracing.fixture.JobEventCallerTracingStorageConfiguration;
 import org.apache.shardingsphere.elasticjob.tracing.fixture.TestTracingListener;
 import org.junit.Test;
 
@@ -39,6 +40,7 @@
     
     @Test
     public void assertGetListener() throws TracingConfigurationException {
-        assertThat(TracingListenerFactory.getListener(new TracingConfiguration<>("TEST", null)), instanceOf(TestTracingListener.class));
+        assertThat(TracingListenerFactory.getListener(new TracingConfiguration<>("TEST", new JobEventCallerTracingStorageConfiguration(() -> {
+        }))), instanceOf(TestTracingListener.class));
     }
 }
diff --git a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/storage/TracingStorageConverterFactoryTest.java b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/storage/TracingStorageConverterFactoryTest.java
new file mode 100644
index 0000000..e4b4838
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/test/java/org/apache/shardingsphere/elasticjob/tracing/storage/TracingStorageConverterFactoryTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.tracing.storage;
+
+import org.apache.shardingsphere.elasticjob.tracing.fixture.JobEventCaller;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public final class TracingStorageConverterFactoryTest {
+    
+    @Test
+    public void assertConverterExists() {
+        assertTrue(TracingStorageConverterFactory.findConverter(JobEventCaller.class).isPresent());
+    }
+    
+    @Test
+    public void assertConverterNotFound() {
+        assertFalse(TracingStorageConverterFactory.findConverter(AClassWithoutCorrespondingConverter.class).isPresent());
+    }
+    
+    private static class AClassWithoutCorrespondingConverter {
+    
+    }
+}
diff --git a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/test/resources/META-INF/services/org.apache.shardingsphere.elasticjob.tracing.storage.TracingStorageConverter b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/test/resources/META-INF/services/org.apache.shardingsphere.elasticjob.tracing.storage.TracingStorageConverter
new file mode 100644
index 0000000..f0c91c1
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-api/src/test/resources/META-INF/services/org.apache.shardingsphere.elasticjob.tracing.storage.TracingStorageConverter
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.elasticjob.tracing.fixture.JobEventCallerTracingStorageConverter
diff --git a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/pom.xml b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/pom.xml
index d6dcd5d..bca9d4d 100644
--- a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/pom.xml
+++ b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/pom.xml
@@ -38,6 +38,16 @@
             <artifactId>commons-dbcp</artifactId>
             <version>${commons-dbcp.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-dbcp2</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.zaxxer</groupId>
+            <artifactId>HikariCP</artifactId>
+        </dependency>
         
         <dependency>
             <groupId>org.projectlombok</groupId>
diff --git a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/datasource/DataSourceConfiguration.java b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/datasource/DataSourceConfiguration.java
new file mode 100644
index 0000000..dd934d1
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/datasource/DataSourceConfiguration.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.tracing.rdb.datasource;
+
+import com.google.common.base.CaseFormat;
+import com.google.common.base.Joiner;
+import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.elasticjob.tracing.api.TracingStorageConfiguration;
+
+import javax.sql.DataSource;
+import java.lang.reflect.Method;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.ServiceLoader;
+
+/**
+ * Data source configuration.
+ */
+@Setter
+@Getter
+@NoArgsConstructor
+public final class DataSourceConfiguration implements TracingStorageConfiguration<DataSource> {
+    
+    private static final String GETTER_PREFIX = "get";
+    
+    private static final String SETTER_PREFIX = "set";
+    
+    private static final Collection<Class<?>> GENERAL_CLASS_TYPE;
+    
+    private static final Collection<String> SKIPPED_PROPERTY_NAMES;
+    
+    static {
+        GENERAL_CLASS_TYPE = Sets.newHashSet(boolean.class, Boolean.class, int.class, Integer.class, long.class, Long.class, String.class, Collection.class, List.class);
+        SKIPPED_PROPERTY_NAMES = Sets.newHashSet("loginTimeout");
+    }
+    
+    private String dataSourceClassName;
+    
+    private Map<String, Object> props = new LinkedHashMap<>();
+    
+    public DataSourceConfiguration(final String dataSourceClassName) {
+        this.dataSourceClassName = dataSourceClassName;
+    }
+    
+    /**
+     * Get data source configuration.
+     *
+     * @param dataSource data source
+     * @return data source configuration
+     */
+    public static DataSourceConfiguration getDataSourceConfiguration(final DataSource dataSource) {
+        DataSourceConfiguration result = new DataSourceConfiguration(dataSource.getClass().getName());
+        result.props.putAll(findAllGetterProperties(dataSource));
+        return result;
+    }
+    
+    @SneakyThrows(ReflectiveOperationException.class)
+    private static Map<String, Object> findAllGetterProperties(final Object target) {
+        Collection<Method> allGetterMethods = findAllGetterMethods(target.getClass());
+        Map<String, Object> result = new LinkedHashMap<>(allGetterMethods.size(), 1);
+        for (Method each : allGetterMethods) {
+            String propertyName = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_CAMEL, each.getName().substring(GETTER_PREFIX.length()));
+            if (GENERAL_CLASS_TYPE.contains(each.getReturnType()) && !SKIPPED_PROPERTY_NAMES.contains(propertyName)) {
+                Optional.ofNullable(each.invoke(target)).ifPresent(propertyValue -> result.put(propertyName, propertyValue));
+            }
+        }
+        return result;
+    }
+    
+    private static Collection<Method> findAllGetterMethods(final Class<?> clazz) {
+        Method[] methods = clazz.getMethods();
+        Collection<Method> result = new HashSet<>(methods.length);
+        for (Method each : methods) {
+            if (each.getName().startsWith(GETTER_PREFIX) && 0 == each.getParameterTypes().length) {
+                result.add(each);
+            }
+        }
+        return result;
+    }
+    
+    @Override
+    public DataSource getStorage() {
+        return DataSourceRegistry.getInstance().getDataSource(this);
+    }
+    
+    /**
+     * Create data source.
+     *
+     * @return data source
+     */
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    @SneakyThrows(ReflectiveOperationException.class)
+    public DataSource createDataSource() {
+        DataSource result = (DataSource) Class.forName(dataSourceClassName).getConstructor().newInstance();
+        Method[] methods = result.getClass().getMethods();
+        for (Entry<String, Object> entry : props.entrySet()) {
+            if (SKIPPED_PROPERTY_NAMES.contains(entry.getKey())) {
+                continue;
+            }
+            Optional<Method> setterMethod = findSetterMethod(methods, entry.getKey());
+            if (setterMethod.isPresent()) {
+                setterMethod.get().invoke(result, entry.getValue());
+            }
+        }
+        Optional<JDBCParameterDecorator> decorator = findJDBCParameterDecorator(result);
+        return decorator.isPresent() ? decorator.get().decorate(result) : result;
+    }
+    
+    @SuppressWarnings("rawtypes")
+    private Optional<JDBCParameterDecorator> findJDBCParameterDecorator(final DataSource dataSource) {
+        for (JDBCParameterDecorator each : ServiceLoader.load(JDBCParameterDecorator.class)) {
+            if (each.getType() == dataSource.getClass()) {
+                return Optional.of(each);
+            }
+        }
+        return Optional.empty();
+    }
+    
+    private Optional<Method> findSetterMethod(final Method[] methods, final String property) {
+        String setterMethodName = Joiner.on("").join(SETTER_PREFIX, CaseFormat.LOWER_CAMEL.to(CaseFormat.UPPER_CAMEL, property));
+        for (Method each : methods) {
+            if (each.getName().equals(setterMethodName) && 1 == each.getParameterTypes().length) {
+                return Optional.of(each);
+            }
+        }
+        return Optional.empty();
+    }
+    
+    @Override
+    public boolean equals(final Object obj) {
+        return this == obj || null != obj && getClass() == obj.getClass() && equalsByProperties((DataSourceConfiguration) obj);
+    }
+    
+    private boolean equalsByProperties(final DataSourceConfiguration dataSourceConfig) {
+        return dataSourceClassName.equals(dataSourceConfig.dataSourceClassName) && props.equals(dataSourceConfig.props);
+    }
+    
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(dataSourceClassName, props);
+    }
+}
diff --git a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/datasource/DataSourceRegistry.java b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/datasource/DataSourceRegistry.java
new file mode 100644
index 0000000..00a8266
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/datasource/DataSourceRegistry.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.tracing.rdb.datasource;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.elasticjob.tracing.api.TracingStorageConfiguration;
+
+import javax.sql.DataSource;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Mapping {@link TracingStorageConfiguration} to {@link DataSource}.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class DataSourceRegistry {
+    
+    private static volatile DataSourceRegistry instance;
+    
+    private final ConcurrentMap<DataSourceConfiguration, DataSource> dataSources = new ConcurrentHashMap<>();
+    
+    /**
+     * Get instance of {@link DataSourceRegistry}.
+     *
+     * @return {@link DataSourceRegistry} singleton
+     */
+    public static DataSourceRegistry getInstance() {
+        if (null == instance) {
+            synchronized (DataSourceRegistry.class) {
+                if (null == instance) {
+                    instance = new DataSourceRegistry();
+                }
+            }
+        }
+        return instance;
+    }
+    
+    /**
+     * Get {@link DataSource} by {@link TracingStorageConfiguration}.
+     *
+     * @param configuration {@link TracingStorageConfiguration}
+     * @return instance of {@link DataSource}
+     */
+    public DataSource getDataSource(final DataSourceConfiguration configuration) {
+        return dataSources.computeIfAbsent(configuration, DataSourceConfiguration::createDataSource);
+    }
+}
diff --git a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/datasource/DataSourceTracingStorageConverter.java b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/datasource/DataSourceTracingStorageConverter.java
new file mode 100644
index 0000000..0349a4a
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/datasource/DataSourceTracingStorageConverter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.tracing.rdb.datasource;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.tracing.api.TracingStorageConfiguration;
+import org.apache.shardingsphere.elasticjob.tracing.storage.TracingStorageConverter;
+import org.apache.shardingsphere.elasticjob.tracing.exception.TracingStorageUnavailableException;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ * {@link TracingStorageConverter} for {@link DataSource}.
+ */
+@Slf4j
+public final class DataSourceTracingStorageConverter implements TracingStorageConverter<DataSource> {
+    
+    @Override
+    public TracingStorageConfiguration<DataSource> convertObjectToConfiguration(final DataSource dataSource) {
+        try (Connection connection = dataSource.getConnection()) {
+            log.trace("Try to get connection from {}", connection.getMetaData().getURL());
+        } catch (final SQLException ex) {
+            log.error(ex.getLocalizedMessage(), ex);
+            throw new TracingStorageUnavailableException(ex);
+        }
+        return DataSourceConfiguration.getDataSourceConfiguration(dataSource);
+    }
+    
+    @Override
+    public Class<DataSource> storageType() {
+        return DataSource.class;
+    }
+}
diff --git a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/datasource/JDBCParameterDecorator.java b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/datasource/JDBCParameterDecorator.java
new file mode 100644
index 0000000..95fad61
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/main/java/org/apache/shardingsphere/elasticjob/tracing/rdb/datasource/JDBCParameterDecorator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.tracing.rdb.datasource;
+
+import javax.sql.DataSource;
+
+/**
+ * JDBC parameter decorator.
+ * 
+ * @param <T> type of data source
+ */
+public interface JDBCParameterDecorator<T extends DataSource> {
+    
+    /**
+     * Decorate data source.
+     * 
+     * @param dataSource data source to be decorated
+     * @return decorated data source
+     */
+    T decorate(T dataSource);
+    
+    /**
+     * Get data source type.
+     *
+     * @return data source type
+     */
+    Class<T> getType();
+}
diff --git a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.tracing.storage.TracingStorageConverter b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.tracing.storage.TracingStorageConverter
new file mode 100644
index 0000000..97410b5
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.tracing.storage.TracingStorageConverter
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.elasticjob.tracing.rdb.datasource.DataSourceTracingStorageConverter
diff --git a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/datasource/DataSourceConfigurationTest.java b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/datasource/DataSourceConfigurationTest.java
new file mode 100644
index 0000000..8a1439e
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/datasource/DataSourceConfigurationTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.tracing.rdb.datasource;
+
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+public final class DataSourceConfigurationTest {
+    
+    @Test
+    public void assertGetDataSourceConfiguration() throws SQLException {
+        HikariDataSource actualDataSource = new HikariDataSource();
+        actualDataSource.setDriverClassName("org.h2.Driver");
+        actualDataSource.setJdbcUrl("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL");
+        actualDataSource.setUsername("root");
+        actualDataSource.setPassword("root");
+        actualDataSource.setLoginTimeout(1);
+        DataSourceConfiguration actual = DataSourceConfiguration.getDataSourceConfiguration(actualDataSource);
+        assertThat(actual.getDataSourceClassName(), is(HikariDataSource.class.getName()));
+        assertThat(actual.getProps().get("driverClassName").toString(), is("org.h2.Driver"));
+        assertThat(actual.getProps().get("jdbcUrl").toString(), is("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL"));
+        assertThat(actual.getProps().get("username").toString(), is("root"));
+        assertThat(actual.getProps().get("password").toString(), is("root"));
+        assertNull(actual.getProps().get("loginTimeout"));
+    }
+    
+    @Test
+    public void assertCreateDataSource() {
+        Map<String, Object> props = new HashMap<>(16, 1);
+        props.put("driverClassName", "org.h2.Driver");
+        props.put("jdbcUrl", "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL");
+        props.put("username", "root");
+        props.put("password", "root");
+        props.put("loginTimeout", "5000");
+        props.put("test", "test");
+        DataSourceConfiguration dataSourceConfig = new DataSourceConfiguration(HikariDataSource.class.getName());
+        dataSourceConfig.getProps().putAll(props);
+        HikariDataSource actual = (HikariDataSource) dataSourceConfig.createDataSource();
+        assertThat(actual.getDriverClassName(), is("org.h2.Driver"));
+        assertThat(actual.getJdbcUrl(), is("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL"));
+        assertThat(actual.getUsername(), is("root"));
+        assertThat(actual.getPassword(), is("root"));
+    }
+    
+    @Test
+    public void assertEquals() {
+        DataSourceConfiguration originalDataSourceConfig = new DataSourceConfiguration(HikariDataSource.class.getName());
+        DataSourceConfiguration targetDataSourceConfig = new DataSourceConfiguration(HikariDataSource.class.getName());
+        assertThat(originalDataSourceConfig, is(originalDataSourceConfig));
+        assertThat(originalDataSourceConfig, is(targetDataSourceConfig));
+        originalDataSourceConfig.getProps().put("username", "root");
+        targetDataSourceConfig.getProps().put("username", "root");
+        assertThat(originalDataSourceConfig, is(targetDataSourceConfig));
+    }
+    
+    @Test
+    public void assertNotEquals() {
+        DataSourceConfiguration originalDataSourceConfig = new DataSourceConfiguration(HikariDataSource.class.getName());
+        DataSourceConfiguration targetDataSourceConfig = new DataSourceConfiguration(HikariDataSource.class.getName());
+        originalDataSourceConfig.getProps().put("username", "root");
+        targetDataSourceConfig.getProps().put("username", "root0");
+        assertThat(originalDataSourceConfig, not(targetDataSourceConfig));
+    }
+    
+    @Test
+    public void assertEqualsWithNull() {
+        assertFalse(new DataSourceConfiguration(HikariDataSource.class.getName()).equals(null));
+    }
+    
+    @Test
+    public void assertSameHashCode() {
+        DataSourceConfiguration originalDataSourceConfig = new DataSourceConfiguration(HikariDataSource.class.getName());
+        DataSourceConfiguration targetDataSourceConfig = new DataSourceConfiguration(HikariDataSource.class.getName());
+        assertThat(originalDataSourceConfig.hashCode(), is(targetDataSourceConfig.hashCode()));
+        originalDataSourceConfig.getProps().put("username", "root");
+        targetDataSourceConfig.getProps().put("username", "root");
+        assertThat(originalDataSourceConfig.hashCode(), is(targetDataSourceConfig.hashCode()));
+        originalDataSourceConfig.getProps().put("password", "root");
+        targetDataSourceConfig.getProps().put("password", "root");
+        assertThat(originalDataSourceConfig.hashCode(), is(targetDataSourceConfig.hashCode()));
+    }
+    
+    @Test
+    public void assertDifferentHashCode() {
+        DataSourceConfiguration originalDataSourceConfig = new DataSourceConfiguration(HikariDataSource.class.getName());
+        DataSourceConfiguration targetDataSourceConfig = new DataSourceConfiguration(HikariDataSource.class.getName());
+        originalDataSourceConfig.getProps().put("username", "root");
+        targetDataSourceConfig.getProps().put("username", "root");
+        targetDataSourceConfig.getProps().put("password", "root");
+        assertThat(originalDataSourceConfig.hashCode(), not(targetDataSourceConfig.hashCode()));
+        originalDataSourceConfig = new DataSourceConfiguration(HikariDataSource.class.getName());
+        targetDataSourceConfig = new DataSourceConfiguration(BasicDataSource.class.getName());
+        assertThat(originalDataSourceConfig.hashCode(), not(targetDataSourceConfig.hashCode()));
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Test
+    public void assertGetDataSourceConfigurationWithConnectionInitSqls() {
+        BasicDataSource actualDataSource = new BasicDataSource();
+        actualDataSource.setDriverClassName("org.h2.Driver");
+        actualDataSource.setUrl("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL");
+        actualDataSource.setUsername("root");
+        actualDataSource.setPassword("root");
+        actualDataSource.setConnectionInitSqls(Arrays.asList("set names utf8mb4;", "set names utf8;"));
+        DataSourceConfiguration actual = DataSourceConfiguration.getDataSourceConfiguration(actualDataSource);
+        assertThat(actual.getDataSourceClassName(), is(BasicDataSource.class.getName()));
+        assertThat(actual.getProps().get("driverClassName").toString(), is("org.h2.Driver"));
+        assertThat(actual.getProps().get("url").toString(), is("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL"));
+        assertThat(actual.getProps().get("username").toString(), is("root"));
+        assertThat(actual.getProps().get("password").toString(), is("root"));
+        assertNull(actual.getProps().get("loginTimeout"));
+        assertThat(actual.getProps().get("connectionInitSqls"), instanceOf(List.class));
+        List<String> actualConnectionInitSql = (List<String>) actual.getProps().get("connectionInitSqls");
+        assertThat(actualConnectionInitSql, hasItem("set names utf8mb4;"));
+        assertThat(actualConnectionInitSql, hasItem("set names utf8;"));
+    }
+}
diff --git a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/datasource/DataSourceRegistryTest.java b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/datasource/DataSourceRegistryTest.java
new file mode 100644
index 0000000..bf311d6
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/datasource/DataSourceRegistryTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.tracing.rdb.datasource;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.sql.DataSource;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class DataSourceRegistryTest {
+    
+    @Mock
+    private DataSourceConfiguration dataSourceConfiguration;
+    
+    @Test
+    public void assertGetDataSourceBySameConfiguration() {
+        when(dataSourceConfiguration.createDataSource()).then(invocation -> mock(DataSource.class));
+        DataSource expected = DataSourceRegistry.getInstance().getDataSource(dataSourceConfiguration);
+        DataSource actual = DataSourceRegistry.getInstance().getDataSource(dataSourceConfiguration);
+        verify(dataSourceConfiguration).createDataSource();
+        assertThat(actual, is(expected));
+    }
+    
+    @Test
+    public void assertGetDataSourceWithDifferentConfiguration() {
+        when(dataSourceConfiguration.createDataSource()).then(invocation -> mock(DataSource.class));
+        DataSourceConfiguration anotherDataSourceConfiguration = mock(DataSourceConfiguration.class);
+        when(anotherDataSourceConfiguration.createDataSource()).then(invocation -> mock(DataSource.class));
+        DataSource one = DataSourceRegistry.getInstance().getDataSource(dataSourceConfiguration);
+        DataSource another = DataSourceRegistry.getInstance().getDataSource(anotherDataSourceConfiguration);
+        verify(dataSourceConfiguration).createDataSource();
+        verify(anotherDataSourceConfiguration).createDataSource();
+        assertThat(another, not(one));
+    }
+}
diff --git a/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/datasource/DataSourceTracingStorageConverterTest.java b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/datasource/DataSourceTracingStorageConverterTest.java
new file mode 100644
index 0000000..6663969
--- /dev/null
+++ b/elasticjob-ecosystem/elasticjob-tracing/elasticjob-tracing-rdb/src/test/java/org/apache/shardingsphere/elasticjob/tracing/rdb/datasource/DataSourceTracingStorageConverterTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.tracing.rdb.datasource;
+
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.shardingsphere.elasticjob.tracing.api.TracingStorageConfiguration;
+import org.apache.shardingsphere.elasticjob.tracing.exception.TracingStorageUnavailableException;
+import org.apache.shardingsphere.elasticjob.tracing.storage.TracingStorageConverter;
+import org.apache.shardingsphere.elasticjob.tracing.storage.TracingStorageConverterFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class DataSourceTracingStorageConverterTest {
+    
+    @Mock
+    private DataSource dataSource;
+    
+    @Mock
+    private Connection connection;
+    
+    @Mock
+    private DatabaseMetaData databaseMetaData;
+    
+    @Test
+    public void assertConvert() throws SQLException {
+        when(dataSource.getConnection()).thenReturn(connection);
+        when(connection.getMetaData()).thenReturn(databaseMetaData);
+        when(databaseMetaData.getURL()).thenReturn("jdbc:url");
+        DataSourceTracingStorageConverter converter = new DataSourceTracingStorageConverter();
+        TracingStorageConfiguration<DataSource> configuration = converter.convertObjectToConfiguration(dataSource);
+        assertNotNull(configuration);
+    }
+    
+    @Test(expected = TracingStorageUnavailableException.class)
+    public void assertConvertFailed() throws SQLException {
+        DataSourceTracingStorageConverter converter = new DataSourceTracingStorageConverter();
+        doThrow(SQLException.class).when(dataSource).getConnection();
+        converter.convertObjectToConfiguration(dataSource);
+    }
+    
+    @Test
+    public void assertStorageType() {
+        TracingStorageConverter<HikariDataSource> converter = TracingStorageConverterFactory.findConverter(HikariDataSource.class).orElse(null);
+        assertNotNull(converter);
+        assertThat(converter.storageType(), is(DataSource.class));
+    }
+}
diff --git a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJO.java b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJO.java
index 8dfd347..6050e4e 100644
--- a/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJO.java
+++ b/elasticjob-infra/elasticjob-infra-common/src/main/java/org/apache/shardingsphere/elasticjob/infra/pojo/JobConfigurationPOJO.java
@@ -20,9 +20,11 @@
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.api.JobExtraConfiguration;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedList;
 import java.util.Properties;
 
 /**
@@ -60,6 +62,8 @@
     
     private Collection<String> jobListenerTypes = new ArrayList<>();
     
+    private Collection<JobExtraConfiguration> jobExtraConfigurations = new LinkedList<>();
+    
     private String description;
     
     private Properties props = new Properties();
@@ -80,6 +84,7 @@
                 .maxTimeDiffSeconds(maxTimeDiffSeconds).reconcileIntervalMinutes(reconcileIntervalMinutes)
                 .jobShardingStrategyType(jobShardingStrategyType).jobExecutorServiceHandlerType(jobExecutorServiceHandlerType).jobErrorHandlerType(jobErrorHandlerType)
                 .jobListenerTypes(jobListenerTypes.toArray(new String[]{})).description(description).disabled(disabled).overwrite(overwrite).build();
+        result.getExtraConfigurations().addAll(jobExtraConfigurations);
         for (Object each : props.keySet()) {
             result.getProps().setProperty(each.toString(), props.get(each.toString()).toString());
         }
@@ -108,6 +113,7 @@
         result.setJobExecutorServiceHandlerType(jobConfiguration.getJobExecutorServiceHandlerType());
         result.setJobErrorHandlerType(jobConfiguration.getJobErrorHandlerType());
         result.setJobListenerTypes(jobConfiguration.getJobListenerTypes());
+        result.setJobExtraConfigurations(jobConfiguration.getExtraConfigurations());
         result.setDescription(jobConfiguration.getDescription());
         result.setProps(jobConfiguration.getProps());
         result.setDisabled(jobConfiguration.isDisabled());
diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/ElasticJobSpringBootTest.java b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/ElasticJobSpringBootTest.java
index a2b7236..71032f0 100644
--- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/ElasticJobSpringBootTest.java
+++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-boot-starter/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/boot/job/ElasticJobSpringBootTest.java
@@ -80,8 +80,8 @@
         TracingConfiguration<?> tracingConfig = applicationContext.getBean(TracingConfiguration.class);
         assertNotNull(tracingConfig);
         assertThat(tracingConfig.getType(), is("RDB"));
-        assertTrue(tracingConfig.getStorage() instanceof DataSource);
-        DataSource dataSource = (DataSource) tracingConfig.getStorage();
+        assertTrue(tracingConfig.getTracingStorageConfiguration().getStorage() instanceof DataSource);
+        DataSource dataSource = (DataSource) tracingConfig.getTracingStorageConfiguration().getStorage();
         assertNotNull(dataSource.getConnection());
     }
     
diff --git a/pom.xml b/pom.xml
index faa9b54..0624dd3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -66,7 +66,9 @@
         <fenzo.version>0.11.1</fenzo.version>
         
         <commons-dbcp.version>1.4</commons-dbcp.version>
+        <commons-dbcp2.version>2.2.0</commons-dbcp2.version>
         <commons-pool.version>1.5.4</commons-pool.version>
+        <hikaricp.version>3.4.2</hikaricp.version>
         <mail.version>1.6.0</mail.version>
         
         <mysql-connector-java.version>8.0.16</mysql-connector-java.version>
@@ -285,11 +287,21 @@
                 <version>${commons-dbcp.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-dbcp2</artifactId>
+                <version>${commons-dbcp2.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>commons-pool</groupId>
                 <artifactId>commons-pool</artifactId>
                 <version>${commons-pool.version}</version>
             </dependency>
             <dependency>
+                <groupId>com.zaxxer</groupId>
+                <artifactId>HikariCP</artifactId>
+                <version>${hikaricp.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>com.sun.mail</groupId>
                 <artifactId>javax.mail</artifactId>
                 <version>${mail.version}</version>