[Dubbo-10014]fix serialization of attachments in protobuf. fix issue #10014 and #10307 (#136)

* fix serialization of attachments in protobuf. fix issue #10014 and #10307 in dubbo

* fix code style and nullable argument

* fix npe when no marshaller found

* update protobuf providing forward compatibility in most cases
diff --git a/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/GenericProtobufObjectInput.java b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/GenericProtobufObjectInput.java
index 44005f7..e1a851d 100644
--- a/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/GenericProtobufObjectInput.java
+++ b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/GenericProtobufObjectInput.java
@@ -16,10 +16,6 @@
  */
 package org.apache.dubbo.common.serialize.protobuf.support;
 
-import org.apache.dubbo.common.serialize.ObjectInput;
-import org.apache.dubbo.common.serialize.protobuf.support.wrapper.MapValue;
-import org.apache.dubbo.common.serialize.protobuf.support.wrapper.ThrowablePB;
-
 import com.google.protobuf.BoolValue;
 import com.google.protobuf.BytesValue;
 import com.google.protobuf.DoubleValue;
@@ -28,10 +24,13 @@
 import com.google.protobuf.Int64Value;
 import com.google.protobuf.StringValue;
 
+import org.apache.dubbo.common.serialize.ObjectInput;
+import org.apache.dubbo.common.serialize.protobuf.support.wrapper.MapValue;
+import org.apache.dubbo.common.serialize.protobuf.support.wrapper.ThrowablePB;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Type;
-import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.dubbo.common.constants.CommonConstants.HEARTBEAT_EVENT;
@@ -134,13 +133,8 @@
     }
 
     @Override
-    public Map<String, Object> readAttachments() throws IOException {
-        Map<String, String> stringAttachments = ProtobufUtils.deserialize(is, MapValue.Map.class).getAttachmentsMap();
-        Map<String, Object> attachments = new HashMap<>();
-
-        if (stringAttachments != null) {
-            stringAttachments.forEach((k, v) -> attachments.put(k, v));
-        }
-        return attachments;
+    public Map<String, Object> readAttachments() throws IOException, ClassNotFoundException {
+        MapValue.Map map = ProtobufUtils.deserialize(is, MapValue.Map.class);
+        return ProtobufAttachmentUtils.unwrap(map);
     }
 }
diff --git a/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/GenericProtobufObjectOutput.java b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/GenericProtobufObjectOutput.java
index eea1b73..55bdba9 100644
--- a/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/GenericProtobufObjectOutput.java
+++ b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/GenericProtobufObjectOutput.java
@@ -16,9 +16,6 @@
  */
 package org.apache.dubbo.common.serialize.protobuf.support;
 
-import org.apache.dubbo.common.serialize.ObjectOutput;
-import org.apache.dubbo.common.serialize.protobuf.support.wrapper.MapValue;
-
 import com.google.protobuf.BoolValue;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.BytesValue;
@@ -29,9 +26,11 @@
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.StringValue;
 
+import org.apache.dubbo.common.serialize.ObjectOutput;
+import org.apache.dubbo.common.serialize.protobuf.support.wrapper.MapValue;
+
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.dubbo.common.constants.CommonConstants.HEARTBEAT_EVENT;
@@ -141,11 +140,8 @@
         if (attachments == null) {
             return;
         }
-
-        Map<String, String> stringAttachments = new HashMap<>();
-        attachments.forEach((k, v) -> stringAttachments.put(k, (String) v));
-
-        ProtobufUtils.serialize(MapValue.Map.newBuilder().putAllAttachments(stringAttachments).build(), os);
+        MapValue.Map map = ProtobufAttachmentUtils.wrap(attachments);
+        ProtobufUtils.serialize(map, os);
         os.flush();
     }
 
