blob: 8021ffe0ff78593d1b53a67d41d613e7007fc29e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
"bytes"
"encoding/json"
"reflect"
"unsafe"
log "github.com/sirupsen/logrus"
"github.com/gogo/protobuf/proto"
"github.com/linkedin/goavro/v2"
)
type SchemaType int
const (
NONE SchemaType = iota //No schema defined
STRING //Simple String encoding with UTF-8
JSON //JSON object encoding and validation
PROTOBUF //Protobuf message encoding and decoding
AVRO //Serialize and deserialize via Avro
BOOLEAN //
INT8 //A 8-byte integer.
INT16 //A 16-byte integer.
INT32 //A 32-byte integer.
INT64 //A 64-byte integer.
FLOAT //A float number.
DOUBLE //A double number
_ //
_ //
_ //
KeyValue //A Schema that contains Key Schema and Value Schema.
BYTES = -1 //A bytes array.
AUTO = -2 //
AutoConsume = -3 //Auto Consume Type.
AutoPublish = -4 // Auto Publish Type.
)
// Encapsulates data around the schema definition
type SchemaInfo struct {
Name string
Schema string
Type SchemaType
Properties map[string]string
}
type Schema interface {
Encode(v interface{}) ([]byte, error)
Decode(data []byte, v interface{}) error
Validate(message []byte) error
GetSchemaInfo() *SchemaInfo
}
type AvroCodec struct {
Codec *goavro.Codec
}
func NewSchemaDefinition(schema *goavro.Codec) *AvroCodec {
schemaDef := &AvroCodec{
Codec: schema,
}
return schemaDef
}
// initAvroCodec returns a Codec used to translate between a byte slice of either
// binary or textual Avro data and native Go data.
func initAvroCodec(codec string) (*goavro.Codec, error) {
return goavro.NewCodec(codec)
}
type JSONSchema struct {
AvroCodec
SchemaInfo
}
func NewJSONSchema(jsonAvroSchemaDef string, properties map[string]string) *JSONSchema {
js := new(JSONSchema)
avroCodec, err := initAvroCodec(jsonAvroSchemaDef)
if err != nil {
log.Fatalf("init codec error:%v", err)
}
schemaDef := NewSchemaDefinition(avroCodec)
js.SchemaInfo.Schema = schemaDef.Codec.Schema()
js.SchemaInfo.Type = JSON
js.SchemaInfo.Properties = properties
js.SchemaInfo.Name = "JSON"
return js
}
func (js *JSONSchema) Encode(data interface{}) ([]byte, error) {
return json.Marshal(data)
}
func (js *JSONSchema) Decode(data []byte, v interface{}) error {
return json.Unmarshal(data, v)
}
func (js *JSONSchema) Validate(message []byte) error {
return js.Decode(message, nil)
}
func (js *JSONSchema) GetSchemaInfo() *SchemaInfo {
return &js.SchemaInfo
}
type ProtoSchema struct {
AvroCodec
SchemaInfo
}
func NewProtoSchema(protoAvroSchemaDef string, properties map[string]string) *ProtoSchema {
ps := new(ProtoSchema)
avroCodec, err := initAvroCodec(protoAvroSchemaDef)
if err != nil {
log.Fatalf("init codec error:%v", err)
}
schemaDef := NewSchemaDefinition(avroCodec)
ps.AvroCodec.Codec = schemaDef.Codec
ps.SchemaInfo.Schema = schemaDef.Codec.Schema()
ps.SchemaInfo.Type = PROTOBUF
ps.SchemaInfo.Properties = properties
ps.SchemaInfo.Name = "Proto"
return ps
}
func (ps *ProtoSchema) Encode(data interface{}) ([]byte, error) {
return proto.Marshal(data.(proto.Message))
}
func (ps *ProtoSchema) Decode(data []byte, v interface{}) error {
return proto.Unmarshal(data, v.(proto.Message))
}
func (ps *ProtoSchema) Validate(message []byte) error {
return ps.Decode(message, nil)
}
func (ps *ProtoSchema) GetSchemaInfo() *SchemaInfo {
return &ps.SchemaInfo
}
type AvroSchema struct {
AvroCodec
SchemaInfo
}
func NewAvroSchema(avroSchemaDef string, properties map[string]string) *AvroSchema {
as := new(AvroSchema)
avroCodec, err := initAvroCodec(avroSchemaDef)
if err != nil {
log.Fatalf("init codec error:%v", err)
}
schemaDef := NewSchemaDefinition(avroCodec)
as.AvroCodec.Codec = schemaDef.Codec
as.SchemaInfo.Schema = schemaDef.Codec.Schema()
as.SchemaInfo.Type = AVRO
as.SchemaInfo.Name = "Avro"
as.SchemaInfo.Properties = properties
return as
}
func (as *AvroSchema) Encode(data interface{}) ([]byte, error) {
textual, err := json.Marshal(data)
if err != nil {
log.Errorf("serialize data error:%s", err.Error())
return nil, err
}
native, _, err := as.Codec.NativeFromTextual(textual)
if err != nil {
log.Errorf("convert native Go form to binary Avro data error:%s", err.Error())
return nil, err
}
return as.Codec.BinaryFromNative(nil, native)
}
func (as *AvroSchema) Decode(data []byte, v interface{}) error {
native, _, err := as.Codec.NativeFromBinary(data)
if err != nil {
log.Errorf("convert binary Avro data back to native Go form error:%s", err.Error())
return err
}
textual, err := as.Codec.TextualFromNative(nil, native)
if err != nil {
log.Errorf("convert native Go form to textual Avro data error:%s", err.Error())
return err
}
err = json.Unmarshal(textual, v)
if err != nil {
log.Errorf("unSerialize textual error:%s", err.Error())
return err
}
return nil
}
func (as *AvroSchema) Validate(message []byte) error {
return as.Decode(message, nil)
}
func (as *AvroSchema) GetSchemaInfo() *SchemaInfo {
return &as.SchemaInfo
}
type StringSchema struct {
SchemaInfo
}
func NewStringSchema(properties map[string]string) *StringSchema {
strSchema := new(StringSchema)
strSchema.SchemaInfo.Properties = properties
strSchema.SchemaInfo.Name = "String"
strSchema.SchemaInfo.Type = STRING
strSchema.SchemaInfo.Schema = ""
return strSchema
}
func (ss *StringSchema) Encode(v interface{}) ([]byte, error) {
return []byte(v.(string)), nil
}
func (ss *StringSchema) Decode(data []byte, v interface{}) error {
bh := (*reflect.SliceHeader)(unsafe.Pointer(&data))
sh := reflect.StringHeader{
Data: bh.Data,
Len: bh.Len,
}
shPtr := (*string)(unsafe.Pointer(&sh))
reflect.ValueOf(v).Elem().Set(reflect.ValueOf(shPtr))
return nil
}
func (ss *StringSchema) Validate(message []byte) error {
return ss.Decode(message, nil)
}
func (ss *StringSchema) GetSchemaInfo() *SchemaInfo {
return &ss.SchemaInfo
}
type BytesSchema struct {
SchemaInfo
}
func NewBytesSchema(properties map[string]string) *BytesSchema {
bytesSchema := new(BytesSchema)
bytesSchema.SchemaInfo.Properties = properties
bytesSchema.SchemaInfo.Name = "Bytes"
bytesSchema.SchemaInfo.Type = BYTES
bytesSchema.SchemaInfo.Schema = ""
return bytesSchema
}
func (bs *BytesSchema) Encode(data interface{}) ([]byte, error) {
return data.([]byte), nil
}
func (bs *BytesSchema) Decode(data []byte, v interface{}) error {
reflect.ValueOf(v).Elem().Set(reflect.ValueOf(data))
return nil
}
func (bs *BytesSchema) Validate(message []byte) error {
return bs.Decode(message, nil)
}
func (bs *BytesSchema) GetSchemaInfo() *SchemaInfo {
return &bs.SchemaInfo
}
type Int8Schema struct {
SchemaInfo
}
func NewInt8Schema(properties map[string]string) *Int8Schema {
int8Schema := new(Int8Schema)
int8Schema.SchemaInfo.Properties = properties
int8Schema.SchemaInfo.Schema = ""
int8Schema.SchemaInfo.Type = INT8
int8Schema.SchemaInfo.Name = "INT8"
return int8Schema
}
func (is8 *Int8Schema) Encode(value interface{}) ([]byte, error) {
var buf bytes.Buffer
err := WriteElements(&buf, value.(int8))
return buf.Bytes(), err
}
func (is8 *Int8Schema) Decode(data []byte, v interface{}) error {
buf := bytes.NewReader(data)
return ReadElements(buf, v)
}
func (is8 *Int8Schema) Validate(message []byte) error {
if len(message) != 1 {
return newError(InvalidMessage, "size of data received by Int8Schema is not 1")
}
return nil
}
func (is8 *Int8Schema) GetSchemaInfo() *SchemaInfo {
return &is8.SchemaInfo
}
type Int16Schema struct {
SchemaInfo
}
func NewInt16Schema(properties map[string]string) *Int16Schema {
int16Schema := new(Int16Schema)
int16Schema.SchemaInfo.Properties = properties
int16Schema.SchemaInfo.Name = "INT16"
int16Schema.SchemaInfo.Type = INT16
int16Schema.SchemaInfo.Schema = ""
return int16Schema
}
func (is16 *Int16Schema) Encode(value interface{}) ([]byte, error) {
var buf bytes.Buffer
err := WriteElements(&buf, value.(int16))
return buf.Bytes(), err
}
func (is16 *Int16Schema) Decode(data []byte, v interface{}) error {
buf := bytes.NewReader(data)
return ReadElements(buf, v)
}
func (is16 *Int16Schema) Validate(message []byte) error {
if len(message) != 2 {
return newError(InvalidMessage, "size of data received by Int16Schema is not 2")
}
return nil
}
func (is16 *Int16Schema) GetSchemaInfo() *SchemaInfo {
return &is16.SchemaInfo
}
type Int32Schema struct {
SchemaInfo
}
func NewInt32Schema(properties map[string]string) *Int32Schema {
int32Schema := new(Int32Schema)
int32Schema.SchemaInfo.Properties = properties
int32Schema.SchemaInfo.Schema = ""
int32Schema.SchemaInfo.Name = "INT32"
int32Schema.SchemaInfo.Type = INT32
return int32Schema
}
func (is32 *Int32Schema) Encode(value interface{}) ([]byte, error) {
var buf bytes.Buffer
err := WriteElements(&buf, value.(int32))
return buf.Bytes(), err
}
func (is32 *Int32Schema) Decode(data []byte, v interface{}) error {
buf := bytes.NewReader(data)
return ReadElements(buf, v)
}
func (is32 *Int32Schema) Validate(message []byte) error {
if len(message) != 4 {
return newError(InvalidMessage, "size of data received by Int32Schema is not 4")
}
return nil
}
func (is32 *Int32Schema) GetSchemaInfo() *SchemaInfo {
return &is32.SchemaInfo
}
type Int64Schema struct {
SchemaInfo
}
func NewInt64Schema(properties map[string]string) *Int64Schema {
int64Schema := new(Int64Schema)
int64Schema.SchemaInfo.Properties = properties
int64Schema.SchemaInfo.Name = "INT64"
int64Schema.SchemaInfo.Type = INT64
int64Schema.SchemaInfo.Schema = ""
return int64Schema
}
func (is64 *Int64Schema) Encode(value interface{}) ([]byte, error) {
var buf bytes.Buffer
err := WriteElements(&buf, value.(int64))
return buf.Bytes(), err
}
func (is64 *Int64Schema) Decode(data []byte, v interface{}) error {
buf := bytes.NewReader(data)
return ReadElements(buf, v)
}
func (is64 *Int64Schema) Validate(message []byte) error {
if len(message) != 8 {
return newError(InvalidMessage, "size of data received by Int64Schema is not 8")
}
return nil
}
func (is64 *Int64Schema) GetSchemaInfo() *SchemaInfo {
return &is64.SchemaInfo
}
type FloatSchema struct {
SchemaInfo
}
func NewFloatSchema(properties map[string]string) *FloatSchema {
floatSchema := new(FloatSchema)
floatSchema.SchemaInfo.Properties = properties
floatSchema.SchemaInfo.Type = FLOAT
floatSchema.SchemaInfo.Name = "FLOAT"
floatSchema.SchemaInfo.Schema = ""
return floatSchema
}
func (fs *FloatSchema) Encode(value interface{}) ([]byte, error) {
return BinarySerializer.PutFloat(value)
}
func (fs *FloatSchema) Decode(data []byte, v interface{}) error {
floatValue, err := BinarySerializer.Float32(data)
if err != nil {
log.Errorf("unSerialize float error:%s", err.Error())
return err
}
reflect.ValueOf(v).Elem().Set(reflect.ValueOf(floatValue))
return nil
}
func (fs *FloatSchema) Validate(message []byte) error {
if len(message) != 4 {
return newError(InvalidMessage, "size of data received by FloatSchema is not 4")
}
return nil
}
func (fs *FloatSchema) GetSchemaInfo() *SchemaInfo {
return &fs.SchemaInfo
}
type DoubleSchema struct {
SchemaInfo
}
func NewDoubleSchema(properties map[string]string) *DoubleSchema {
doubleSchema := new(DoubleSchema)
doubleSchema.SchemaInfo.Properties = properties
doubleSchema.SchemaInfo.Type = DOUBLE
doubleSchema.SchemaInfo.Name = "DOUBLE"
doubleSchema.SchemaInfo.Schema = ""
return doubleSchema
}
func (ds *DoubleSchema) Encode(value interface{}) ([]byte, error) {
return BinarySerializer.PutDouble(value)
}
func (ds *DoubleSchema) Decode(data []byte, v interface{}) error {
doubleValue, err := BinarySerializer.Float64(data)
if err != nil {
log.Errorf("unSerialize double value error:%s", err.Error())
return err
}
reflect.ValueOf(v).Elem().Set(reflect.ValueOf(doubleValue))
return nil
}
func (ds *DoubleSchema) Validate(message []byte) error {
if len(message) != 8 {
return newError(InvalidMessage, "size of data received by DoubleSchema is not 8")
}
return nil
}
func (ds *DoubleSchema) GetSchemaInfo() *SchemaInfo {
return &ds.SchemaInfo
}