[Dubbo-10014]fix serialization of attachments in protobuf. fix issue #10014 and #10307 (#136)
* fix serialization of attachments in protobuf. fix issue #10014 and #10307 in dubbo
* fix code style and nullable argument
* fix npe when no marshaller found
* update protobuf providing forward compatibility in most cases
diff --git a/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/GenericProtobufObjectInput.java b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/GenericProtobufObjectInput.java
index 44005f7..e1a851d 100644
--- a/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/GenericProtobufObjectInput.java
+++ b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/GenericProtobufObjectInput.java
@@ -16,10 +16,6 @@
*/
package org.apache.dubbo.common.serialize.protobuf.support;
-import org.apache.dubbo.common.serialize.ObjectInput;
-import org.apache.dubbo.common.serialize.protobuf.support.wrapper.MapValue;
-import org.apache.dubbo.common.serialize.protobuf.support.wrapper.ThrowablePB;
-
import com.google.protobuf.BoolValue;
import com.google.protobuf.BytesValue;
import com.google.protobuf.DoubleValue;
@@ -28,10 +24,13 @@
import com.google.protobuf.Int64Value;
import com.google.protobuf.StringValue;
+import org.apache.dubbo.common.serialize.ObjectInput;
+import org.apache.dubbo.common.serialize.protobuf.support.wrapper.MapValue;
+import org.apache.dubbo.common.serialize.protobuf.support.wrapper.ThrowablePB;
+
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Type;
-import java.util.HashMap;
import java.util.Map;
import static org.apache.dubbo.common.constants.CommonConstants.HEARTBEAT_EVENT;
@@ -134,13 +133,8 @@
}
@Override
- public Map<String, Object> readAttachments() throws IOException {
- Map<String, String> stringAttachments = ProtobufUtils.deserialize(is, MapValue.Map.class).getAttachmentsMap();
- Map<String, Object> attachments = new HashMap<>();
-
- if (stringAttachments != null) {
- stringAttachments.forEach((k, v) -> attachments.put(k, v));
- }
- return attachments;
+ public Map<String, Object> readAttachments() throws IOException, ClassNotFoundException {
+ MapValue.Map map = ProtobufUtils.deserialize(is, MapValue.Map.class);
+ return ProtobufAttachmentUtils.unwrap(map);
}
}
diff --git a/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/GenericProtobufObjectOutput.java b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/GenericProtobufObjectOutput.java
index eea1b73..55bdba9 100644
--- a/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/GenericProtobufObjectOutput.java
+++ b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/GenericProtobufObjectOutput.java
@@ -16,9 +16,6 @@
*/
package org.apache.dubbo.common.serialize.protobuf.support;
-import org.apache.dubbo.common.serialize.ObjectOutput;
-import org.apache.dubbo.common.serialize.protobuf.support.wrapper.MapValue;
-
import com.google.protobuf.BoolValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
@@ -29,9 +26,11 @@
import com.google.protobuf.MessageLite;
import com.google.protobuf.StringValue;
+import org.apache.dubbo.common.serialize.ObjectOutput;
+import org.apache.dubbo.common.serialize.protobuf.support.wrapper.MapValue;
+
import java.io.IOException;
import java.io.OutputStream;
-import java.util.HashMap;
import java.util.Map;
import static org.apache.dubbo.common.constants.CommonConstants.HEARTBEAT_EVENT;
@@ -141,11 +140,8 @@
if (attachments == null) {
return;
}
-
- Map<String, String> stringAttachments = new HashMap<>();
- attachments.forEach((k, v) -> stringAttachments.put(k, (String) v));
-
- ProtobufUtils.serialize(MapValue.Map.newBuilder().putAllAttachments(stringAttachments).build(), os);
+ MapValue.Map map = ProtobufAttachmentUtils.wrap(attachments);
+ ProtobufUtils.serialize(map, os);
os.flush();
}
diff --git a/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/ProtobufAttachmentUtils.java b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/ProtobufAttachmentUtils.java
new file mode 100644
index 0000000..eea20c3
--- /dev/null
+++ b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/java/org/apache/dubbo/common/serialize/protobuf/support/ProtobufAttachmentUtils.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.serialize.protobuf.support;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.BoolValue;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DoubleValue;
+import com.google.protobuf.Empty;
+import com.google.protobuf.FloatValue;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.Int64Value;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.StringValue;
+
+import org.apache.dubbo.common.serialize.protobuf.support.wrapper.MapValue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * ProtobufAttachmentUtils
+ */
+public class ProtobufAttachmentUtils {
+ private static Map<String, BuiltinMarshaller> marshallers = new HashMap<>();
+ private final static String NULL_CLASS_NAME = "null";
+
+ static {
+ marshaller(String.class, new StringMarshaller());
+ marshaller(Integer.class, new IntegerMarshaller());
+ marshaller(Long.class, new LongMarshaller());
+ marshaller(Boolean.class, new BooleanMarshaller());
+ marshaller(Float.class, new FloatMarshaller());
+ marshaller(Double.class, new DoubleMarshaller());
+ marshallers.put(NULL_CLASS_NAME, new NullMarshaller());
+ }
+
+ static void marshaller(Class<?> clazz, BuiltinMarshaller marshaller) {
+ marshallers.put(clazz.getCanonicalName(), marshaller);
+ }
+
+ static MapValue.Map wrap(Map<String, Object> attachments) throws IOException {
+ Map<String, Any> genericAttachments = new HashMap<>(attachments.size());
+ Map<String, String> stringAttachments = new HashMap<>(attachments.size());
+ for (Map.Entry<String, Object> entry : attachments.entrySet()) {
+ genericAttachments.put(entry.getKey(), marshal(entry.getValue()));
+ stringAttachments.put(entry.getKey(), String.valueOf(entry.getValue()));
+ }
+ return MapValue.Map.newBuilder().putAllAttachmentsV2(genericAttachments).putAllAttachments(stringAttachments).build();
+ }
+
+ static Map<String, Object> unwrap(MapValue.Map map) throws InvalidProtocolBufferException {
+ Map<String, Object> attachments = new HashMap<>();
+ //compatible with older version.
+ Map<String, String> stringAttachments = map.getAttachmentsMap();
+ if (stringAttachments != null) {
+ stringAttachments.forEach((k, v) -> attachments.put(k, v));
+ }
+
+ Map<String, Any> genericAttachments = map.getAttachmentsV2Map();
+ if (genericAttachments == null) {
+ return attachments;
+ }
+ for (Map.Entry<String, Any> entry : genericAttachments.entrySet()) {
+ attachments.put(entry.getKey(), unmarshal(entry.getValue()));
+ }
+ return attachments;
+ }
+
+ private static Any marshal(Object obj) throws IOException {
+ String className = NULL_CLASS_NAME;
+ if (obj != null) {
+ className = obj.getClass().getCanonicalName();
+ }
+ BuiltinMarshaller marshaller = marshallers.get(className);
+ if (marshaller == null) {
+ throw new IllegalStateException(className + " in attachment is not supported by protobuf.");
+ }
+
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ ProtobufUtils.serialize(StringValue.newBuilder().setValue(className).build(), stream);
+ marshaller.marshal(obj, stream);
+ stream.flush();
+ return Any.newBuilder().setValue(ByteString.copyFrom(stream.toByteArray())).build();
+ }
+
+ private static Object unmarshal(Any any) throws InvalidProtocolBufferException {
+ InputStream stream = new ByteArrayInputStream(any.getValue().toByteArray());
+ String className = ProtobufUtils.deserialize(stream, StringValue.class).getValue();
+ BuiltinMarshaller marshaller = marshallers.get(className);
+ if (marshaller == null) {
+ throw new IllegalStateException(className + " in attachment is not supported by protobuf.");
+ }
+ return marshaller.unmarshal(stream);
+ }
+
+ private static interface BuiltinMarshaller<T> {
+ void marshal(T obj, OutputStream stream) throws IOException;
+
+ T unmarshal(InputStream stream) throws InvalidProtocolBufferException;
+ }
+
+ static class StringMarshaller implements BuiltinMarshaller<String> {
+ @Override
+ public void marshal(String obj, OutputStream stream) throws IOException {
+ ProtobufUtils.serialize(StringValue.newBuilder().setValue(obj).build(), stream);
+ }
+
+ @Override
+ public String unmarshal(InputStream stream) throws InvalidProtocolBufferException {
+ return ProtobufUtils.deserialize(stream, StringValue.class).getValue();
+ }
+ }
+
+ static class IntegerMarshaller implements BuiltinMarshaller<Integer> {
+ @Override
+ public void marshal(Integer obj, OutputStream stream) throws IOException {
+ ProtobufUtils.serialize(Int32Value.newBuilder().setValue(obj).build(), stream);
+ }
+
+ @Override
+ public Integer unmarshal(InputStream stream) throws InvalidProtocolBufferException {
+ return ProtobufUtils.deserialize(stream, Int32Value.class).getValue();
+ }
+ }
+
+ static class LongMarshaller implements BuiltinMarshaller<Long> {
+ @Override
+ public void marshal(Long obj, OutputStream stream) throws IOException {
+ ProtobufUtils.serialize(Int64Value.newBuilder().setValue(obj).build(), stream);
+ }
+
+ @Override
+ public Long unmarshal(InputStream stream) throws InvalidProtocolBufferException {
+ return ProtobufUtils.deserialize(stream, Int64Value.class).getValue();
+ }
+ }
+
+ static class BooleanMarshaller implements BuiltinMarshaller<Boolean> {
+ @Override
+ public void marshal(Boolean obj, OutputStream stream) throws IOException {
+ ProtobufUtils.serialize(BoolValue.newBuilder().setValue(obj).build(), stream);
+ }
+
+ @Override
+ public Boolean unmarshal(InputStream stream) throws InvalidProtocolBufferException {
+ return ProtobufUtils.deserialize(stream, BoolValue.class).getValue();
+ }
+ }
+
+ static class FloatMarshaller implements BuiltinMarshaller<Float> {
+ @Override
+ public void marshal(Float obj, OutputStream stream) throws IOException {
+ ProtobufUtils.serialize(FloatValue.newBuilder().setValue(obj).build(), stream);
+ }
+
+ @Override
+ public Float unmarshal(InputStream stream) throws InvalidProtocolBufferException {
+ return ProtobufUtils.deserialize(stream, FloatValue.class).getValue();
+ }
+ }
+
+ static class DoubleMarshaller implements BuiltinMarshaller<Double> {
+ @Override
+ public void marshal(Double obj, OutputStream stream) throws IOException {
+ ProtobufUtils.serialize(DoubleValue.newBuilder().setValue(obj).build(), stream);
+ }
+
+ @Override
+ public Double unmarshal(InputStream stream) throws InvalidProtocolBufferException {
+ return ProtobufUtils.deserialize(stream, DoubleValue.class).getValue();
+ }
+ }
+
+ static class NullMarshaller implements BuiltinMarshaller<Object> {
+
+ @Override
+ public void marshal(Object obj, OutputStream stream) throws IOException {
+ ProtobufUtils.serialize(Empty.newBuilder().build(), stream);
+ }
+
+ @Override
+ public Object unmarshal(InputStream stream) throws InvalidProtocolBufferException {
+ return null;
+ }
+ }
+}
diff --git a/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/proto/MapValue.proto b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/proto/MapValue.proto
index 5b0acad..5829f31 100644
--- a/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/proto/MapValue.proto
+++ b/dubbo-serialization-extensions/dubbo-serialization-protobuf/src/main/proto/MapValue.proto
@@ -22,6 +22,9 @@
option java_package = "org.apache.dubbo.common.serialize.protobuf.support.wrapper";
option java_multiple_files = false;
+import "google/protobuf/any.proto";
+
message Map{
map<string, string> attachments = 1;
+ map<string, google.protobuf.Any> attachmentsV2 = 2;
}
diff --git a/dubbo-serialization-extensions/dubbo-serialization-test/src/test/java/org/apache/dubbo/common/serialize/protobuf/support/AbstractProtobufSerializationTest.java b/dubbo-serialization-extensions/dubbo-serialization-test/src/test/java/org/apache/dubbo/common/serialize/protobuf/support/AbstractProtobufSerializationTest.java
index bfd19d4..88ad01a 100644
--- a/dubbo-serialization-extensions/dubbo-serialization-test/src/test/java/org/apache/dubbo/common/serialize/protobuf/support/AbstractProtobufSerializationTest.java
+++ b/dubbo-serialization-extensions/dubbo-serialization-test/src/test/java/org/apache/dubbo/common/serialize/protobuf/support/AbstractProtobufSerializationTest.java
@@ -347,6 +347,12 @@
public void testPbMap() throws Exception {
Map<String, Object> attachments = new HashMap<>();
attachments.put("key", "value");
+ attachments.put("int", Integer.MAX_VALUE);
+ attachments.put("long", Long.MAX_VALUE);
+ attachments.put("bool", true);
+ attachments.put("float", 0.0001);
+ attachments.put("double", 0.0001d);
+ attachments.put("null", null);
ObjectOutput objectOutput = serialization.serialize(url, byteArrayOutputStream);
objectOutput.writeAttachments(attachments);
objectOutput.flushBuffer();