Merge branch '2.7.9-release'

# Conflicts:
#	dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/pom.xml
#	dubbo-dependencies-bom/pom.xml
#	dubbo-dependencies/dubbo-dependencies-zookeeper/pom.xml
#	dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/InterfaceCompatibleRegistryProtocol.java
#	dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
#	dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
#	dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
#	pom.xml
diff --git a/dubbo-cluster/pom.xml b/dubbo-cluster/pom.xml
index bd62974..1240839 100644
--- a/dubbo-cluster/pom.xml
+++ b/dubbo-cluster/pom.xml
@@ -50,5 +50,11 @@
             <artifactId>zookeeper</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-serialization-hessian2</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/BaseServiceMetadata.java b/dubbo-common/src/main/java/org/apache/dubbo/common/BaseServiceMetadata.java
index 9d65ffd..6bf4491 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/BaseServiceMetadata.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/BaseServiceMetadata.java
@@ -18,6 +18,8 @@
 
 import org.apache.dubbo.common.utils.StringUtils;
 
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_VERSION;
+
 /**
  * 2019-10-10
  */
@@ -44,7 +46,7 @@
     public static String versionFromServiceKey(String serviceKey) {
         int index = serviceKey.indexOf(":");
         if (index == -1) {
-            return null;
+            return DEFAULT_VERSION;
         }
         return serviceKey.substring(index + 1);
     }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index 8ff7338..4d81bf9 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -364,4 +364,6 @@
     String SENTINEL_REDIS = "sentinel";
 
     String CLUSTER_REDIS = "cluster";
+
+    String DEFAULT_VERSION = "0.0.0";
 }
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/BaseServiceMetadataTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/BaseServiceMetadataTest.java
index 62d860c..1075aec 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/BaseServiceMetadataTest.java
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/BaseServiceMetadataTest.java
@@ -18,6 +18,7 @@
 
 import org.junit.jupiter.api.Test;
 
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_VERSION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
@@ -51,7 +52,7 @@
         assertEquals(BaseServiceMetadata.groupFromServiceKey("group1/org.apache.dubbo.common.TestInterface:1.0.0"), "group1");
         assertEquals(BaseServiceMetadata.interfaceFromServiceKey("group1/org.apache.dubbo.common.TestInterface:1.0.0"), "org.apache.dubbo.common.TestInterface");
 
-        assertNull(BaseServiceMetadata.versionFromServiceKey(""));
+        assertEquals(DEFAULT_VERSION, BaseServiceMetadata.versionFromServiceKey(""));
         assertNull(BaseServiceMetadata.groupFromServiceKey(""));
         assertEquals(BaseServiceMetadata.interfaceFromServiceKey(""), "");
 
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/configcenter/file/FileSystemDynamicConfigurationTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/config/configcenter/file/FileSystemDynamicConfigurationTest.java
index 20e127a..b35ebd1 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/configcenter/file/FileSystemDynamicConfigurationTest.java
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/config/configcenter/file/FileSystemDynamicConfigurationTest.java
@@ -23,8 +23,8 @@
 import org.apache.commons.io.FileUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;
 
 import java.io.File;
 import java.util.TreeSet;
@@ -45,7 +45,8 @@
  * {@link FileSystemDynamicConfiguration} Test
  */
 // Test often failed on Github Actions Platform because of file system on Azure