diff --git a/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/ProtobufAttachmentUtils.java b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/ProtobufAttachmentUtils.java
new file mode 100644
index 0000000..eea20c3
--- /dev/null
+++ b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/ProtobufAttachmentUtils.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.serialize.protobuf.support;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.BoolValue;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DoubleValue;
+import com.google.protobuf.Empty;
+import com.google.protobuf.FloatValue;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.Int64Value;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.StringValue;
+
+import org.apache.dubbo.common.serialize.protobuf.support.wrapper.MapValue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * ProtobufAttachmentUtils
+ */
+public class ProtobufAttachmentUtils {
+    private static Map<String, BuiltinMarshaller> marshallers = new HashMap<>();
+    private final static String NULL_CLASS_NAME = "null";
+
+    static {
+        marshaller(String.class, new StringMarshaller());
+        marshaller(Integer.class, new IntegerMarshaller());
+        marshaller(Long.class, new LongMarshaller());
+        marshaller(Boolean.class, new BooleanMarshaller());
+        marshaller(Float.class, new FloatMarshaller());
+        marshaller(Double.class, new DoubleMarshaller());
+        marshallers.put(NULL_CLASS_NAME, new NullMarshaller());
+    }
+
+    static void marshaller(Class<?> clazz, BuiltinMarshaller marshaller) {
+        marshallers.put(clazz.getCanonicalName(), marshaller);
+    }
+
+    static MapValue.Map wrap(Map<String, Object> attachments) throws IOException {
+        Map<String, Any> genericAttachments = new HashMap<>(attachments.size());
+        Map<String, String> stringAttachments = new HashMap<>(attachments.size());
+        for (Map.Entry<String, Object> entry : attachments.entrySet()) {
+            genericAttachments.put(entry.getKey(), marshal(entry.getValue()));
+            stringAttachments.put(entry.getKey(), String.valueOf(entry.getValue()));
+        }
+        return MapValue.Map.newBuilder().putAllAttachmentsV2(genericAttachments).putAllAttachments(stringAttachments).build();
+    }
+
+    static Map<String, Object> unwrap(MapValue.Map map) throws InvalidProtocolBufferException {
+        Map<String, Object> attachments = new HashMap<>();
+        //compatible with older version.
+        Map<String, String> stringAttachments = map.getAttachmentsMap();
+        if (stringAttachments != null) {
+            stringAttachments.forEach((k, v) -> attachments.put(k, v));
+        }
+
+        Map<String, Any> genericAttachments = map.getAttachmentsV2Map();
+        if (genericAttachments == null) {
+            return attachments;
+        }
+        for (Map.Entry<String, Any> entry : genericAttachments.entrySet()) {
+            attachments.put(entry.getKey(), unmarshal(entry.getValue()));
+        }
+        return attachments;
+    }
+
+    private static Any marshal(Object obj) throws IOException {
+        String className = NULL_CLASS_NAME;
+        if (obj != null) {
+            className = obj.getClass().getCanonicalName();
+        }
+        BuiltinMarshaller marshaller = marshallers.get(className);
+        if (marshaller == null) {
+            throw new IllegalStateException(className + " in attachment is not supported by protobuf.");
+        }
+
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        ProtobufUtils.serialize(StringValue.newBuilder().setValue(className).build(), stream);
+        marshaller.marshal(obj, stream);
+        stream.flush();
+        return Any.newBuilder().setValue(ByteString.copyFrom(stream.toByteArray())).build();
+    }
+
+    private static Object unmarshal(Any any) throws InvalidProtocolBufferException {
+        InputStream stream = new ByteArrayInputStream(any.getValue().toByteArray());
+        String className = ProtobufUtils.deserialize(stream, StringValue.class).getValue();
+        BuiltinMarshaller marshaller = marshallers.get(className);
+        if (marshaller == null) {
+            throw new IllegalStateException(className + " in attachment is not supported by protobuf.");
+        }
+        return marshaller.unmarshal(stream);
+    }
+
+    private static interface BuiltinMarshaller<T> {
+        void marshal(T obj, OutputStream stream) throws IOException;
+
+        T unmarshal(InputStream stream) throws InvalidProtocolBufferException;
+    }
+
+    static class StringMarshaller implements BuiltinMarshaller<String> {
+        @Override
+        public void marshal(String obj, OutputStream stream) throws IOException {
+            ProtobufUtils.serialize(StringValue.newBuilder().setValue(obj).build(), stream);
+        }
+
+        @Override
+        public String unmarshal(InputStream stream) throws InvalidProtocolBufferException {
+            return ProtobufUtils.deserialize(stream, StringValue.class).getValue();
+        }
+    }
+
+    static class IntegerMarshaller implements BuiltinMarshaller<Integer> {
+        @Override
+        public void marshal(Integer obj, OutputStream stream) throws IOException {
+            ProtobufUtils.serialize(Int32Value.newBuilder().setValue(obj).build(), stream);
+        }
+
+        @Override
+        public Integer unmarshal(InputStream stream) throws InvalidProtocolBufferException {
+            return ProtobufUtils.deserialize(stream, Int32Value.class).getValue();
+        }
+    }
+
+    static class LongMarshaller implements BuiltinMarshaller<Long> {
+        @Override
+        public void marshal(Long obj, OutputStream stream) throws IOException {
+            ProtobufUtils.serialize(Int64Value.newBuilder().setValue(obj).build(), stream);
+        }
+
+        @Override
+        public Long unmarshal(InputStream stream) throws InvalidProtocolBufferException {
+            return ProtobufUtils.deserialize(stream, Int64Value.class).getValue();
+        }
+    }
+
+    static class BooleanMarshaller implements BuiltinMarshaller<Boolean> {
+        @Override
+        public void marshal(Boolean obj, OutputStream stream) throws IOException {
+            ProtobufUtils.serialize(BoolValue.newBuilder().setValue(obj).build(), stream);
+        }
+
+        @Override
+        public Boolean unmarshal(InputStream stream) throws InvalidProtocolBufferException {
+            return ProtobufUtils.deserialize(stream, BoolValue.class).getValue();
+        }
+    }
+
+    static class FloatMarshaller implements BuiltinMarshaller<Float> {
+        @Override
+        public void marshal(Float obj, OutputStream stream) throws IOException {
+            ProtobufUtils.serialize(FloatValue.newBuilder().setValue(obj).build(), stream);
+        }
+
+        @Override
+        public Float unmarshal(InputStream stream) throws InvalidProtocolBufferException {
+            return ProtobufUtils.deserialize(stream, FloatValue.class).getValue();
+        }
+    }
+
+    static class DoubleMarshaller implements BuiltinMarshaller<Double> {
+        @Override
+        public void marshal(Double obj, OutputStream stream) throws IOException {
+            ProtobufUtils.serialize(DoubleValue.newBuilder().setValue(obj).build(), stream);
+        }
+
+        @Override
+        public Double unmarshal(InputStream stream) throws InvalidProtocolBufferException {
+            return ProtobufUtils.deserialize(stream, DoubleValue.class).getValue();
+        }
+    }
+
+    static class NullMarshaller implements BuiltinMarshaller<Object> {
+
+        @Override
+        public void marshal(Object obj, OutputStream stream) throws IOException {
+            ProtobufUtils.serialize(Empty.newBuilder().build(), stream);
+        }
+
+        @Override
+        public Object unmarshal(InputStream stream) throws InvalidProtocolBufferException {
+            return null;
+        }
+    }
+}
diff --git a/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/proto/MapValue.proto b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/proto/MapValue.proto
index 5b0acad..5829f31 100644
--- a/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/proto/MapValue.proto
+++ b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/proto/MapValue.proto
@@ -22,6 +22,9 @@
 option java_package = "org.apache.dubbo.common.serialize.protobuf.support.wrapper";
 option java_multiple_files = false;
 
