blob: 412aae18ca11add1e1863780908a73e68b6a6072 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package binaryserialization
import (
"encoding/binary"
iggcon "github.com/apache/iggy/foreign/go/contracts"
)
// CreateGroup serializes a create-consumer-group request.
//
// Layout: stream identifier | topic identifier | name length (1 byte) | name.
// The name length is stored in a single byte, so only the low 8 bits of
// len(request.Name) are kept.
func CreateGroup(request iggcon.CreateConsumerGroupRequest) []byte {
	streamID := SerializeIdentifier(request.StreamId)
	topicID := SerializeIdentifier(request.TopicId)
	name := request.Name

	payload := make([]byte, len(streamID)+len(topicID)+1+len(name))

	// copy reports how many bytes it wrote, which doubles as the next offset.
	cursor := copy(payload, streamID)
	cursor += copy(payload[cursor:], topicID)
	payload[cursor] = byte(len(name))
	copy(payload[cursor+1:], name)
	return payload
}
// UpdateOffset serializes a store-consumer-offset request.
//
// Layout: consumer | stream identifier | topic identifier |
// has-partition flag (1 byte) | partition id (uint32 LE) | offset (uint64 LE).
// When request.PartitionId is nil both the flag and the partition id are zero.
func UpdateOffset(request iggcon.StoreConsumerOffsetRequest) []byte {
	consumer := SerializeConsumer(request.Consumer)
	streamID := SerializeIdentifier(request.StreamId)
	topicID := SerializeIdentifier(request.TopicId)

	// Fixed-size tail: flag(1) + partition(4) + offset(8).
	const tailSize = 1 + 4 + 8
	payload := make([]byte, len(consumer)+len(streamID)+len(topicID)+tailSize)

	cursor := copy(payload, consumer)
	cursor += copy(payload[cursor:], streamID)
	cursor += copy(payload[cursor:], topicID)

	tail := payload[cursor:]
	if id := request.PartitionId; id != nil {
		tail[0] = 1
		binary.LittleEndian.PutUint32(tail[1:5], *id)
	}
	binary.LittleEndian.PutUint64(tail[5:13], uint64(request.Offset))
	return payload
}
// GetOffset serializes a get-consumer-offset request.
//
// Layout: consumer | stream identifier | topic identifier |
// has-partition flag (1 byte) | partition id (uint32 LE).
// When request.PartitionId is nil both the flag and the partition id are zero.
func GetOffset(request iggcon.GetConsumerOffsetRequest) []byte {
	consumer := SerializeConsumer(request.Consumer)
	streamID := SerializeIdentifier(request.StreamId)
	topicID := SerializeIdentifier(request.TopicId)

	// Fixed-size tail: flag(1) + partition(4).
	const tailSize = 1 + 4
	payload := make([]byte, len(consumer)+len(streamID)+len(topicID)+tailSize)

	cursor := copy(payload, consumer)
	cursor += copy(payload[cursor:], streamID)
	cursor += copy(payload[cursor:], topicID)

	tail := payload[cursor:]
	if id := request.PartitionId; id != nil {
		tail[0] = 1
		binary.LittleEndian.PutUint32(tail[1:5], *id)
	}
	return payload
}
// DeleteOffset serializes a delete-consumer-offset request.
//
// Layout: consumer | stream identifier | topic identifier |
// has-partition flag (1 byte) | partition id (uint32 LE).
// When request.PartitionId is nil both the flag and the partition id are zero.
func DeleteOffset(request iggcon.DeleteConsumerOffsetRequest) []byte {
	consumer := SerializeConsumer(request.Consumer)
	streamID := SerializeIdentifier(request.StreamId)
	topicID := SerializeIdentifier(request.TopicId)

	// Fixed-size tail: flag(1) + partition(4).
	const tailSize = 1 + 4
	payload := make([]byte, len(consumer)+len(streamID)+len(topicID)+tailSize)

	cursor := copy(payload, consumer)
	cursor += copy(payload[cursor:], streamID)
	cursor += copy(payload[cursor:], topicID)

	tail := payload[cursor:]
	if id := request.PartitionId; id != nil {
		tail[0] = 1
		binary.LittleEndian.PutUint32(tail[1:5], *id)
	}
	return payload
}
// CreatePartitions serializes a create-partitions request.
//
// Layout: stream identifier | topic identifier | partitions count (uint32 LE).
func CreatePartitions(request iggcon.CreatePartitionsRequest) []byte {
	streamID := SerializeIdentifier(request.StreamId)
	topicID := SerializeIdentifier(request.TopicId)

	payload := make([]byte, len(streamID)+len(topicID)+4)
	cursor := copy(payload, streamID)
	cursor += copy(payload[cursor:], topicID)

	// The remaining four bytes hold the count.
	binary.LittleEndian.PutUint32(payload[cursor:], uint32(request.PartitionsCount))
	return payload
}
// DeletePartitions serializes a delete-partitions request.
//
// Layout: stream identifier | topic identifier | partitions count (uint32 LE).
func DeletePartitions(request iggcon.DeletePartitionsRequest) []byte {
	streamID := SerializeIdentifier(request.StreamId)
	topicID := SerializeIdentifier(request.TopicId)

	payload := make([]byte, len(streamID)+len(topicID)+4)
	cursor := copy(payload, streamID)
	cursor += copy(payload[cursor:], topicID)

	// The remaining four bytes hold the count.
	binary.LittleEndian.PutUint32(payload[cursor:], uint32(request.PartitionsCount))
	return payload
}
// USERS: serializers for user-management requests.
// SerializeCreateUserRequest serializes a create-user request.
//
// Layout:
//
//	username length (1 byte) | username |
//	password length (1 byte) | password |
//	status (1 byte: 1 = Active, 2 = Inactive, 0 = anything else) |
//	has-permissions flag (1 byte)
//	[ permissions length (uint32 LE) | permissions ]   only when the flag is 1
//
// The username and password lengths are stored in a single byte each, so only
// the low 8 bits of the respective len() are kept.
//
// The buffer is sized to exactly the bytes written. The previous sizing
// counted the has-permissions flag twice when permissions were present, which
// left a stray zero byte at the end of the payload.
func SerializeCreateUserRequest(request iggcon.CreateUserRequest) []byte {
	// Four fixed bytes: username length, password length, status, flag.
	capacity := 4 + len(request.Username) + len(request.Password)

	var permissionsBytes []byte
	if request.Permissions != nil {
		permissionsBytes = GetBytesFromPermissions(request.Permissions)
		// The flag byte is already part of the fixed four.
		capacity += 4 + len(permissionsBytes)
	}

	bytes := make([]byte, capacity)
	position := 0

	bytes[position] = byte(len(request.Username))
	position++
	position += copy(bytes[position:], request.Username)

	bytes[position] = byte(len(request.Password))
	position++
	position += copy(bytes[position:], request.Password)

	// Unknown statuses keep the zero value the buffer was created with.
	switch request.Status {
	case iggcon.Active:
		bytes[position] = 1
	case iggcon.Inactive:
		bytes[position] = 2
	}
	position++

	if request.Permissions != nil {
		bytes[position] = 1
		position++
		binary.LittleEndian.PutUint32(bytes[position:position+4], uint32(len(permissionsBytes)))
		position += 4
		copy(bytes[position:], permissionsBytes)
	}
	// Without permissions the flag byte is left at zero.
	return bytes
}
// GetBytesFromPermissions serializes a permissions set into a buffer whose
// size is given by CalculatePermissionsSize.
//
// Layout:
//
//	10 global flags (1 byte each, in the order written below)
//	has-streams flag (1 byte)
//	for every stream:
//	    stream id (uint32 LE) | 6 stream flags | has-topics flag (1 byte)
//	    for every topic:
//	        topic id (uint32 LE) | 4 topic flags | more-topics flag (1 byte)
//	    more-streams flag (1 byte)
//
// The "more" flags are 1 when another entry follows and 0 on the last entry.
// NOTE(review): this continuation-flag meaning follows the reference Rust
// serializer; confirm against the server's permissions decoder.
//
// Entries are emitted in whatever order range yields them, so byte-for-byte
// output may differ between calls when there are several streams or topics.
//
// Fixes relative to the previous version:
//   - with data.Streams == nil the has-streams flag was written to index 0,
//     wiping the ManageServers flag;
//   - the per-stream trailing byte reserved by CalculatePermissionsSize was
//     never written or skipped, so a second stream was laid out one byte
//     early and the buffer ended with unused bytes;
//   - the per-topic trailing byte was always 0, even when more topics followed.
//
// data must not be nil.
func GetBytesFromPermissions(data *iggcon.Permissions) []byte {
	bytes := make([]byte, CalculatePermissionsSize(data))

	bytes[0] = boolToByte(data.Global.ManageServers)
	bytes[1] = boolToByte(data.Global.ReadServers)
	bytes[2] = boolToByte(data.Global.ManageUsers)
	bytes[3] = boolToByte(data.Global.ReadUsers)
	bytes[4] = boolToByte(data.Global.ManageStreams)
	bytes[5] = boolToByte(data.Global.ReadStreams)
	bytes[6] = boolToByte(data.Global.ManageTopics)
	bytes[7] = boolToByte(data.Global.ReadTopics)
	bytes[8] = boolToByte(data.Global.PollMessages)
	bytes[9] = boolToByte(data.Global.SendMessages)

	position := 10
	if data.Streams == nil {
		// The has-streams flag at bytes[position] is already zero.
		return bytes
	}
	bytes[position] = 1
	position++

	streamsLeft := len(data.Streams)
	for streamID, stream := range data.Streams {
		binary.LittleEndian.PutUint32(bytes[position:position+4], uint32(streamID))
		position += 4

		bytes[position] = boolToByte(stream.ManageStream)
		bytes[position+1] = boolToByte(stream.ReadStream)
		bytes[position+2] = boolToByte(stream.ManageTopics)
		bytes[position+3] = boolToByte(stream.ReadTopics)
		bytes[position+4] = boolToByte(stream.PollMessages)
		bytes[position+5] = boolToByte(stream.SendMessages)
		position += 6

		if stream.Topics != nil {
			bytes[position] = 1
			position++

			topicsLeft := len(stream.Topics)
			for topicID, topic := range stream.Topics {
				binary.LittleEndian.PutUint32(bytes[position:position+4], uint32(topicID))
				position += 4

				bytes[position] = boolToByte(topic.ManageTopic)
				bytes[position+1] = boolToByte(topic.ReadTopic)
				bytes[position+2] = boolToByte(topic.PollMessages)
				bytes[position+3] = boolToByte(topic.SendMessages)
				position += 4

				// 1 while more topics follow, 0 after the last one.
				topicsLeft--
				bytes[position] = boolToByte(topicsLeft > 0)
				position++
			}
		} else {
			bytes[position] = 0
			position++
		}

		// 1 while more streams follow, 0 after the last one. This is the
		// byte CalculatePermissionsSize reserves once per stream.
		streamsLeft--
		bytes[position] = boolToByte(streamsLeft > 0)
		position++
	}

	return bytes
}
// CalculatePermissionsSize returns the number of bytes reserved for a
// serialized permissions set.
//
// The total is made of:
//   - 10 bytes of global flags plus 1 byte for the has-streams marker;
//   - per stream: 4 (id) + 6 (flags) + 1 (extra per-stream byte, presumably the
//     stream continuation flag — confirm against the wire format) + 1
//     (has-topics marker);
//   - per topic: 9 bytes.
//
// data must not be nil.
func CalculatePermissionsSize(data *iggcon.Permissions) int {
	const (
		headerSize = 10 + 1        // global flags + has-streams marker
		streamSize = 4 + 6 + 1 + 1 // id + flags + extra byte + has-topics marker
		topicSize  = 9
	)

	total := headerSize
	// Ranging over nil Streams runs zero times and len of nil Topics is 0, so
	// the nil and non-nil cases need no separate branches.
	for _, stream := range data.Streams {
		total += streamSize + topicSize*len(stream.Topics)
	}
	return total
}
// boolToByte encodes a boolean as a single byte: 1 for true, 0 for false.
func boolToByte(b bool) byte {
	var flag byte
	if b {
		flag = 1
	}
	return flag
}
// SerializeUpdateUser serializes an update-user request.
//
// Layout:
//
//	user identifier
//	has-username flag (1 byte) [ username length (1 byte) | username ]
//	has-status flag (1 byte)   [ status (1 byte: 1 = Active, 2 = Inactive, 0 = anything else) ]
//
// A nil or empty request.Username is encoded as "no username". The username
// length is stored in a single byte, so only the low 8 bits of len() are kept.
//
// The buffer is sized to exactly the bytes written. The previous length
// bookkeeping was one byte short when neither username nor status was set
// (index-out-of-range panic while writing the status flag) and one byte long
// when both were set.
func SerializeUpdateUser(request iggcon.UpdateUserRequest) []byte {
	userIDBytes := SerializeIdentifier(request.UserID)

	var username string
	if request.Username != nil {
		username = *request.Username
	}
	hasUsername := len(username) != 0
	hasStatus := request.Status != nil

	// Both flag bytes are always present; the optional parts follow them.
	length := len(userIDBytes) + 2
	if hasUsername {
		length += 1 + len(username)
	}
	if hasStatus {
		length++
	}

	bytes := make([]byte, length)
	position := copy(bytes, userIDBytes)

	if hasUsername {
		bytes[position] = 1
		bytes[position+1] = byte(len(username))
		position += 2
		position += copy(bytes[position:], username)
	} else {
		// The flag stays at zero.
		position++
	}

	if hasStatus {
		bytes[position] = 1
		// Unknown statuses keep the zero value the buffer was created with.
		switch *request.Status {
		case iggcon.Active:
			bytes[position+1] = 1
		case iggcon.Inactive:
			bytes[position+1] = 2
		}
	}
	return bytes
}
// SerializeChangePasswordRequest serializes a change-password request.
//
// Layout: user identifier | current password length (1 byte) | current
// password | new password length (1 byte) | new password.
// Each length is stored in a single byte, so only the low 8 bits of the
// respective len() are kept.
func SerializeChangePasswordRequest(request iggcon.ChangePasswordRequest) []byte {
	userID := SerializeIdentifier(request.UserID)
	current, next := request.CurrentPassword, request.NewPassword

	payload := make([]byte, len(userID)+1+len(current)+1+len(next))

	cursor := copy(payload, userID)
	payload[cursor] = byte(len(current))
	cursor++
	cursor += copy(payload[cursor:], current)
	payload[cursor] = byte(len(next))
	copy(payload[cursor+1:], next)
	return payload
}
// SerializeUpdateUserPermissionsRequest serializes an update-permissions
// request.
//
// Layout:
//
//	user identifier
//	has-permissions flag (1 byte)
//	[ permissions length (uint32 LE) | permissions ]   only when the flag is 1
//
// The flag byte is always allocated. Previously a nil request.Permissions
// produced a buffer holding only the user identifier, and writing the zero
// flag past its end panicked with an index-out-of-range error.
func SerializeUpdateUserPermissionsRequest(request iggcon.UpdatePermissionsRequest) []byte {
	userIDBytes := SerializeIdentifier(request.UserID)

	var permissionsBytes []byte
	length := len(userIDBytes) + 1 // identifier + has-permissions flag
	if request.Permissions != nil {
		permissionsBytes = GetBytesFromPermissions(request.Permissions)
		length += 4 + len(permissionsBytes)
	}

	bytes := make([]byte, length)
	position := copy(bytes, userIDBytes)

	if request.Permissions == nil {
		// The flag byte is already zero.
		return bytes
	}

	bytes[position] = 1
	position++
	binary.LittleEndian.PutUint32(bytes[position:position+4], uint32(len(permissionsBytes)))
	position += 4
	copy(bytes[position:], permissionsBytes)
	return bytes
}
// SerializeUint32 returns value encoded as four little-endian bytes.
func SerializeUint32(value uint32) []byte {
	var encoded [4]byte
	binary.LittleEndian.PutUint32(encoded[:], value)
	return encoded[:]
}
// SerializeLoginWithPersonalAccessToken serializes a login-with-token request.
//
// Layout: token length (1 byte) | token.
// The length is stored in a single byte, so only the low 8 bits of
// len(request.Token) are kept.
func SerializeLoginWithPersonalAccessToken(request iggcon.LoginWithPersonalAccessTokenRequest) []byte {
	token := request.Token

	payload := make([]byte, 0, 1+len(token))
	payload = append(payload, byte(len(token)))
	return append(payload, token...)
}
// SerializeDeletePersonalAccessToken serializes a delete-token request.
//
// Layout: name length (1 byte) | name.
// The length is stored in a single byte, so only the low 8 bits of
// len(request.Name) are kept.
func SerializeDeletePersonalAccessToken(request iggcon.DeletePersonalAccessTokenRequest) []byte {
	name := request.Name

	payload := make([]byte, 0, 1+len(name))
	payload = append(payload, byte(len(name)))
	return append(payload, name...)
}
// SerializeCreatePersonalAccessToken serializes a create-token request.
//
// Layout: name length (1 byte) | name | expiry (uint64 LE, 8 bytes).
// The name length is stored in a single byte, so only the low 8 bits of
// len(request.Name) are kept.
//
// The expiry is written as a full 64-bit little-endian value right after the
// name. Previously the 32-bit value was written into the last four of the
// eight reserved bytes, which is the high half of a little-endian 64-bit
// field, so the encoded value was Expiry << 32.
// NOTE(review): assumes the server reads the eight bytes as one little-endian
// uint64 — confirm against the protocol specification.
func SerializeCreatePersonalAccessToken(request iggcon.CreatePersonalAccessTokenRequest) []byte {
	nameLen := len(request.Name)

	bytes := make([]byte, 1+nameLen+8)
	bytes[0] = byte(nameLen)
	copy(bytes[1:], request.Name)
	binary.LittleEndian.PutUint64(bytes[1+nameLen:], uint64(request.Expiry))
	return bytes
}