-@DisabledIfEnvironmentVariable(named = "DISABLE_FILE_SYSTEM_TEST", matches = "true")
+//@DisabledIfEnvironmentVariable(named = "DISABLE_FILE_SYSTEM_TEST", matches = "true")
+@Disabled
 public class FileSystemDynamicConfigurationTest {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
diff --git a/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/beans/factory/annotation/AbstractAnnotationConfigBeanBuilder.java b/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/beans/factory/annotation/AbstractAnnotationConfigBeanBuilder.java
index 01186af..f12f0d7 100644
--- a/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/beans/factory/annotation/AbstractAnnotationConfigBeanBuilder.java
+++ b/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/beans/factory/annotation/AbstractAnnotationConfigBeanBuilder.java
@@ -16,22 +16,21 @@
  */
 package org.apache.dubbo.config.spring.beans.factory.annotation;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.dubbo.config.AbstractInterfaceConfig;
 import org.apache.dubbo.config.ApplicationConfig;
 import org.apache.dubbo.config.ModuleConfig;
 import org.apache.dubbo.config.MonitorConfig;
 import org.apache.dubbo.config.RegistryConfig;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.springframework.context.ApplicationContext;
 import org.springframework.util.Assert;
 
 import java.lang.annotation.Annotation;
 import java.util.List;
 
-import static com.alibaba.spring.util.BeanFactoryUtils.getBeans;
-import static com.alibaba.spring.util.BeanFactoryUtils.getOptionalBean;
+import static org.apache.dubbo.config.spring.util.DubboBeanUtils.getBeans;
+import static org.apache.dubbo.config.spring.util.DubboBeanUtils.getOptionalBean;
 
 /**
  * Abstract Configurable {@link Annotation} Bean Builder
diff --git a/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/beans/factory/annotation/AnnotatedInterfaceConfigBeanBuilder.java b/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/beans/factory/annotation/AnnotatedInterfaceConfigBeanBuilder.java
index be951ae..15055f7 100644
--- a/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/beans/factory/annotation/AnnotatedInterfaceConfigBeanBuilder.java
+++ b/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/beans/factory/annotation/AnnotatedInterfaceConfigBeanBuilder.java
@@ -16,14 +16,13 @@
  */
 package org.apache.dubbo.config.spring.beans.factory.annotation;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.dubbo.config.AbstractInterfaceConfig;
 import org.apache.dubbo.config.ApplicationConfig;
 import org.apache.dubbo.config.ModuleConfig;
 import org.apache.dubbo.config.MonitorConfig;
 import org.apache.dubbo.config.RegistryConfig;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.springframework.context.ApplicationContext;
 import org.springframework.core.annotation.AnnotationAttributes;
 import org.springframework.util.Assert;
@@ -31,8 +30,8 @@
 import java.lang.annotation.Annotation;
 import java.util.List;
 
-import static com.alibaba.spring.util.BeanFactoryUtils.getBeans;
-import static com.alibaba.spring.util.BeanFactoryUtils.getOptionalBean;
+import static org.apache.dubbo.config.spring.util.DubboBeanUtils.getBeans;
+import static org.apache.dubbo.config.spring.util.DubboBeanUtils.getOptionalBean;
 
 /**
  * An Abstract Builder to build {@link AbstractInterfaceConfig Interface Config} Bean that annotated
diff --git a/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/beans/factory/annotation/ReferenceBeanBuilder.java b/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/beans/factory/annotation/ReferenceBeanBuilder.java
index f6aacf4..813becc 100644
--- a/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/beans/factory/annotation/ReferenceBeanBuilder.java
+++ b/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/beans/factory/annotation/ReferenceBeanBuilder.java
@@ -22,7 +22,6 @@
 import org.apache.dubbo.config.annotation.Method;
 import org.apache.dubbo.config.annotation.Reference;
 import org.apache.dubbo.config.spring.ReferenceBean;
-
 import org.springframework.beans.propertyeditors.StringTrimmerEditor;
 import org.springframework.context.ApplicationContext;
 import org.springframework.core.annotation.AnnotationAttributes;
@@ -36,9 +35,9 @@
 
 import static com.alibaba.spring.util.AnnotationUtils.getAttribute;
 import static com.alibaba.spring.util.AnnotationUtils.getAttributes;
-import static com.alibaba.spring.util.BeanFactoryUtils.getOptionalBean;
 import static com.alibaba.spring.util.ObjectUtils.of;
 import static org.apache.dubbo.config.spring.util.DubboAnnotationUtils.resolveServiceInterfaceClass;
+import static org.apache.dubbo.config.spring.util.DubboBeanUtils.getOptionalBean;
 import static org.springframework.core.annotation.AnnotationAttributes.fromMap;
 import static org.springframework.util.StringUtils.commaDelimitedListToStringArray;
 
diff --git a/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/extension/SpringExtensionFactory.java b/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/extension/SpringExtensionFactory.java
index c7b6e77..6e08c62 100644
--- a/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/extension/SpringExtensionFactory.java
+++ b/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/extension/SpringExtensionFactory.java
@@ -21,13 +21,13 @@
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.utils.ConcurrentHashSet;
-
-import com.alibaba.spring.util.BeanFactoryUtils;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ConfigurableApplicationContext;
 
 import java.util.Set;
 
+import static org.apache.dubbo.config.spring.util.DubboBeanUtils.getOptionalBean;
+
 /**
  * SpringExtensionFactory
  */
@@ -66,7 +66,7 @@
         }
 
         for (ApplicationContext context : CONTEXTS) {
-            T bean = BeanFactoryUtils.getOptionalBean(context, name, type);
+            T bean = getOptionalBean(context, name, type);
             if (bean != null) {
                 return bean;
             }
@@ -76,4 +76,5 @@
 
         return null;
     }
+
 }
diff --git a/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/util/DubboBeanUtils.java b/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/util/DubboBeanUtils.java
index 4a07e0d..e6d7d9d 100644
--- a/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/util/DubboBeanUtils.java
+++ b/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/util/DubboBeanUtils.java
@@ -16,6 +16,8 @@
  */
 package org.apache.dubbo.config.spring.util;
 
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.config.spring.beans.factory.annotation.DubboConfigAliasPostProcessor;
 import org.apache.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor;
 import org.apache.dubbo.config.spring.beans.factory.config.DubboConfigDefaultPropertyValueBeanPostProcessor;
@@ -23,17 +25,31 @@
 import org.apache.dubbo.config.spring.context.DubboApplicationListenerRegistrar;
 import org.apache.dubbo.config.spring.context.DubboBootstrapApplicationListener;
 import org.apache.dubbo.config.spring.context.DubboLifecycleComponentApplicationListener;