+import "google/protobuf/any.proto";
+
 message Map{
   map<string, string> attachments = 1;
+  map<string, google.protobuf.Any> attachmentsV2 = 2;
 }
diff --git a/dubbo-serialization-extensions/dubbo-serialization-test/src/test/java/org/apache/dubbo/common/serialize/protobuf/support/AbstractProtobufSerializationTest.java b/dubbo-serialization-extensions/dubbo-serialization-test/src/test/java/org/apache/dubbo/common/serialize/protobuf/support/AbstractProtobufSerializationTest.java
index bfd19d4..88ad01a 100644
--- a/dubbo-serialization-extensions/dubbo-serialization-test/src/test/java/org/apache/dubbo/common/serialize/protobuf/support/AbstractProtobufSerializationTest.java
+++ b/dubbo-serialization-extensions/dubbo-serialization-test/src/test/java/org/apache/dubbo/common/serialize/protobuf/support/AbstractProtobufSerializationTest.java
@@ -347,6 +347,12 @@
     public void testPbMap() throws Exception {
         Map<String, Object> attachments = new HashMap<>();
         attachments.put("key", "value");
+        attachments.put("int", Integer.MAX_VALUE);
+        attachments.put("long", Long.MAX_VALUE);
+        attachments.put("bool", true);
+        attachments.put("float", 0.0001);
+        attachments.put("double", 0.0001d);
+        attachments.put("null", null);
         ObjectOutput objectOutput = serialization.serialize(url, byteArrayOutputStream);
         objectOutput.writeAttachments(attachments);
         objectOutput.flushBuffer();