| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package cloudevents |
| |
| import ( |
| "context" |
| "fmt" |
| "github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/common/protocol/grpc" |
| "github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/common/protocol/tcp" |
| "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin" |
| "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/protocol" |
| "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/consts" |
| "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb" |
| "github.com/cloudevents/sdk-go/v2" |
| "github.com/cloudevents/sdk-go/v2/event" |
| "github.com/cloudevents/sdk-go/v2/event/datacodec" |
| ) |
| |
| func init() { |
| plugin.Register(protocol.CloudEvents, &CloudeventsPlugin{}) |
| } |
| |
// CloudeventsPlugin is the protocol adaptor for the CloudEvents format.
// It holds no state; all of its behavior lives in its methods.
type CloudeventsPlugin struct{}
| |
| func (c *CloudeventsPlugin) Type() string { |
| return plugin.Protocol |
| } |
| |
| func (c *CloudeventsPlugin) Setup(name string, dec plugin.Decoder) error { |
| return nil |
| } |
| |
| func (c *CloudeventsPlugin) ToCloudEvent(msg interface{}) (*v2.Event, error) { |
| switch msg := msg.(type) { |
| case *tcp.Package: |
| pck := msg |
| return c.deserializeTcpProtocol(pck) |
| case *grpc.SimpleMessageWrapper: |
| sw := msg |
| sm := sw.SimpleMessage |
| return deserializeGrpcProtocol(sm) |
| } |
| panic("implement me") |
| } |
| |
| func (c *CloudeventsPlugin) ToCloudEvents(msg interface{}) ([]*v2.Event, error) { |
| bmw := msg.(*grpc.BatchMessageWrapper) |
| return buildBatchMessage(bmw.BatchMessage) |
| } |
| |
| func (c *CloudeventsPlugin) FromCloudEvent(event *v2.Event) (interface{}, error) { |
| desc := event.Extensions()[grpc.PROTOCOL_DESC].(string) |
| if desc == "grpc" { |
| return buildSimpleMessage(event) |
| } |
| return nil, fmt.Errorf("only grpc supported now") |
| } |
| |
| func (c *CloudeventsPlugin) ProtocolType() string { |
| //TODO implement me |
| panic("implement me") |
| } |
| |
| func (c *CloudeventsPlugin) deserializeTcpProtocol(pck *tcp.Package) (*v2.Event, error) { |
| // TODO add when support tcp procotol |
| panic("implement me") |
| } |
| |
// defaultIfEmpty returns in when it is non-empty, otherwise the string form of
// def.
//
// def is typically a value looked up in an extensions map, so it may be nil
// (key absent) or a non-string value. A nil def yields "", a string is
// returned as-is, and any other value is rendered with fmt.Sprint instead of
// panicking on a failed type assertion.
func defaultIfEmpty(in string, def interface{}) string {
	if in != "" {
		return in
	}
	switch v := def.(type) {
	case nil:
		return ""
	case string:
		return v
	default:
		return fmt.Sprint(v)
	}
}
| |
// defaultIfNil returns def when in is nil, otherwise the string form of in.
//
// A string value is returned unchanged (including the empty string). Any
// other non-nil value is rendered with fmt.Sprint instead of panicking on a
// failed type assertion.
func defaultIfNil(def string, in interface{}) string {
	switch v := in.(type) {
	case nil:
		return def
	case string:
		return v
	default:
		return fmt.Sprint(v)
	}
}
| |
| func deserializeGrpcProtocol(sm *pb.SimpleMessage) (*v2.Event, error) { |
| content := sm.Content |
| ct, ok := sm.Properties[grpc.CONTENT_TYPE] |
| if !ok { |
| ct = consts.CONTENT_TYPE_CLOUDEVENTS_JSON |
| } |
| evt := v2.NewEvent() |
| if err := datacodec.Decode(context.TODO(), ct, []byte(content), &evt); err != nil { |
| return nil, err |
| } |
| hdr := sm.Header |
| result := v2.NewEvent() |
| ver := defaultIfEmpty(hdr.ProtocolVersion, evt.Extensions()[grpc.PROTOCOL_VERSION]) |
| topic := defaultIfEmpty(sm.Topic, evt.Subject()) |
| result.SetExtension(grpc.ENV, defaultIfEmpty(hdr.Env, evt.Extensions()[grpc.ENV])) |
| result.SetExtension(grpc.IDC, defaultIfEmpty(hdr.Idc, evt.Extensions()[grpc.IDC])) |
| result.SetExtension(grpc.IP, defaultIfEmpty(hdr.Ip, evt.Extensions()[grpc.IP])) |
| result.SetExtension(grpc.PID, defaultIfEmpty(hdr.Pid, evt.Extensions()[grpc.PID])) |
| result.SetExtension(grpc.SYS, defaultIfEmpty(hdr.Sys, evt.Extensions()[grpc.SYS])) |
| result.SetExtension(grpc.LANGUAGE, defaultIfEmpty(hdr.Language, evt.Extensions()[grpc.LANGUAGE])) |
| result.SetExtension(grpc.PROTOCOL_TYPE, defaultIfEmpty(hdr.ProtocolType, evt.Extensions()[grpc.PROTOCOL_TYPE])) |
| result.SetExtension(grpc.PROTOCOL_DESC, defaultIfEmpty(hdr.ProtocolDesc, evt.Extensions()[grpc.PROTOCOL_DESC])) |
| result.SetExtension(grpc.PROTOCOL_VERSION, defaultIfEmpty(hdr.ProtocolVersion, evt.Extensions()[grpc.PROTOCOL_VERSION])) |
| result.SetExtension(grpc.UNIQUE_ID, defaultIfEmpty(sm.UniqueId, evt.Extensions()[grpc.UNIQUE_ID])) |
| result.SetExtension(grpc.SEQ_NUM, defaultIfEmpty(sm.SeqNum, evt.Extensions()[grpc.SEQ_NUM])) |
| result.SetExtension(grpc.USERNAME, defaultIfEmpty(hdr.Username, evt.Extensions()[grpc.USERNAME])) |
| result.SetExtension(grpc.PASSWD, defaultIfEmpty(hdr.Password, evt.Extensions()[grpc.PASSWD])) |
| result.SetExtension(grpc.TTL, defaultIfEmpty(sm.Ttl, evt.Extensions()[grpc.TTL])) |
| result.SetExtension(grpc.PRODUCERGROUP, defaultIfEmpty(sm.ProducerGroup, evt.Extensions()[grpc.PRODUCERGROUP])) |
| |
| if ver == event.CloudEventsVersionV1 { |
| result.SetSpecVersion(event.CloudEventsVersionV1) |
| } else { |
| result.SetSpecVersion(event.CloudEventsVersionV03) |
| } |
| result.SetSubject(topic) |
| |
| return &result, nil |
| } |
| |
| func buildBatchMessage(bm *pb.BatchMessage) ([]*v2.Event, error) { |
| var msgs []*v2.Event |
| hdr := bm.Header |
| |
| for _, item := range bm.MessageItem { |
| content := item.Content |
| ct, ok := item.Properties[grpc.CONTENT_TYPE] |
| if !ok { |
| ct = consts.CONTENT_TYPE_CLOUDEVENTS_JSON |
| } |
| evt := v2.NewEvent() |
| if err := datacodec.Decode(context.TODO(), ct, []byte(content), &evt); err != nil { |
| return nil, err |
| } |
| result := v2.NewEvent() |
| ver := defaultIfEmpty(hdr.ProtocolVersion, evt.Extensions()[grpc.PROTOCOL_VERSION]) |
| topic := defaultIfEmpty(bm.Topic, evt.Subject()) |
| result.SetExtension(grpc.ENV, defaultIfEmpty(hdr.Env, evt.Extensions()[grpc.ENV])) |
| result.SetExtension(grpc.IDC, defaultIfEmpty(hdr.Idc, evt.Extensions()[grpc.IDC])) |
| result.SetExtension(grpc.IP, defaultIfEmpty(hdr.Ip, evt.Extensions()[grpc.IP])) |
| result.SetExtension(grpc.PID, defaultIfEmpty(hdr.Pid, evt.Extensions()[grpc.PID])) |
| result.SetExtension(grpc.SYS, defaultIfEmpty(hdr.Sys, evt.Extensions()[grpc.SYS])) |
| result.SetExtension(grpc.LANGUAGE, defaultIfEmpty(hdr.Language, evt.Extensions()[grpc.LANGUAGE])) |
| result.SetExtension(grpc.PROTOCOL_TYPE, defaultIfEmpty(hdr.ProtocolType, evt.Extensions()[grpc.PROTOCOL_TYPE])) |
| result.SetExtension(grpc.PROTOCOL_DESC, defaultIfEmpty(hdr.ProtocolDesc, evt.Extensions()[grpc.PROTOCOL_DESC])) |
| result.SetExtension(grpc.PROTOCOL_VERSION, defaultIfEmpty(hdr.ProtocolVersion, evt.Extensions()[grpc.PROTOCOL_VERSION])) |
| result.SetExtension(grpc.UNIQUE_ID, defaultIfEmpty(item.UniqueId, evt.Extensions()[grpc.UNIQUE_ID])) |
| result.SetExtension(grpc.SEQ_NUM, defaultIfEmpty(item.SeqNum, evt.Extensions()[grpc.SEQ_NUM])) |
| result.SetExtension(grpc.USERNAME, defaultIfEmpty(hdr.Username, evt.Extensions()[grpc.USERNAME])) |
| result.SetExtension(grpc.PASSWD, defaultIfEmpty(hdr.Password, evt.Extensions()[grpc.PASSWD])) |
| result.SetExtension(grpc.TTL, defaultIfEmpty(item.Ttl, evt.Extensions()[grpc.TTL])) |
| result.SetExtension(grpc.PRODUCERGROUP, defaultIfEmpty(bm.ProducerGroup, evt.Extensions()[grpc.PRODUCERGROUP])) |
| if ver == event.CloudEventsVersionV1 { |
| result.SetSpecVersion(event.CloudEventsVersionV1) |
| } else { |
| result.SetSpecVersion(event.CloudEventsVersionV03) |
| } |
| result.SetSubject(topic) |
| msgs = append(msgs, &result) |
| } |
| return msgs, nil |
| } |
| |
| func buildSimpleMessage(evt *v2.Event) (*grpc.SimpleMessageWrapper, error) { |
| ct, err := datacodec.Encode(context.TODO(), evt.DataContentType(), evt) |
| if err != nil { |
| return nil, err |
| } |
| hdr := &pb.RequestHeader{ |
| Env: defaultIfNil("env", evt.Extensions()[grpc.ENV]), |
| Idc: defaultIfNil("idc", evt.Extensions()[grpc.IDC]), |
| Ip: defaultIfNil("127.0.0.1", evt.Extensions()[grpc.IP]), |
| Pid: defaultIfNil("123", evt.Extensions()[grpc.PID]), |
| Sys: defaultIfNil("sys123", evt.Extensions()[grpc.SYS]), |
| Username: defaultIfNil("user", evt.Extensions()[grpc.USERNAME]), |
| Password: defaultIfNil("pass", evt.Extensions()[grpc.PASSWD]), |
| Language: defaultIfNil("JAVA", evt.Extensions()[grpc.LANGUAGE]), |
| ProtocolType: defaultIfNil("protocol", evt.Extensions()[grpc.PROTOCOL_TYPE]), |
| ProtocolDesc: defaultIfNil("protocolDesc", evt.Extensions()[grpc.PROTOCOL_DESC]), |
| ProtocolVersion: defaultIfNil("1.0", evt.Extensions()[grpc.PROTOCOL_VERSION]), |
| } |
| msg := &pb.SimpleMessage{ |
| Header: hdr, |
| Content: string(ct), |
| ProducerGroup: defaultIfNil("producerGroup", evt.Extensions()[grpc.PRODUCERGROUP]), |
| SeqNum: defaultIfNil("", evt.Extensions()[grpc.SEQ_NUM]), |
| UniqueId: defaultIfNil("", evt.Extensions()[grpc.UNIQUE_ID]), |
| Topic: evt.Subject(), |
| Ttl: defaultIfNil("3000", evt.Extensions()[grpc.TTL]), |
| Properties: map[string]string{grpc.CONTENT_TYPE: evt.DataContentType()}, |
| } |
| for k, v := range evt.Extensions() { |
| msg.Properties[k] = defaultIfNil("", v) |
| } |
| |
| return &grpc.SimpleMessageWrapper{SimpleMessage: msg}, nil |
| } |