-
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.BeanFactoryUtils;
+import org.springframework.beans.factory.BeanNotOfRequiredTypeException;
+import org.springframework.beans.factory.ListableBeanFactory;
+import org.springframework.beans.factory.NoSuchBeanDefinitionException;
+import org.springframework.beans.factory.NoUniqueBeanDefinitionException;
 import org.springframework.beans.factory.support.BeanDefinitionRegistry;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 import static com.alibaba.spring.util.BeanRegistrar.registerInfrastructureBean;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.unmodifiableList;
+import static org.springframework.util.ObjectUtils.isEmpty;
 
 /**
  * Dubbo Bean utilities class
  *
  * @since 2.7.6
  */
-public interface DubboBeanUtils {
+public abstract class DubboBeanUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(DubboBeanUtils.class);
 
     /**
      * Register the common beans
@@ -45,7 +61,7 @@
      * @see DubboLifecycleComponentApplicationListener
      * @see DubboBootstrapApplicationListener
      */
-    static void registerCommonBeans(BeanDefinitionRegistry registry) {
+    public static void registerCommonBeans(BeanDefinitionRegistry registry) {
 
         // Since 2.5.7 Register @Reference Annotation Bean Processor as an infrastructure Bean
         registerInfrastructureBean(registry, ReferenceAnnotationBeanPostProcessor.BEAN_NAME,
@@ -77,4 +93,69 @@
         registerInfrastructureBean(registry, DubboConfigEarlyInitializationPostProcessor.BEAN_NAME,
                 DubboConfigEarlyInitializationPostProcessor.class);
     }
+
+    /**
+     * Get optional bean by name and type if beanName is not null, or else find by type
+     *
+     * @param beanFactory
+     * @param beanName
+     * @param beanType
+     * @param <T>
+     * @return
+     */
+    public static <T> T getOptionalBean(ListableBeanFactory beanFactory, String beanName, Class<T> beanType) throws BeansException {
+        if (beanName == null) {
+            return getOptionalBeanByType(beanFactory, beanType);
+        }
+
+        T bean = null;
+        try {
+            bean = beanFactory.getBean(beanName, beanType);
+        } catch (NoSuchBeanDefinitionException e) {
+            // ignore NoSuchBeanDefinitionException
+        } catch (BeanNotOfRequiredTypeException e) {
+            // ignore BeanNotOfRequiredTypeException
+            logger.warn(String.format("bean type not match, name: %s, expected type: %s, actual type: %s",
+                    beanName, beanType.getName(), e.getActualType().getName()));
+        }
+        return bean;
+    }
+
+    private static <T> T getOptionalBeanByType(ListableBeanFactory beanFactory, Class<T> beanType) {
+        // Issue : https://github.com/alibaba/spring-context-support/issues/20
+        String[] beanNames = BeanFactoryUtils.beanNamesForTypeIncludingAncestors(beanFactory, beanType, true, false);
+        if (beanNames == null || beanNames.length == 0) {
+            return null;
+        } else if (beanNames.length > 1){
+            throw new NoUniqueBeanDefinitionException(beanType, Arrays.asList(beanNames));
+        }
+        return (T) beanFactory.getBean(beanNames[0]);
+    }
+
+    public static <T> T getBean(ListableBeanFactory beanFactory, String beanName, Class<T> beanType) throws BeansException {
+        return beanFactory.getBean(beanName, beanType);
+    }
+
+    /**
+     * Get beans by names and type
+     *
+     * @param beanFactory
+     * @param beanNames
+     * @param beanType
+     * @param <T>
+     * @return
+     */
+    public static <T> List<T> getBeans(ListableBeanFactory beanFactory, String[] beanNames, Class<T> beanType) throws BeansException {
+        if (isEmpty(beanNames)) {
+            return emptyList();
+        }
+        List<T> beans = new ArrayList<T>(beanNames.length);
+        for (String beanName : beanNames) {
+            T bean = getBean(beanFactory, beanName, beanType);
+            if (bean != null) {
+                beans.add(bean);
+            }
+        }
+        return unmodifiableList(beans);
+    }
 }
diff --git a/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/beans/factory/annotation/ReferenceBeanBuilderTest.java b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/beans/factory/annotation/ReferenceBeanBuilderTest.java
index da56ef9..5ad4afd 100644
--- a/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/beans/factory/annotation/ReferenceBeanBuilderTest.java
+++ b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/beans/factory/annotation/ReferenceBeanBuilderTest.java
@@ -70,7 +70,7 @@
             timeout = 3, cache = "cache", filter = {"echo", "generic", "accesslog"},
             listener = {"deprecated"}, parameters = {"n1=v1  ", "n2 = v2 ", "  n3 =   v3  "},
             application = "application",
-            module = "module", consumer = "consumer", monitor = "monitor", registry = {"registry"},
+            module = "module", consumer = "consumer", monitor = "monitor", registry = {},
             // @since 2.7.3
             id = "reference",
             // @since 2.7.8
diff --git a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/pom.xml b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/pom.xml
index f8bb0e1..1e5a7d5 100644
--- a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/pom.xml
+++ b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/pom.xml
@@ -87,7 +87,10 @@
             <groupId>org.apache.dubbo</groupId>
             <artifactId>dubbo-serialization-hessian2</artifactId>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-serialization-jdk</artifactId>
+        </dependency>
         <!-- The metadata center cannot be initialized without this dependency -->
         <dependency>
             <groupId>org.codehaus.jackson</groupId>
diff --git a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/pom.xml b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/pom.xml
index f4b97c9..438c542 100644
--- a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/pom.xml
+++ b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/pom.xml
@@ -92,6 +92,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-serialization-jdk</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
             <artifactId>dubbo-qos</artifactId>
         </dependency>
         <dependency>
diff --git a/dubbo-metadata/dubbo-metadata-report-etcd/src/test/java/org/apache/dubbo/metadata/store/etcd/EtcdMetadataReportTest.java b/dubbo-metadata/dubbo-metadata-report-etcd/src/test/java/org/apache/dubbo/metadata/store/etcd/EtcdMetadataReportTest.java
index ceaa54a..119522c 100644
--- a/dubbo-metadata/dubbo-metadata-report-etcd/src/test/java/org/apache/dubbo/metadata/store/etcd/EtcdMetadataReportTest.java
+++ b/dubbo-metadata/dubbo-metadata-report-etcd/src/test/java/org/apache/dubbo/metadata/store/etcd/EtcdMetadataReportTest.java
@@ -34,6 +34,7 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.net.URI;
@@ -51,6 +52,7 @@
 /**
  * Unit test for etcd metadata report
  */
+@Disabled
 public class EtcdMetadataReportTest {
 
     private static final String TEST_SERVICE = "org.apache.dubbo.metadata.store.etcd.EtcdMetadata4TstService";
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
index 42b0279..5031470 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
@@ -17,6 +17,7 @@
 package org.apache.dubbo.remoting.exchange.codec;

 

 import org.apache.dubbo.common.Version;

+import org.apache.dubbo.common.config.ConfigurationUtils;

 import org.apache.dubbo.common.io.Bytes;

 import org.apache.dubbo.common.io.StreamUtils;

 import org.apache.dubbo.common.logger.Logger;

@@ -157,7 +158,7 @@
                             // heart beat response data is always null;

                             data = null;

                         } else {

-                            data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto));

+                            data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload);

                         }

                     } else {

                         data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(id));

