blob: ddf2819a03a0ef56e29a21e5501d2be9850cd950 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package _default
import (
// DefaultCodecRequirements adds required methods to MessageCodec that are needed when using DefaultCodec
type DefaultCodecRequirements interface {
GetCodec() spi.MessageCodec
Send(message spi.Message) error
Receive() (spi.Message, error)
// DefaultCodec is a default codec implementation which has so sensitive defaults for message handling and a built-in worker
type DefaultCodec interface {
// NewDefaultCodec is the factory for a DefaultCodec
func NewDefaultCodec(requirements DefaultCodecRequirements, transportInstance transports.TransportInstance, options ...options.WithOption) DefaultCodec {
return buildDefaultCodec(requirements, transportInstance, options...)
type DefaultExpectation struct {
Context context.Context
Expiration time.Time
AcceptsMessage spi.AcceptsMessage
HandleMessage spi.HandleMessage
HandleError spi.HandleError
type CustomMessageHandler func(codec DefaultCodecRequirements, message spi.Message) bool
func WithCustomMessageHandler(customMessageHandler CustomMessageHandler) options.WithOption {
return withCustomMessageHandler{customMessageHandler: customMessageHandler}
// Internal section
type withCustomMessageHandler struct {
customMessageHandler func(codec DefaultCodecRequirements, message spi.Message) bool
type defaultCodec struct {
transportInstance transports.TransportInstance
defaultIncomingMessageChannel chan spi.Message
expectations []spi.Expectation
running bool
customMessageHandling func(codec DefaultCodecRequirements, message spi.Message) bool
log zerolog.Logger
func buildDefaultCodec(defaultCodecRequirements DefaultCodecRequirements, transportInstance transports.TransportInstance, _options ...options.WithOption) DefaultCodec {
var customMessageHandler func(codec DefaultCodecRequirements, message spi.Message) bool
var logger = options.ExtractCustomLogger(_options...)
for _, option := range _options {
switch option := option.(type) {
case withCustomMessageHandler:
customMessageHandler = option.customMessageHandler
return &defaultCodec{
DefaultCodecRequirements: defaultCodecRequirements,
transportInstance: transportInstance,
defaultIncomingMessageChannel: make(chan spi.Message),
expectations: []spi.Expectation{},
running: false,
customMessageHandling: customMessageHandler,
log: logger,
// Internal section
func (m *DefaultExpectation) GetContext() context.Context {
return m.Context
func (m *DefaultExpectation) GetExpiration() time.Time {
return m.Expiration
func (m *DefaultExpectation) GetAcceptsMessage() spi.AcceptsMessage {
return m.AcceptsMessage
func (m *DefaultExpectation) GetHandleMessage() spi.HandleMessage {
return m.HandleMessage
func (m *DefaultExpectation) GetHandleError() spi.HandleError {
return m.HandleError
func (m *DefaultExpectation) String() string {
return fmt.Sprintf("Expectation(expires at %v)", m.Expiration)
func (m *defaultCodec) GetTransportInstance() transports.TransportInstance {
return m.transportInstance
func (m *defaultCodec) GetDefaultIncomingMessageChannel() chan spi.Message {
return m.defaultIncomingMessageChannel
func (m *defaultCodec) Connect() error {
return m.ConnectWithContext(context.Background())
func (m *defaultCodec) ConnectWithContext(ctx context.Context) error {
if !m.transportInstance.IsConnected() {
if err := m.transportInstance.ConnectWithContext(ctx); err != nil {
return err
} else {
m.log.Info().Msg("Transport instance already connected")
if !m.running {
m.log.Debug().Msg("Message codec currently not running, starting worker now")
go m.Work(m.DefaultCodecRequirements)
m.running = true
return nil
func (m *defaultCodec) Disconnect() error {
m.running = false
if m.transportInstance == nil {
// TODO: check if we move that case to the constructor
return nil
return m.transportInstance.Close()
func (m *defaultCodec) IsRunning() bool {
return m.running
func (m *defaultCodec) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
expectation := &DefaultExpectation{
Context: ctx,
Expiration: time.Now().Add(ttl),
AcceptsMessage: acceptsMessage,
HandleMessage: handleMessage,
HandleError: handleError,
m.expectations = append(m.expectations, expectation)
return nil
func (m *defaultCodec) SendRequest(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
if err := ctx.Err(); err != nil {
return errors.Wrap(err, "Not sending message as context is aborted")
m.log.Trace().Msg("Sending request")
// Send the actual message
err := m.Send(message)
if err != nil {
return errors.Wrap(err, "Error sending the request")
return m.Expect(ctx, acceptsMessage, handleMessage, handleError, ttl)
func (m *defaultCodec) TimeoutExpectations(now time.Time) {
for i := 0; i < len(m.expectations); i++ {
expectation := m.expectations[i]
// Check if this expectation has expired.
if now.After(expectation.GetExpiration()) {
// Remove this expectation from the list.
m.expectations = append(m.expectations[:i], m.expectations[i+1:]...)
// Call the error handler.
go func(expectation spi.Expectation) {
if err := expectation.GetHandleError()(utils.NewTimeoutError(now.Sub(expectation.GetExpiration()))); err != nil {
m.log.Error().Err(err).Msg("Got an error handling error on expectation")
if err := expectation.GetContext().Err(); err != nil {
// Remove this expectation from the list.
m.expectations = append(m.expectations[:i], m.expectations[i+1:]...)
go func(expectation spi.Expectation) {
if err := expectation.GetHandleError()(err); err != nil {
m.log.Error().Err(err).Msg("Got an error handling error on expectation")
func (m *defaultCodec) HandleMessages(message spi.Message) bool {
messageHandled := false
m.log.Trace().Msgf("Current number of expectations: %d", len(m.expectations))
for index, expectation := range m.expectations {
// Check if the current message matches the expectations
// If it does, let it handle the message.
if accepts := expectation.GetAcceptsMessage()(message); accepts {
m.log.Debug().Stringer("expectation", expectation).Msg("accepts message")
// TODO: decouple from worker thread
err := expectation.GetHandleMessage()(message)
if err != nil {
// Pass the error to the error handler.
// TODO: decouple from worker thread
err := expectation.GetHandleError()(err)
if err != nil {
m.log.Error().Err(err).Msg("Got an error handling error on expectation")
messageHandled = true
// If this is the last element of the list remove it differently than if it's before that
if (index + 1) == len(m.expectations) {
m.expectations = m.expectations[:index]
} else if (index + 1) < len(m.expectations) {
m.expectations = append(m.expectations[:index], m.expectations[index+1:]...)
return messageHandled
func (m *defaultCodec) Work(codec DefaultCodecRequirements) {
workerLog := m.log.With().Logger()
if !config.TraceDefaultMessageCodecWorker {
workerLog = zerolog.Nop()
defer func(workerLog zerolog.Logger) {
if err := recover(); err != nil {
// TODO: If this is an error, cast it to an error and log it with "Err(err)"
m.log.Error().Msgf("recovered from: %#v at %s", err, string(debug.Stack()))
if m.running {
workerLog.Warn().Msg("Keep running")
} else {
workerLog.Info().Msg("Worker terminated")
// Start an endless loop
for m.running {
// Check for any expired expectations.
// (Doing this outside the loop lets us expire expectations even if no input is coming in)
now := time.Now()
// Guard against empty expectations
if len(m.expectations) <= 0 && m.customMessageHandling == nil {
workerLog.Trace().Msg("no available expectations")
// Sleep for 10ms
time.Sleep(time.Millisecond * 10)
continue mainLoop
workerLog.Trace().Msg("Receiving message")
// Check for incoming messages.
message, err := m.Receive()
if err != nil {
workerLog.Error().Err(err).Msg("got an error reading from transport")
time.Sleep(time.Millisecond * 10)
continue mainLoop
if message == nil {
workerLog.Trace().Msg("Not enough data yet")
// Sleep for 10ms before checking again, in order to not
// consume 100% CPU Power.
time.Sleep(time.Millisecond * 10)
continue mainLoop
if m.customMessageHandling != nil {
workerLog.Trace().Msg("Executing custom handling")
if m.customMessageHandling(codec, message) {
workerLog.Trace().Msg("Custom handling handled the message")
continue mainLoop
workerLog.Trace().Msg("Custom handling didn't handle the message")
workerLog.Trace().Msg("Handle message")
// Go through all expectations
messageHandled := m.HandleMessages(message)
// If the message has not been handled and a default handler is provided, call this ...
if !messageHandled {
workerLog.Trace().Msg("Message was not handled")
m.passToDefaultIncomingMessageChannel(workerLog, message)
func (m *defaultCodec) passToDefaultIncomingMessageChannel(workerLog zerolog.Logger, message spi.Message) {
timeout := time.NewTimer(time.Millisecond * 40)
defer utils.CleanupTimer(timeout)
select {
case m.defaultIncomingMessageChannel <- message:
case <-timeout.C:
workerLog.Warn().Msgf("Message discarded\n%s", message)