@@ -187,7 +188,7 @@
                         // heart beat response data is always null;

                         data = null;

                     } else {

-                        data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto));

+                        data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload);

                     }

                 } else {

                     data = decodeRequestData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));

@@ -215,7 +216,7 @@
     }

 

     protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {

-        Serialization serialization = getSerialization(channel);

+        Serialization serialization = getSerialization(channel, req);

         // header.

         byte[] header = new byte[HEADER_LENGTH];

         // set magic number.

@@ -270,7 +271,7 @@
     protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {

         int savedWriteIndex = buffer.writerIndex();

         try {

-            Serialization serialization = getSerialization(channel);

+            Serialization serialization = getSerialization(channel, res);

             // header.

             byte[] header = new byte[HEADER_LENGTH];

             // set magic number.

@@ -373,11 +374,6 @@
         return decodeRequestData(in);

     }

 

-    @Deprecated

-    protected Object decodeHeartbeatData(ObjectInput in) throws IOException {

-        return decodeEventData(null, in);

-    }

-

     protected Object decodeRequestData(ObjectInput in) throws IOException {

         try {

             return in.readObject();

@@ -421,19 +417,21 @@
         return decodeRequestData(channel, in);

     }

 

-    protected Object decodeEventData(Channel channel, ObjectInput in) throws IOException {

+    protected Object decodeEventData(Channel channel, ObjectInput in, byte[] eventBytes) throws IOException {

         try {

+            if (eventBytes != null) {

+                int dataLen = eventBytes.length;

+                int threshold = ConfigurationUtils.getSystemConfiguration().getInt("deserialization.event.size", 50);

+                if (dataLen > threshold) {

+                    throw new IllegalArgumentException("Event data too long, actual size " + dataLen + ", threshold " + threshold + " rejected for security consideration.");

+                }

+            }

             return in.readEvent();

         } catch (IOException | ClassNotFoundException e) {

             throw new IOException(StringUtils.toString("Decode dubbo protocol event failed.", e));

         }

     }

 

-    @Deprecated

-    protected Object decodeHeartbeatData(Channel channel, ObjectInput in) throws IOException {

-        return decodeEventData(channel, in);

-    }

-

     protected Object decodeRequestData(Channel channel, ObjectInput in) throws IOException {

         return decodeRequestData(in);

     }

diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractCodec.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractCodec.java
index 9be9663..e84c28e 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractCodec.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractCodec.java
@@ -16,9 +16,6 @@
  */
 package org.apache.dubbo.remoting.transport;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
@@ -27,6 +24,11 @@
 import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.Codec2;
 import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.remoting.exchange.Request;
+import org.apache.dubbo.remoting.exchange.Response;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
 
 import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
 
@@ -48,18 +50,26 @@
         }
         if (payload > 0 && size > payload) {
             ExceedPayloadLimitException e = new ExceedPayloadLimitException(
-                "Data length too large: " + size + ", max payload: " + payload + ", channel: " + channel);
+                    "Data length too large: " + size + ", max payload: " + payload + ", channel: " + channel);
             logger.error(e);
             throw e;
         }
     }
 
+    protected Serialization getSerialization(Channel channel, Request req) {
+        return CodecSupport.getSerialization(channel.getUrl());
+    }
+
+    protected Serialization getSerialization(Channel channel, Response res) {
+        return CodecSupport.getSerialization(channel.getUrl());
+    }
+
     protected Serialization getSerialization(Channel channel) {
         return CodecSupport.getSerialization(channel.getUrl());
     }
 
     protected boolean isClientSide(Channel channel) {
-        String side = (String)channel.getAttribute(SIDE_KEY);
+        String side = (String) channel.getAttribute(SIDE_KEY);
         if (CLIENT_SIDE.equals(side)) {
             return true;
         } else if (SERVER_SIDE.equals(side)) {
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
index f45e7a3..d4beb50 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java
@@ -24,29 +24,29 @@
 import org.apache.dubbo.common.serialize.ObjectInput;
 import org.apache.dubbo.common.serialize.ObjectOutput;
 import org.apache.dubbo.common.serialize.Serialization;
+import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ProviderModel;
+import org.apache.dubbo.rpc.model.ServiceRepository;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.dubbo.common.serialize.Constants.COMPACTED_JAVA_SERIALIZATION_ID;
-import static org.apache.dubbo.common.serialize.Constants.JAVA_SERIALIZATION_ID;
-import static org.apache.dubbo.common.serialize.Constants.NATIVE_JAVA_SERIALIZATION_ID;
-
 public class CodecSupport {
-
     private static final Logger logger = LoggerFactory.getLogger(CodecSupport.class);
     private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();
     private static Map<Byte, String> ID_SERIALIZATIONNAME_MAP = new HashMap<Byte, String>();
     private static Map<String, Byte> SERIALIZATIONNAME_ID_MAP = new HashMap<String, Byte>();
     // Cache null object serialize results, for heartbeat request/response serialize use.
     private static Map<Byte, byte[]> ID_NULLBYTES_MAP = new HashMap<Byte, byte[]>();
-    // NIO ThreadLocal buffer to read event payload
+
     private static final ThreadLocal<byte[]> TL_BUFFER = ThreadLocal.withInitial(() -> new byte[1024]);
 
     static {
@@ -74,7 +74,7 @@
         return ID_SERIALIZATION_MAP.get(id);
     }
 
-    public static byte getIDByName(String name) {
+    public static Byte getIDByName(String name) {
         return SERIALIZATIONNAME_ID_MAP.get(name);
     }
 
@@ -84,15 +84,11 @@
     }
 
     public static Serialization getSerialization(URL url, Byte id) throws IOException {
-        Serialization serialization = getSerializationById(id);
-        String serializationName = url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION);
-        // Check if "serialization id" passed from network matches the id on this side(only take effect for JDK serialization), for security purpose.
-        if (serialization == null
-                || ((id == JAVA_SERIALIZATION_ID || id == NATIVE_JAVA_SERIALIZATION_ID || id == COMPACTED_JAVA_SERIALIZATION_ID)
-                && !(serializationName.equals(ID_SERIALIZATIONNAME_MAP.get(id))))) {
-            throw new IOException("Unexpected serialization id:" + id + " received from network, please check if the peer send the right id.");
+        Serialization result = getSerializationById(id);
+        if (result == null) {
+            throw new IOException("Unrecognized serialize type from consumer: " + id);
         }
-        return serialization;
+        return result;
     }
 
     public static ObjectInput deserialize(URL url, InputStream is, byte proto) throws IOException {
@@ -161,4 +157,25 @@
     public static boolean isHeartBeat(byte[] payload, byte proto) {
         return Arrays.equals(payload, getNullBytesOf(getSerializationById(proto)));
     }
+
+    public static void checkSerialization(String path, String version, Byte id) throws IOException {
+        ServiceRepository repository = ApplicationModel.getServiceRepository();
+        ProviderModel providerModel = repository.lookupExportedServiceWithoutGroup(path + ":" + version);
+        if (providerModel == null) {
+            if (logger.isWarnEnabled()) {
+                logger.warn("Serialization security check is enabled but cannot work as expected because " +
+                        "there's no matched provider model for path " + path + ", version " + version);
+            }
+        } else {
+            List<URL> urls = providerModel.getServiceConfig().getExportedUrls();
+            if (CollectionUtils.isNotEmpty(urls)) {
+                URL url = urls.get(0);
+                String serializationName = url.getParameter(org.apache.dubbo.remoting.Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION);
+                Byte localId = SERIALIZATIONNAME_ID_MAP.get(serializationName);
+                if (localId != null && !localId.equals(id)) {
+                    throw new IOException("Unexpected serialization id:" + id + " received from network, please check if the peer send the right id.");
+                }
+            }
+        }
+    }
 }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java
index cb99fe3..3cb2c66 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java
@@ -96,6 +96,22 @@
         return request;
     }
 
+    private byte[] getReadonlyEventRequestBytes(Object obj, byte[] header) throws IOException {
+        // encode request data.
+        UnsafeByteArrayOutputStream bos = new UnsafeByteArrayOutputStream(1024);
+        ObjectOutput out = serialization.serialize(url, bos);
+        out.writeObject(obj);
+
+        out.flushBuffer();
+        bos.flush();
+        bos.close();
+        byte[] data = bos.toByteArray();
+//        byte[] len = Bytes.int2bytes(data.length);
+        System.arraycopy(data, 0, header, 12, data.length);
+        byte[] request = join(header, data);
+        return request;
+    }
+
     private byte[] assemblyDataProtocol(byte[] header) {
         Person request = new Person();
         byte[] newbuf = join(header, objectToByte(request));
@@ -232,12 +248,14 @@
         Person person = new Person();
         byte[] request = getRequestBytes(person, header);
 
+        System.setProperty("deserialization.event.size", "100");
         Request obj = (Request) decode(request);
         Assertions.assertEquals(person, obj.getData());
         Assertions.assertTrue(obj.isTwoWay());
         Assertions.assertTrue(obj.isEvent());
         Assertions.assertEquals(Version.getProtocolVersion(), obj.getVersion());
         System.out.println(obj);
+        System.clearProperty("deserialization.event.size");
     }
 
     @Test
@@ -271,7 +289,7 @@
     @Test
     public void test_Decode_Return_Request_Object() throws IOException {
         //|10011111|20-stats=ok|id=0|length=0
-        byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, (byte) 0xe2, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
+        byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, (byte) 0xc2, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
         Person person = new Person();
         byte[] request = getRequestBytes(person, header);
 
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/LeaseTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/LeaseTest.java
index 898e1a9..6188984 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/LeaseTest.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/LeaseTest.java
@@ -49,6 +49,7 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.util.concurrent.CountDownLatch;
@@ -61,6 +62,7 @@
 /**
  * @author cvictory ON 2019-08-16
  */
+@Disabled
 public class LeaseTest {
 
     private static EtcdCluster cluster;
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
index 4cff355..118e701 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
@@ -27,6 +27,8 @@
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 
+import static org.apache.dubbo.rpc.Constants.INVOCATION_KEY;
+
 /**
  * {@link AsyncRpcResult} is introduced in 3.0.0 to replace RpcResult, and RpcResult is replaced with {@link AppResponse}:
  * <ul>
@@ -56,9 +58,15 @@
 
     private Map<String, Object> attachments = new HashMap<>();
 
+    private Map<String, Object> attributes = new HashMap<>();
+
     public AppResponse() {
     }
 
+    public AppResponse(Invocation invocation) {
+        this.setAttribute(INVOCATION_KEY, invocation);
+    }
+
     public AppResponse(Object result) {
         this.result = result;
     }
@@ -205,6 +213,14 @@
         attachments.put(key, value);
     }
 
+    public Object getAttribute(String key) {
+        return attributes.get(key);
+    }
+
+    public void setAttribute(String key, Object value) {
+        attributes.put(key, value);
+    }
+
     @Override
     public Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn) {
         throw new UnsupportedOperationException("AppResponse represents an concrete business response, there will be no status changes, you should get internal values directly.");
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index d61d48f..0db0f19 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -94,7 +94,7 @@
             if (responseFuture.isDone()) {
                 responseFuture.get().setValue(value);
             } else {
-                AppResponse appResponse = new AppResponse();
+                AppResponse appResponse = new AppResponse(invocation);
                 appResponse.setValue(value);
                 responseFuture.complete(appResponse);
             }
@@ -116,7 +116,7 @@
             if (responseFuture.isDone()) {
                 responseFuture.get().setException(t);
             } else {
-                AppResponse appResponse = new AppResponse();
+                AppResponse appResponse = new AppResponse(invocation);
                 appResponse.setException(t);
                 responseFuture.complete(appResponse);
             }
@@ -319,7 +319,7 @@
 
     public static AsyncRpcResult newDefaultAsyncResult(Object value, Throwable t, Invocation invocation) {
         CompletableFuture<AppResponse> future = new CompletableFuture<>();
-        AppResponse result = new AppResponse();
+        AppResponse result = new AppResponse(invocation);
         if (t != null) {
             result.setException(t);
         } else {
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
index f5fd982..a544fa4 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
@@ -90,4 +90,8 @@
 
     String CONSUMER_MODEL = "consumerModel";
     String METHOD_MODEL = "methodModel";
+
+    String SERIALIZATION_SECURITY_CHECK_KEY = "serialization.security.check";
+    String INVOCATION_KEY = "invocation";
+    String SERIALIZATION_ID_KEY = "serialization_id";
 }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
index 0865f08..2c749b4 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
@@ -26,6 +26,7 @@
 import org.apache.dubbo.common.utils.ArrayUtils;
 import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.transport.CodecSupport;
 import org.apache.dubbo.rpc.AsyncRpcResult;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.InvokeMode;
@@ -44,6 +45,10 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.dubbo.remoting.Constants.DEFAULT_REMOTING_SERIALIZATION;
+import static org.apache.dubbo.remoting.Constants.SERIALIZATION_KEY;
+import static org.apache.dubbo.rpc.Constants.SERIALIZATION_ID_KEY;
+
 /**
  * This Invoker works on Consumer side.
  */
@@ -158,6 +163,11 @@
         invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
         RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
 
+        Byte serializationId = CodecSupport.getIDByName(getUrl().getParameter(SERIALIZATION_KEY, DEFAULT_REMOTING_SERIALIZATION));
+        if (serializationId != null) {
+            invocation.put(SERIALIZATION_ID_KEY, serializationId);
+        }
+
         AsyncRpcResult asyncResult;
         try {
             asyncResult = (AsyncRpcResult) doInvoke(invocation);
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
index 1b1d592..79168c9 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
@@ -82,9 +82,9 @@
     public Result invoke(Invocation invocation) throws RpcException {
         try {
             Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
-			CompletableFuture<Object> future = wrapWithFuture(value);
+            CompletableFuture<Object> future = wrapWithFuture(value);
             CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
-                AppResponse result = new AppResponse();
+                AppResponse result = new AppResponse(invocation);
                 if (t != null) {
                     if (t instanceof CompletionException) {
                         result.setException(t.getCause());
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
index f286ddb..ebfc41c 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
@@ -17,6 +17,7 @@
 package org.apache.dubbo.rpc.protocol.dubbo;
 
 
+import org.apache.dubbo.common.config.ConfigurationUtils;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.serialize.Cleanable;
@@ -47,6 +48,8 @@
 import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+import static org.apache.dubbo.rpc.Constants.SERIALIZATION_ID_KEY;
+import static org.apache.dubbo.rpc.Constants.SERIALIZATION_SECURITY_CHECK_KEY;
 import static org.apache.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.decodeInvocationArgument;
 
 public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable {
@@ -95,10 +98,15 @@
         throw new UnsupportedOperationException();
     }
 
+    private void checkSerializationTypeFromRemote() {
+
+    }
+
     @Override
     public Object decode(Channel channel, InputStream input) throws IOException {
         ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
                 .deserialize(channel.getUrl(), input);
+        this.put(SERIALIZATION_ID_KEY, serializationType);
 
         String dubboVersion = in.readUTF();
         request.setVersion(dubboVersion);
@@ -106,7 +114,8 @@
 
         String path = in.readUTF();
         setAttachment(PATH_KEY, path);
-        setAttachment(VERSION_KEY, in.readUTF());
+        String version = in.readUTF();
+        setAttachment(VERSION_KEY, version);
 
         setMethodName(in.readUTF());
 
@@ -114,6 +123,9 @@
         setParameterTypesDesc(desc);
 
         try {
+            if (ConfigurationUtils.getSystemConfiguration().getBoolean(SERIALIZATION_SECURITY_CHECK_KEY, false)) {
+                CodecSupport.checkSerialization(path, version, serializationType);
+            }
             Object[] args = DubboCodec.EMPTY_OBJECT_ARRAY;
             Class<?>[] pts = DubboCodec.EMPTY_CLASS_ARRAY;
             if (desc.length() > 0) {
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
index 83db986..3ac302d 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
@@ -16,6 +16,7 @@
  */
 package org.apache.dubbo.rpc.protocol.dubbo;
 
+import org.apache.dubbo.common.config.ConfigurationUtils;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.serialize.Cleanable;
@@ -38,6 +39,9 @@
 import java.io.OutputStream;
 import java.lang.reflect.Type;
 
+import static org.apache.dubbo.rpc.Constants.SERIALIZATION_ID_KEY;
+import static org.apache.dubbo.rpc.Constants.SERIALIZATION_SECURITY_CHECK_KEY;
+
 public class DecodeableRpcResult extends AppResponse implements Codec, Decodeable {
 
     private static final Logger log = LoggerFactory.getLogger(DecodeableRpcResult.class);
@@ -114,6 +118,14 @@
     public void decode() throws Exception {
         if (!hasDecoded && channel != null && inputStream != null) {
             try {
+                if (ConfigurationUtils.getSystemConfiguration().getBoolean(SERIALIZATION_SECURITY_CHECK_KEY, false)) {
+                    Object serializationType_obj = invocation.get(SERIALIZATION_ID_KEY);
+                    if (serializationType_obj != null) {
+                        if ((byte) serializationType_obj != serializationType) {
+                            throw new IOException("Unexpected serialization id:" + serializationType + " received from network, please check if the peer send the right id.");
+                        }
+                    }
+                }
                 decode(channel, inputStream);
             } catch (Throwable e) {
                 if (log.isWarnEnabled()) {
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
index ef51a3f..c6c7994 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
@@ -23,12 +23,14 @@
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.serialize.ObjectInput;
 import org.apache.dubbo.common.serialize.ObjectOutput;
+import org.apache.dubbo.common.serialize.Serialization;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.exchange.Request;
 import org.apache.dubbo.remoting.exchange.Response;
 import org.apache.dubbo.remoting.exchange.codec.ExchangeCodec;
 import org.apache.dubbo.remoting.transport.CodecSupport;
+import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcInvocation;
@@ -86,7 +88,7 @@
                             data = null;
                         } else {
                             ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
-                            data = decodeEventData(channel, in);
+                            data = decodeEventData(channel, in, eventPayload);
                         }
                     } else {
                         DecodeableRpcResult result;
@@ -131,7 +133,7 @@
                         data = null;
                     } else {
                         ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
-                        data = decodeEventData(channel, in);
+                        data = decodeEventData(channel, in, eventPayload);
                     }
                 } else {
                     DecodeableRpcInvocation inv;
@@ -226,4 +228,21 @@
             out.writeAttachments(result.getObjectAttachments());
         }
     }
+
+    @Override
+    protected Serialization getSerialization(Channel channel, Request req) {
+        if (!(req.getData() instanceof Invocation)) {
+            return super.getSerialization(channel, req);
+        }
+        return DubboCodecSupport.getRequestSerialization(channel.getUrl(), (Invocation) req.getData());
+    }
+
+    @Override
+    protected Serialization getSerialization(Channel channel, Response res) {
+        if (!(res.getResult() instanceof AppResponse)) {
+            return super.getSerialization(channel, res);
+        }
+        return DubboCodecSupport.getResponseSerialization(channel.getUrl(), (AppResponse) res.getResult());
+    }
+
 }
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodecSupport.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodecSupport.java
new file mode 100644
index 0000000..2dbb312
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodecSupport.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.serialize.Serialization;
+import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.remoting.transport.CodecSupport;
+import org.apache.dubbo.rpc.AppResponse;
+import org.apache.dubbo.rpc.Invocation;
+
+import static org.apache.dubbo.rpc.Constants.INVOCATION_KEY;
+import static org.apache.dubbo.rpc.Constants.SERIALIZATION_ID_KEY;
+
+public class DubboCodecSupport {
+
+    public static Serialization getRequestSerialization(URL url, Invocation invocation) {
+        Object serializationType_obj = invocation.get(SERIALIZATION_ID_KEY);
+        if (serializationType_obj != null) {
+            return CodecSupport.getSerializationById((byte) serializationType_obj);
+        }
+        return ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(
+                url.getParameter(org.apache.dubbo.remoting.Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION));
+    }
+
+    public static Serialization getResponseSerialization(URL url, AppResponse appResponse) {
+        Object invocation_obj = appResponse.getAttribute(INVOCATION_KEY);
+        if (invocation_obj != null) {
+            Invocation invocation = (Invocation) invocation_obj;
+            Object serializationType_obj = invocation.get(SERIALIZATION_ID_KEY);
+            if (serializationType_obj != null) {
+                return CodecSupport.getSerializationById((byte) serializationType_obj);
+            }
+        }
+        return ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(
+                url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION));
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
index 4fc52f7..c4c9474 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
@@ -43,6 +43,7 @@
 import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_VERSION;
 import static org.apache.dubbo.common.constants.CommonConstants.ENABLE_TIMEOUT_COUNTDOWN_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
@@ -76,7 +77,7 @@
         super(serviceType, url, new String[]{INTERFACE_KEY, GROUP_KEY, TOKEN_KEY});
         this.clients = clients;
         // get version.
-        this.version = url.getParameter(VERSION_KEY, "0.0.0");
+        this.version = url.getParameter(VERSION_KEY, DEFAULT_VERSION);
         this.invokers = invokers;
     }
 
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocolTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocolTest.java
index 6600901..b1eeff9 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocolTest.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocolTest.java
@@ -216,6 +216,7 @@
             service.returnNonSerialized();
             Assertions.fail();
         } catch (RpcException e) {
+            e.printStackTrace();
             Assertions.assertTrue(e.getMessage().contains("org.apache.dubbo.rpc.protocol.dubbo.support.NonSerialized must implement java.io.Serializable"));
         }
     }