/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package opcua

import (
	"bytes"
	"context"
	"crypto/sha1"
	"encoding/binary"
	"math"
	"math/rand"
	"net"
	"net/url"
	"regexp"
	"slices"
	"sync"
	"sync/atomic"
	"time"

	"github.com/pkg/errors"
	"github.com/rs/zerolog"

	"github.com/apache/plc4x/plc4go/pkg/api"
	readWriteModel "github.com/apache/plc4x/plc4go/protocols/opcua/readwrite/model"
	"github.com/apache/plc4x/plc4go/spi"
	"github.com/apache/plc4x/plc4go/spi/utils"
)

const (
	VERSION                       = uint32(0)
	DEFAULT_MAX_CHUNK_COUNT       = 64
	DEFAULT_MAX_MESSAGE_SIZE      = uint32(2097152)
	DEFAULT_RECEIVE_BUFFER_SIZE   = uint32(65535)
	DEFAULT_SEND_BUFFER_SIZE      = uint32(65535)
	REQUEST_TIMEOUT               = 10 * time.Second
	REQUEST_TIMEOUT_LONG          = 10000
	PASSWORD_ENCRYPTION_ALGORITHM = "http://www.w3.org/2001/04/xmlenc#rsa-oaep"
	EPOCH_OFFSET                  = 116444736000000000 //Offset between OPC UA epoch time and linux epoch time.
)

var (
	SECURITY_POLICY_NONE = readWriteModel.NewPascalString(utils.ToPtr("http://opcfoundation.org/UA/SecurityPolicy#None"))
	NULL_STRING          = readWriteModel.NewPascalString(nil)
	NULL_BYTE_STRING     = readWriteModel.NewPascalByteString(-1, nil)
	NULL_EXPANDED_NODEID = readWriteModel.NewExpandedNodeId(false,
		false,
		readWriteModel.NewNodeIdTwoByte(0),
		nil,
		nil,
	)
	BINARY_ENCODING_MASK  = readWriteModel.NewExtensionObjectEncodingMask(false, false, true)
	NULL_EXTENSION_OBJECT = readWriteModel.NewNullExtensionObjectWithMask(NULL_EXPANDED_NODEID,
		readWriteModel.NewExtensionObjectEncodingMask(false, false, false),
	) // Body

	INET_ADDRESS_PATTERN = regexp.MustCompile(`(.(?P<transportCode>tcp))?://(?P<transportHost>[\w.-]+)(:(?P<transportPort>\d*))?`)

	URI_PATTERN                 = regexp.MustCompile(`^(?P<protocolCode>opc)` + INET_ADDRESS_PATTERN.String() + `(?P<transportEndpoint>[\w/=]*)[?]?`)
	APPLICATION_URI             = readWriteModel.NewPascalString(utils.ToPtr("urn:apache:plc4x:client"))
	PRODUCT_URI                 = readWriteModel.NewPascalString(utils.ToPtr("urn:apache:plc4x:client"))
	APPLICATION_TEXT            = readWriteModel.NewPascalString(utils.ToPtr("OPCUA client for the Apache PLC4X:PLC4J project"))
	DEFAULT_CONNECTION_LIFETIME = uint32(36000000)
)

//go:generate go tool plc4xGenerator -type=SecureChannel
type SecureChannel struct {
	sessionName               string
	clientNonce               []byte
	requestHandleGenerator    atomic.Uint32
	policyId                  readWriteModel.PascalString
	tokenType                 readWriteModel.UserTokenType `stringer:"true"`
	discovery                 bool
	certFile                  string
	keyStoreFile              string
	ckp                       CertificateKeyPair
	endpoint                  readWriteModel.PascalString
	username                  string
	password                  string
	securityPolicy            string
	publicCertificate         readWriteModel.PascalByteString
	thumbprint                readWriteModel.PascalByteString
	isEncrypted               bool
	senderCertificate         []byte
	senderNonce               []byte
	certificateThumbprint     readWriteModel.PascalByteString
	checkedEndpoints          bool
	encryptionHandler         *EncryptionHandler
	configuration             Configuration `stringer:"true"`
	channelId                 atomic.Uint32
	tokenId                   atomic.Uint32
	authenticationToken       readWriteModel.NodeIdTypeDefinition
	codec                     *MessageCodec
	channelTransactionManager *SecureChannelTransactionManager
	lifetime                  uint32
	keepAliveStateChange      sync.Mutex
	keepAliveIndicator        atomic.Bool
	keepAliveWg               sync.WaitGroup
	sendBufferSize            int
	maxMessageSize            int
	endpoints                 []string
	senderSequenceNumber      atomic.Int32

	wg sync.WaitGroup // use to track spawned go routines

	log zerolog.Logger
}

func NewSecureChannel(log zerolog.Logger, ctx DriverContext, configuration Configuration) *SecureChannel {
	s := &SecureChannel{
		configuration:             configuration,
		endpoint:                  readWriteModel.NewPascalString(&configuration.Endpoint),
		username:                  configuration.Username,
		password:                  configuration.Password,
		securityPolicy:            "http://opcfoundation.org/UA/SecurityPolicy#" + configuration.SecurityPolicy,
		sessionName:               "UaSession:" + *APPLICATION_TEXT.GetStringValue() + ":" + utils.RandomString(20),
		authenticationToken:       readWriteModel.NewNodeIdTwoByte(0),
		clientNonce:               []byte(utils.RandomString(40)),
		keyStoreFile:              configuration.KeyStoreFile,
		channelTransactionManager: NewSecureChannelTransactionManager(log),
		lifetime:                  DEFAULT_CONNECTION_LIFETIME,
		log:                       log,
	}
	s.requestHandleGenerator.Store(1)
	s.channelId.Store(1)
	s.tokenId.Store(1)
	ckp := configuration.Ckp
	if configuration.SecurityPolicy == "Basic256Sha256" {
		//Sender Certificate gets populated during the 'discover' phase when encryption is enabled.
		s.senderCertificate = configuration.SenderCertificate
		s.encryptionHandler = NewEncryptionHandler(s.log, ckp, s.senderCertificate, configuration.SecurityPolicy)
		certificate := ckp.getCertificate()
		s.publicCertificate = readWriteModel.NewPascalByteString(int32(len(certificate.Raw)), certificate.Raw)
		s.isEncrypted = true

		s.thumbprint = configuration.Thumbprint
	} else {
		s.encryptionHandler = NewEncryptionHandler(s.log, ckp, s.senderCertificate, configuration.SecurityPolicy)
		s.publicCertificate = NULL_BYTE_STRING
		s.thumbprint = NULL_BYTE_STRING
		s.isEncrypted = false
	}

	// Generate a list of endpoints we can use.
	{
		var err error
		address, err := url.Parse("none://" + configuration.Host)
		if err == nil {
			if names, lookupErr := net.LookupHost(address.Host); lookupErr == nil {
				s.endpoints = append(s.endpoints, names[rand.Intn(len(names))])
				s.endpoints = append(s.endpoints, address.Host)
				//s.endpoints = append(s.endpoints, address.Host)//TODO: not sure if golang can do
			} else {
				err = lookupErr
			}
		}
		if err != nil {
			s.log.Warn().Err(err).Msg("Unable to resolve host name. Using original host from connection string which may cause issues connecting to server")
			s.endpoints = append(s.endpoints, address.Host)
		}
	}

	s.channelId.Store(1)
	return s
}

func (s *SecureChannel) submit(ctx context.Context, codec *MessageCodec, errorDispatcher func(err error), consumer func(opcuaResponse []byte), buffer utils.WriteBufferByteBased) {
	transactionId := s.channelTransactionManager.getTransactionIdentifier()

	//TODO: We need to split large messages up into chunks if it is larger than the sendBufferSize
	//      This value is negotiated when opening a channel

	messageRequest := readWriteModel.NewOpcuaMessageRequest(
		readWriteModel.ChunkType_FINAL,
		readWriteModel.NewSecurityHeader(
			s.channelId.Load(),
			s.tokenId.Load(),
		),
		readWriteModel.NewBinaryPayload(
			readWriteModel.NewSequenceHeader(transactionId, transactionId),
			buffer.GetBytes(),
		),
	)

	var apu readWriteModel.OpcuaAPU
	if s.isEncrypted {
		message, err := s.encryptionHandler.encodeMessage(ctx, messageRequest, buffer.GetBytes())
		if err != nil {
			errorDispatcher(err)
			return
		}
		apu, err = readWriteModel.OpcuaAPUParse(ctx, message, false, true)
		if err != nil {
			errorDispatcher(err)
			return
		}
	} else {
		apu = readWriteModel.NewOpcuaAPU(messageRequest)
	}

	requestConsumer := func(transactionId int32) {
		var messageBuffer []byte
		if err := codec.SendRequest(ctx, apu,
			func(message spi.Message) bool {
				opcuaAPU, ok := message.(readWriteModel.OpcuaAPU)
				if !ok {
					s.log.Debug().Type("type", message).Msg("Not relevant")
					return false
				}
				if decodedOpcuaAPU, err := s.encryptionHandler.decodeMessage(ctx, opcuaAPU); err != nil {
					s.log.Debug().Err(err).Msg("error decoding")
					return false
				} else {
					opcuaAPU = decodedOpcuaAPU.(readWriteModel.OpcuaAPU)
				}
				messagePDU := opcuaAPU.GetMessage()
				s.log.Trace().Stringer("messagePDU", messagePDU).Msg("looking at messagePDU")
				opcuaResponse, ok := messagePDU.(readWriteModel.OpcuaMessageResponse)
				if !ok {
					s.log.Debug().Type("type", message).Msg("Not relevant")
					return false
				}
				if requestId := opcuaResponse.GetMessage().GetSequenceHeader().GetRequestId(); requestId != transactionId {
					s.log.Debug().Int32("requestId", requestId).Int32("transactionId", transactionId).Msg("Not relevant")
					return false
				} else {
					messageBuffer = opcuaResponse.(readWriteModel.BinaryPayload).GetPayload()
					if !(s.senderSequenceNumber.Add(1) == (opcuaResponse.GetMessage().GetSequenceHeader().GetSequenceNumber())) {
						s.log.Error().
							Int32("senderSequenceNumber", s.senderSequenceNumber.Load()).
							Int32("responseSequenceNumber", opcuaResponse.GetMessage().GetSequenceHeader().GetSequenceNumber()).
							Msg("Sequence number isn't as expected, we might have missed a packet. - senderSequenceNumber != responseSequenceNumber")
						errorDispatcher(errors.New("unexpected sequence number"))
					}
				}
				return true
			},
			func(message spi.Message) error {
				opcuaAPU := message.(readWriteModel.OpcuaAPU)
				opcuaAPU, _ = s.encryptionHandler.decodeMessage(ctx, opcuaAPU)
				messagePDU := opcuaAPU.GetMessage()
				s.log.Trace().Stringer("messagePDU", messagePDU).Msg("looking at messagePDU")
				opcuaResponse := messagePDU.(readWriteModel.OpcuaMessageResponse)
				if opcuaResponse.GetChunk() == (readWriteModel.ChunkType_FINAL) {
					s.tokenId.Store(opcuaResponse.GetSecurityHeader().GetSecureTokenId())
					s.channelId.Store(opcuaResponse.GetSecurityHeader().GetSecureChannelId())

					consumer(messageBuffer)
				} else {
					s.log.Warn().Stringer("chunk", opcuaResponse.GetChunk()).Msg("Message discarded")
				}
				return nil
			},
			func(err error) error {
				errorDispatcher(err)
				return nil
			},
			REQUEST_TIMEOUT); err != nil {
			errorDispatcher(err)
		}
	}

	s.log.Debug().Int32("transactionId", transactionId).Msg("Submitting Transaction to TransactionManager")
	if err := s.channelTransactionManager.submit(requestConsumer, transactionId); err != nil {
		s.log.Debug().Err(err).Msg("error submitting")
	}
}

func (s *SecureChannel) onConnect(ctx context.Context, connection *Connection, ch chan plc4go.PlcConnectionConnectResult) {
	s.log.Trace().Msg("on connect")
	// Only the TCP transport supports login.
	s.log.Debug().Msg("Opcua Driver running in ACTIVE mode.")
	s.codec = connection.messageCodec // TODO: why would we need to set that?

	hello := readWriteModel.NewOpcuaHelloRequest(
		readWriteModel.ChunkType_FINAL,
		VERSION,
		readWriteModel.NewOpcuaProtocolLimits(
			DEFAULT_RECEIVE_BUFFER_SIZE,
			DEFAULT_SEND_BUFFER_SIZE,
			DEFAULT_MAX_MESSAGE_SIZE,
			DEFAULT_MAX_CHUNK_COUNT,
		),
		s.endpoint,
	)

	requestConsumer := func(transactionId int32) {
		s.log.Trace().Int32("transactionId", transactionId).Msg("request consumer called")
		if err := s.codec.SendRequest(
			ctx,
			hello,
			func(message spi.Message) bool {
				opcuaAPU, ok := message.(readWriteModel.OpcuaAPU)
				if !ok {
					s.log.Debug().Type("type", message).Msg("Not relevant")
					return false
				}
				messagePDU := opcuaAPU.GetMessage()
				_, ok = messagePDU.(readWriteModel.OpcuaAcknowledgeResponse)
				if !ok {
					s.log.Debug().Type("type", messagePDU).Msg("Not relevant")
					return false
				}
				return true
			},
			func(message spi.Message) error {
				opcuaAPU := message.(readWriteModel.OpcuaAPU)
				messagePDU := opcuaAPU.GetMessage()
				opcuaAcknowledgeResponse := messagePDU.(readWriteModel.OpcuaAcknowledgeResponse)
				go s.onConnectOpenSecureChannel(ctx, connection, ch, opcuaAcknowledgeResponse)
				return nil
			},
			func(err error) error {
				s.log.Debug().Err(err).Msg("error submitting")
				connection.fireConnectionError(err, ch)
				return nil
			},
			REQUEST_TIMEOUT); err != nil {
			s.log.Debug().Err(err).Msg("error sending")
		}
	}
	if err := s.channelTransactionManager.submit(requestConsumer, s.channelTransactionManager.getTransactionIdentifier()); err != nil {
		s.log.Debug().Err(err).Msg("error submitting")
	}
}

func (s *SecureChannel) onConnectOpenSecureChannel(ctx context.Context, connection *Connection, ch chan plc4go.PlcConnectionConnectResult, response readWriteModel.OpcuaAcknowledgeResponse) {
	transactionId := s.channelTransactionManager.getTransactionIdentifier()

	requestHeader := readWriteModel.NewRequestHeader(
		s.getAuthenticationToken(),
		s.getCurrentDateTime(),
		0, //RequestHandle
		0,
		NULL_STRING,
		REQUEST_TIMEOUT_LONG,
		NULL_EXTENSION_OBJECT)

	var openSecureChannelRequest readWriteModel.OpenSecureChannelRequest
	if s.isEncrypted {
		openSecureChannelRequest = readWriteModel.NewOpenSecureChannelRequest(
			requestHeader,
			VERSION,
			readWriteModel.SecurityTokenRequestType_securityTokenRequestTypeIssue,
			readWriteModel.MessageSecurityMode_messageSecurityModeSignAndEncrypt,
			readWriteModel.NewPascalByteString(int32(len(s.clientNonce)), s.clientNonce),
			s.lifetime)
	} else {
		openSecureChannelRequest = readWriteModel.NewOpenSecureChannelRequest(
			requestHeader,
			VERSION,
			readWriteModel.SecurityTokenRequestType_securityTokenRequestTypeIssue,
			readWriteModel.MessageSecurityMode_messageSecurityModeNone,
			NULL_BYTE_STRING,
			s.lifetime)
	}

	identifier := openSecureChannelRequest.GetExtensionId()
	expandedNodeId := readWriteModel.NewExpandedNodeId(
		false, //Namespace Uri Specified
		false, //Server Index Specified
		readWriteModel.NewNodeIdFourByte(0, uint16(identifier)),
		nil,
		nil,
	)

	extObject := readWriteModel.NewRootExtensionObject(
		expandedNodeId,
		openSecureChannelRequest,
	)

	buffer := utils.NewWriteBufferByteBased(utils.WithByteOrderForByteBasedBuffer(binary.LittleEndian))
	if err := extObject.SerializeWithWriteBuffer(ctx, buffer); err != nil {
		s.log.Debug().Err(err).Msg("error serializing")
		connection.fireConnectionError(err, ch)
		return
	}

	openRequest := readWriteModel.NewOpcuaOpenRequest(
		readWriteModel.ChunkType_FINAL,
		readWriteModel.NewOpenChannelMessageRequest(
			0,
			readWriteModel.NewPascalString(&s.securityPolicy),
			s.publicCertificate,
			s.thumbprint),
		readWriteModel.NewBinaryPayload(
			readWriteModel.NewSequenceHeader(transactionId, transactionId),
			buffer.GetBytes(),
		),
	)

	var apu readWriteModel.OpcuaAPU

	if s.isEncrypted {
		message, err := s.encryptionHandler.encodeMessage(ctx, openRequest, buffer.GetBytes())
		if err != nil {
			s.log.Debug().Err(err).Msg("error encoding")
			connection.fireConnectionError(err, ch)
			return
		}
		apu, err = readWriteModel.OpcuaAPUParse(ctx, message, false, true)
		if err != nil {
			s.log.Debug().Err(err).Msg("error parsing")
			connection.fireConnectionError(err, ch)
			return
		}
	} else {
		apu = readWriteModel.NewOpcuaAPU(openRequest)
	}

	requestConsumer := func(transactionId int32) {
		if err := s.codec.SendRequest(
			ctx,
			apu,
			func(message spi.Message) bool {
				opcuaAPU, ok := message.(readWriteModel.OpcuaAPU)
				if !ok {
					s.log.Debug().Type("type", message).Msg("Not relevant")
					return false
				}
				messagePDU := opcuaAPU.GetMessage()
				openResponse, ok := messagePDU.(readWriteModel.OpcuaOpenResponse)
				if !ok {
					s.log.Debug().Type("type", messagePDU).Msg("Not relevant")
					return false
				}
				return openResponse.GetMessage().GetSequenceHeader().GetRequestId() == transactionId
			},
			func(message spi.Message) error {
				opcuaAPU := message.(readWriteModel.OpcuaAPU)
				messagePDU := opcuaAPU.GetMessage()
				opcuaOpenResponse := messagePDU.(readWriteModel.OpcuaOpenResponse)
				readBuffer := utils.NewReadBufferByteBased(opcuaOpenResponse.(readWriteModel.BinaryPayload).GetPayload(), utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian))
				extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer[readWriteModel.ExtensionObject](ctx, readBuffer, false)
				if err != nil {
					return errors.Wrap(err, "error parsing")
				}
				//Store the initial sequence number from the server. there's no requirement for the server and client to use the same starting number.
				s.senderSequenceNumber.Store(opcuaOpenResponse.GetMessage().GetSequenceHeader().GetSequenceNumber())

				if fault, ok := extensionObject.GetBody().(readWriteModel.ServiceFault); ok {
					statusCode := fault.GetResponseHeader().(readWriteModel.ResponseHeader).GetServiceResult().GetStatusCode()
					statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
					s.log.Error().
						Uint32("statusCode", statusCode).
						Stringer("statusCodeByValue", statusCodeByValue).
						Msg("Failed to connect to opc ua server for the following reason")
					connection.fireConnectionError(errors.New("service fault received"), ch)
					return nil
				}
				s.log.Debug().Msg("Got Secure Response Connection Response")
				openSecureChannelResponse := extensionObject.GetBody().(readWriteModel.OpenSecureChannelResponse)
				s.tokenId.Store(openSecureChannelResponse.GetSecurityToken().(readWriteModel.ChannelSecurityToken).GetTokenId())
				s.channelId.Store(openSecureChannelResponse.GetSecurityToken().(readWriteModel.ChannelSecurityToken).GetChannelId())
				go s.onConnectCreateSessionRequest(ctx, connection, ch)
				return nil
			},
			func(err error) error {
				s.log.Debug().Err(err).Msg("error submitting")
				connection.fireConnectionError(err, ch)
				return nil
			},
			REQUEST_TIMEOUT,
		); err != nil {
			s.log.Debug().Err(err).Msg("a error")
			connection.fireConnectionError(err, ch)
		}
	}
	s.log.Debug().Int32("transactionId", transactionId).Msg("Submitting OpenSecureChannel with id")
	if err := s.channelTransactionManager.submit(requestConsumer, transactionId); err != nil {
		s.log.Debug().Err(err).Msg("error submitting")
		connection.fireConnectionError(err, ch)
	}
}

func (s *SecureChannel) onConnectCreateSessionRequest(ctx context.Context, connection *Connection, ch chan plc4go.PlcConnectionConnectResult) {
	requestHeader := readWriteModel.NewRequestHeader(
		s.getAuthenticationToken(),
		s.getCurrentDateTime(),
		0,
		0,
		NULL_STRING,
		REQUEST_TIMEOUT_LONG,
		NULL_EXTENSION_OBJECT)

	applicationName := readWriteModel.NewLocalizedText(
		true,
		true,
		readWriteModel.NewPascalString(utils.ToPtr("en")),
		APPLICATION_TEXT)

	var discoveryUrls []readWriteModel.PascalString

	clientDescription := readWriteModel.NewApplicationDescription(APPLICATION_URI,
		PRODUCT_URI,
		applicationName,
		readWriteModel.ApplicationType_applicationTypeClient,
		NULL_STRING,
		NULL_STRING,
		discoveryUrls)

	createSessionRequest := readWriteModel.NewCreateSessionRequest(
		requestHeader,
		clientDescription,
		NULL_STRING,
		s.endpoint,
		readWriteModel.NewPascalString(&s.sessionName),
		readWriteModel.NewPascalByteString(int32(len(s.clientNonce)), s.clientNonce),
		NULL_BYTE_STRING,
		120000,
		0,
	)

	identifier := createSessionRequest.GetExtensionId()
	expandedNodeId := readWriteModel.NewExpandedNodeId(
		false, //Namespace Uri Specified
		false, //Server Index Specified
		readWriteModel.NewNodeIdFourByte(0, uint16(identifier)),
		nil,
		nil)

	extObject := readWriteModel.NewRootExtensionObject(
		expandedNodeId,
		createSessionRequest,
	)

	buffer := utils.NewWriteBufferByteBased(utils.WithByteOrderForByteBasedBuffer(binary.LittleEndian))
	if err := extObject.SerializeWithWriteBuffer(ctx, buffer); err != nil {
		s.log.Debug().Err(err).Msg("error serializing")
		connection.fireConnectionError(err, ch)
		return
	}

	consumer := func(opcuaResponse []byte) {
		extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer[readWriteModel.ExtensionObject](ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false)
		if err != nil {
			s.log.Error().Err(err).Msg("error parsing")
			connection.fireConnectionError(err, ch)
			return
		}
		s.log.Trace().Stringer("extensionObject", extensionObject).Msg("looking at message")
		if fault, ok := extensionObject.GetBody().(readWriteModel.ServiceFault); ok {
			statusCode := fault.GetResponseHeader().(readWriteModel.ResponseHeader).GetServiceResult().GetStatusCode()
			statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
			s.log.Error().
				Uint32("statusCode", statusCode).
				Stringer("statusCodeByValue", statusCodeByValue).
				Msg("Failed to connect to opc ua server for the following reason")
			connection.fireConnectionError(errors.New("service fault received"), ch)
			return
		}
		s.log.Debug().Msg("Got Create Session Response Connection Response")

		unknownExtensionObject := extensionObject.GetBody()
		if responseMessage, ok := unknownExtensionObject.(readWriteModel.CreateSessionResponse); ok {
			s.authenticationToken = responseMessage.GetAuthenticationToken().GetNodeId()

			go s.onConnectActivateSessionRequest(ctx, connection, ch, responseMessage, responseMessage)
		} else {
			serviceFault := unknownExtensionObject.(readWriteModel.ServiceFault)
			header := serviceFault.GetResponseHeader().(readWriteModel.ResponseHeader)
			s.log.Error().
				Stringer("serviceResult", header.GetServiceResult()).
				Msg("Subscription ServiceFault returned from server with error code, '%s'")
		}
	}

	errorDispatcher := func(err error) {
		s.log.Error().Err(err).Msg("Error while waiting for subscription response")
		connection.fireConnectionError(err, ch)
	}

	s.submit(ctx, connection.messageCodec, errorDispatcher, consumer, buffer)
}

func (s *SecureChannel) onConnectActivateSessionRequest(ctx context.Context, connection *Connection, ch chan plc4go.PlcConnectionConnectResult, opcuaMessageResponse readWriteModel.CreateSessionResponse, sessionResponse readWriteModel.CreateSessionResponse) {
	s.senderCertificate = sessionResponse.GetServerCertificate().GetStringValue()
	certificate, err := s.encryptionHandler.getCertificateX509(s.senderCertificate)
	if err != nil {
		s.log.Error().Err(err).Msg("error getting certificate")
		connection.fireConnectionError(err, ch)
		return
	}
	s.log.Debug().Interface("senderCertificate", certificate).Msg("working with senderCertificate")
	s.encryptionHandler.setServerCertificate(certificate)
	s.senderNonce = sessionResponse.GetServerNonce().GetStringValue()
	endpoints := make([]string, 3)
	if address, err := url.Parse(s.configuration.Host); err == nil {
		if names, err := net.LookupAddr(address.Host); err != nil {
			endpoints[0] = "opc.tcp://" + names[rand.Intn(len(names))] + ":" + s.configuration.Port + s.configuration.TransportEndpoint
		}
		endpoints[1] = "opc.tcp://" + address.Hostname() + ":" + s.configuration.Port + s.configuration.TransportEndpoint
		//endpoints[2] = "opc.tcp://" + address.getCanonicalHostName() + ":" + s.configuration.getPort() + s.configuration.transportEndpoint// TODO: not sure how to get that in golang
	} else {
		s.log.Debug().Err(err).Msg("error parsing host")
	}

	s.selectEndpoint(sessionResponse)

	if s.policyId == nil {
		s.log.Error().Msg("Unable to find endpoint - " + endpoints[1])
		connection.fireConnectionError(err, ch)
		return
	}

	userIdentityToken := s.getIdentityToken(s.tokenType, s.policyId.GetStringValue())

	requestHandle := s.getRequestHandle()

	requestHeader := readWriteModel.NewRequestHeader(
		s.getAuthenticationToken(),
		s.getCurrentDateTime(),
		requestHandle,
		0,
		NULL_STRING,
		REQUEST_TIMEOUT_LONG,
		NULL_EXTENSION_OBJECT)

	clientSignature := readWriteModel.NewSignatureData(NULL_STRING, NULL_BYTE_STRING)

	activateSessionRequest := readWriteModel.NewActivateSessionRequest(
		requestHeader,
		clientSignature,
		nil,
		nil,
		userIdentityToken,
		clientSignature,
	)

	identifier := activateSessionRequest.GetExtensionId()
	expandedNodeId := readWriteModel.NewExpandedNodeId(false, //Namespace Uri Specified
		false, //Server Index Specified
		readWriteModel.NewNodeIdFourByte(0, uint16(identifier)),
		nil,
		nil)

	extObject := readWriteModel.NewRootExtensionObject(
		expandedNodeId,
		activateSessionRequest,
	)

	buffer := utils.NewWriteBufferByteBased(utils.WithByteOrderForByteBasedBuffer(binary.LittleEndian))
	if err := extObject.SerializeWithWriteBuffer(ctx, buffer); err != nil {
		s.log.Debug().Err(err).Msg("error serializing")
		connection.fireConnectionError(err, ch)
		return
	}

	consumer := func(opcuaResponse []byte) {
		message, err := readWriteModel.ExtensionObjectParseWithBuffer[readWriteModel.ExtensionObject](ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false)
		if err != nil {
			s.log.Error().Err(err).Msg("error parsing")
			return
		}
		s.log.Trace().Stringer("message", message).Msg("looking at message")
		if fault, ok := message.GetBody().(readWriteModel.ServiceFault); ok {
			statusCode := fault.GetResponseHeader().(readWriteModel.ResponseHeader).GetServiceResult().GetStatusCode()
			statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
			s.log.Error().
				Uint32("statusCode", statusCode).
				Stringer("statusCodeByValue", statusCodeByValue).
				Msg("Failed to connect to opc ua server for the following reason")
			connection.fireConnectionError(errors.New("service fault received"), ch)
			return
		}
		s.log.Debug().Msg("Got Activate Session Response Connection Response")

		extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer[readWriteModel.ExtensionObject](ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false)
		if err != nil {
			s.log.Error().Err(err).Msg("error parsing")
			return
		}
		unknownExtensionObject := extensionObject.GetBody()
		if responseMessage, ok := unknownExtensionObject.(readWriteModel.ActivateSessionResponse); ok {
			returnedRequestHandle := responseMessage.GetResponseHeader().(readWriteModel.ResponseHeader).GetRequestHandle()
			if !(requestHandle == returnedRequestHandle) {
				s.log.Error().
					Uint32("requestHandle", requestHandle).
					Uint32("returnedRequestHandle", returnedRequestHandle).
					Msg("Request handle isn't as expected, we might have missed a packet. requestHandle != returnedRequestHandle")
			}

			// Send an event that connection setup is complete.
			s.keepAlive()
			connection.fireConnected(ch)
		} else {
			serviceFault := unknownExtensionObject.(readWriteModel.ServiceFault)
			header := serviceFault.GetResponseHeader().(readWriteModel.ResponseHeader)
			s.log.Error().
				Stringer("serviceResult", header.GetServiceResult()).
				Msg("Subscription ServiceFault returned from server with error code")
		}
	}

	errorDispatcher := func(err error) {
		s.log.Error().Err(err).Msg("Error while waiting for subscription response")
		connection.fireConnectionError(err, ch)
	}

	s.submit(ctx, connection.messageCodec, errorDispatcher, consumer, buffer)
}

func (s *SecureChannel) onDisconnect(ctx context.Context, connection *Connection) {
	s.log.Info().Msg("disconnecting")
	requestHandle := s.getRequestHandle()

	s.keepAliveIndicator.Store(false)

	expandedNodeId := readWriteModel.NewExpandedNodeId(false, //Namespace Uri Specified
		false, //Server Index Specified
		readWriteModel.NewNodeIdFourByte(0, 473),
		nil,
		nil) //Identifier for OpenSecureChannel

	if s.authenticationToken == nil {
		// TODO: this or nil?? What do we do when we don't have one?
		s.log.Error().Msg("no authentication token, so we can't disconnect")
		return
	}
	requestHeader := readWriteModel.NewRequestHeader(
		s.getAuthenticationToken(),
		s.getCurrentDateTime(),
		requestHandle, //RequestHandle
		0,
		NULL_STRING,
		5000,
		NULL_EXTENSION_OBJECT)

	closeSessionRequest := readWriteModel.NewCloseSessionRequest(
		requestHeader,
		true)

	extObject := readWriteModel.NewRootExtensionObject(
		expandedNodeId,
		closeSessionRequest,
	)

	buffer := utils.NewWriteBufferByteBased(utils.WithByteOrderForByteBasedBuffer(binary.LittleEndian))
	if err := extObject.SerializeWithWriteBuffer(ctx, buffer); err != nil {
		s.log.Debug().Err(err).Msg("error serializing")
		return
	}

	consumer := func(opcuaResponse []byte) {
		message, err := readWriteModel.ExtensionObjectParseWithBuffer[readWriteModel.ExtensionObject](ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false)
		if err != nil {
			s.log.Error().Err(err).Msg("error parsing")
			return
		}
		s.log.Trace().Stringer("message", message).Msg("looking at message")
		if fault, ok := message.GetBody().(readWriteModel.ServiceFault); ok {
			statusCode := fault.GetResponseHeader().(readWriteModel.ResponseHeader).GetServiceResult().GetStatusCode()
			statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
			s.log.Error().
				Uint32("statusCode", statusCode).
				Stringer("statusCodeByValue", statusCodeByValue).
				Msg("Failed to connect to opc ua server for the following reason")
			return
		}
		s.log.Debug().Msg("Got Close Session Response Connection Response")

		extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer[readWriteModel.ExtensionObject](ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false)
		if err != nil {
			s.log.Error().Err(err).Msg("error parsing")
			return
		}
		unknownExtensionObject := extensionObject.GetBody()
		if responseMessage, ok := unknownExtensionObject.(readWriteModel.CloseSessionResponse); ok {
			go s.onDisconnectCloseSecureChannel(ctx, connection, responseMessage, message.GetBody().(readWriteModel.CloseSessionResponse))
		} else {
			serviceFault := unknownExtensionObject.(readWriteModel.ServiceFault)
			header := serviceFault.GetResponseHeader().(readWriteModel.ResponseHeader)
			s.log.Error().
				Stringer("serviceResult", header.GetServiceResult()).
				Msg("Subscription ServiceFault returned from server with error code")
		}
	}

	errorDispatcher := func(err error) {
		s.log.Error().Err(err).Msg("Error while waiting for close session response")
	}

	s.submit(ctx, connection.messageCodec, errorDispatcher, consumer, buffer)
}

func (s *SecureChannel) onDisconnectCloseSecureChannel(ctx context.Context, connection *Connection, message readWriteModel.CloseSessionResponse, response readWriteModel.CloseSessionResponse) {
	transactionId := s.channelTransactionManager.getTransactionIdentifier()

	requestHeader := readWriteModel.NewRequestHeader(
		s.getAuthenticationToken(),
		s.getCurrentDateTime(),
		0, //RequestHandle
		0,
		NULL_STRING,
		REQUEST_TIMEOUT_LONG,
		NULL_EXTENSION_OBJECT)

	closeSecureChannelRequest := readWriteModel.NewCloseSecureChannelRequest(requestHeader)

	identifier := closeSecureChannelRequest.GetExtensionId()
	expandedNodeId := readWriteModel.NewExpandedNodeId(
		false, //Namespace Uri Specified
		false, //Server Index Specified
		readWriteModel.NewNodeIdFourByte(0, uint16(identifier)),
		nil,
		nil,
	)

	closeRequest := readWriteModel.NewOpcuaCloseRequest(
		readWriteModel.ChunkType_FINAL,
		readWriteModel.NewSecurityHeader(s.channelId.Load(), s.tokenId.Load()),
		readWriteModel.NewExtensiblePayload(
			readWriteModel.NewSequenceHeader(transactionId, transactionId),
			readWriteModel.NewRootExtensionObject(
				expandedNodeId,
				closeSecureChannelRequest,
			),
		),
	)

	apu := readWriteModel.NewOpcuaAPU(closeRequest)

	requestConsumer := func(transactionId int32) {
		if err := connection.messageCodec.SendRequest(
			ctx,
			apu,
			func(message spi.Message) bool {
				opcuaAPU, ok := message.(readWriteModel.OpcuaAPU)
				if !ok {
					s.log.Debug().Type("type", message).Msg("Not relevant")
					return false
				}
				messagePDU := opcuaAPU.GetMessage()
				openResponse, ok := messagePDU.(readWriteModel.OpcuaMessageResponse)
				if !ok {
					s.log.Debug().Type("type", messagePDU).Msg("Not relevant")
					return false
				}
				return openResponse.GetMessage().GetSequenceHeader().GetRequestId() == transactionId
			},
			func(message spi.Message) error {
				opcuaAPU := message.(readWriteModel.OpcuaAPU)
				messagePDU := opcuaAPU.GetMessage()
				opcuaMessageResponse := messagePDU.(readWriteModel.OpcuaMessageResponse)
				s.log.Trace().Stringer("opcuaMessageResponse", opcuaMessageResponse).Msg("Got close secure channel response")
				return nil
			},
			func(err error) error {
				s.log.Debug().Err(err).Msg("error submitting")
				return nil
			},
			REQUEST_TIMEOUT,
		); err != nil {
			s.log.Debug().Err(err).Msg("a error")
		}
	}
	s.log.Debug().Int32("transactionId", transactionId).Msg("Submitting CloseSecureChannel with id")
	if err := s.channelTransactionManager.submit(requestConsumer, transactionId); err != nil {
		s.log.Debug().Err(err).Msg("error submitting")
	}
}

func (s *SecureChannel) onDiscover(ctx context.Context, codec *MessageCodec) {
	s.log.Trace().Msg("onDiscover")
	// Only the TCP transport supports login.
	s.log.Debug().Msg("Opcua Driver running in ACTIVE mode, discovering endpoints")

	hello := readWriteModel.NewOpcuaHelloRequest(
		readWriteModel.ChunkType_FINAL,
		VERSION,
		readWriteModel.NewOpcuaProtocolLimits(
			DEFAULT_RECEIVE_BUFFER_SIZE,
			DEFAULT_SEND_BUFFER_SIZE,
			DEFAULT_MAX_MESSAGE_SIZE,
			DEFAULT_MAX_CHUNK_COUNT,
		),
		s.endpoint,
	)

	apu := readWriteModel.NewOpcuaAPU(hello)

	requestConsumer := func(transactionId int32) {
		if err := codec.SendRequest(
			ctx,
			apu,
			func(message spi.Message) bool {
				opcuaAPU, ok := message.(readWriteModel.OpcuaAPU)
				if !ok {
					s.log.Debug().Type("type", message).Msg("Not relevant")
					return false
				}
				messagePDU := opcuaAPU.GetMessage()
				_, ok = messagePDU.(readWriteModel.OpcuaAcknowledgeResponse)
				if !ok {
					s.log.Debug().Type("type", messagePDU).Msg("Not relevant")
					return false
				}
				return true
			},
			func(message spi.Message) error {
				opcuaAPU := message.(readWriteModel.OpcuaAPU)
				messagePDU := opcuaAPU.GetMessage()
				opcuaAcknowledgeResponse := messagePDU.(readWriteModel.OpcuaAcknowledgeResponse)
				s.log.Trace().Stringer("opcuaAcknowledgeResponse", opcuaAcknowledgeResponse).Msg("Got Hello Response Connection Response")
				go s.onDiscoverOpenSecureChannel(ctx, codec, opcuaAcknowledgeResponse)
				return nil
			},
			func(err error) error {
				s.log.Debug().Err(err).Msg("error submitting")
				return nil
			},
			REQUEST_TIMEOUT,
		); err != nil {
			s.log.Debug().Err(err).Msg("a error")
		}
	}
	if err := s.channelTransactionManager.submit(requestConsumer, s.channelTransactionManager.getTransactionIdentifier()); err != nil {
		s.log.Debug().Err(err).Msg("error submitting")
	}
}

func (s *SecureChannel) onDiscoverOpenSecureChannel(ctx context.Context, codec *MessageCodec, opcuaAcknowledgeResponse readWriteModel.OpcuaAcknowledgeResponse) {
	s.log.Trace().Msg("onDiscoverOpenSecureChannel")
	transactionId := s.channelTransactionManager.getTransactionIdentifier()

	requestHeader := readWriteModel.NewRequestHeader(
		s.getAuthenticationToken(),
		s.getCurrentDateTime(),
		0, //RequestHandle
		0,
		NULL_STRING,
		REQUEST_TIMEOUT_LONG,
		NULL_EXTENSION_OBJECT)

	openSecureChannelRequest := readWriteModel.NewOpenSecureChannelRequest(
		requestHeader,
		VERSION,
		readWriteModel.SecurityTokenRequestType_securityTokenRequestTypeIssue,
		readWriteModel.MessageSecurityMode_messageSecurityModeNone,
		NULL_BYTE_STRING,
		s.lifetime,
	)

	identifier := openSecureChannelRequest.GetExtensionId()
	expandedNodeId := readWriteModel.NewExpandedNodeId(
		false, //Namespace Uri Specified
		false, //Server Index Specified
		readWriteModel.NewNodeIdFourByte(0, uint16(identifier)),
		nil,
		nil,
	)

	extObject := readWriteModel.NewRootExtensionObject(
		expandedNodeId,
		openSecureChannelRequest,
	)

	buffer := utils.NewWriteBufferByteBased(utils.WithByteOrderForByteBasedBuffer(binary.LittleEndian))
	if err := extObject.SerializeWithWriteBuffer(ctx, buffer); err != nil {
		s.log.Debug().Err(err).Msg("error serializing")
		return
	}

	openRequest := readWriteModel.NewOpcuaOpenRequest(
		readWriteModel.ChunkType_FINAL,
		readWriteModel.NewOpenChannelMessageRequest(
			0,
			SECURITY_POLICY_NONE,
			NULL_BYTE_STRING,
			NULL_BYTE_STRING,
		),
		readWriteModel.NewBinaryPayload(
			readWriteModel.NewSequenceHeader(transactionId, transactionId),
			buffer.GetBytes(),
		),
	)

	apu := readWriteModel.NewOpcuaAPU(openRequest)

	requestConsumer := func(transactionId int32) {
		if err := codec.SendRequest(
			ctx,
			apu,
			func(message spi.Message) bool {
				opcuaAPU, ok := message.(readWriteModel.OpcuaAPU)
				if !ok {
					s.log.Debug().Type("type", message).Msg("Not relevant")
					return false
				}
				messagePDU := opcuaAPU.GetMessage()
				openResponse, ok := messagePDU.(readWriteModel.OpcuaOpenResponse)
				if !ok {
					s.log.Debug().Type("type", messagePDU).Msg("Not relevant")
					return false
				}
				return openResponse.GetMessage().GetSequenceHeader().GetRequestId() == transactionId
			},
			func(message spi.Message) error {
				opcuaAPU := message.(readWriteModel.OpcuaAPU)
				messagePDU := opcuaAPU.GetMessage()
				opcuaOpenResponse := messagePDU.(readWriteModel.OpcuaOpenResponse)
				readBuffer := utils.NewReadBufferByteBased(opcuaOpenResponse.(readWriteModel.BinaryPayload).GetPayload(), utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian))
				extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer[readWriteModel.ExtensionObject](ctx, readBuffer, false)
				if err != nil {
					return errors.Wrap(err, "error parsing")
				}

				if fault, ok := extensionObject.GetBody().(readWriteModel.ServiceFault); ok {
					statusCode := fault.GetResponseHeader().(readWriteModel.ResponseHeader).GetServiceResult().GetStatusCode()
					statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
					s.log.Error().
						Uint32("statusCode", statusCode).
						Stringer("statusCodeByValue", statusCodeByValue).
						Msg("Failed to connect to opc ua server for the following reason")
					return nil
				}
				s.log.Debug().Msg("Got Secure Response Connection Response")
				openSecureChannelResponse := extensionObject.GetBody().(readWriteModel.OpenSecureChannelResponse)
				go s.onDiscoverGetEndpointsRequest(ctx, codec, opcuaOpenResponse, openSecureChannelResponse)
				return nil
			},
			func(err error) error {
				s.log.Debug().Err(err).Msg("error submitting")
				return nil
			},
			REQUEST_TIMEOUT,
		); err != nil {
			s.log.Debug().Err(err).Msg("a error")
		}
	}
	if err := s.channelTransactionManager.submit(requestConsumer, transactionId); err != nil {
		s.log.Debug().Err(err).Msg("error submitting")
	}
}

func (s *SecureChannel) onDiscoverGetEndpointsRequest(ctx context.Context, codec *MessageCodec, opcuaOpenResponse readWriteModel.OpcuaOpenResponse, openSecureChannelResponse readWriteModel.OpenSecureChannelResponse) {
	s.log.Trace().Msg("onDiscoverGetEndpointsRequest")
	s.tokenId.Store(openSecureChannelResponse.GetSecurityToken().(readWriteModel.ChannelSecurityToken).GetTokenId())
	s.channelId.Store(openSecureChannelResponse.GetSecurityToken().(readWriteModel.ChannelSecurityToken).GetChannelId())

	transactionId := s.channelTransactionManager.getTransactionIdentifier()

	nextSequenceNumber := opcuaOpenResponse.GetMessage().GetSequenceHeader().GetSequenceNumber() + 1
	nextRequestId := opcuaOpenResponse.GetMessage().GetSequenceHeader().GetRequestId() + 1

	if !(transactionId == nextSequenceNumber) {
		s.log.Error().
			Int32("transactionId", transactionId).
			Int32("nextSequenceNumber", nextSequenceNumber).
			Msg("Sequence number isn't as expected, we might have missed a packet. - transactionId != nextSequenceNumber")
		return
	}

	requestHeader := readWriteModel.NewRequestHeader(
		s.getAuthenticationToken(),
		s.getCurrentDateTime(),
		0, //RequestHandle
		0,
		NULL_STRING,
		REQUEST_TIMEOUT_LONG,
		NULL_EXTENSION_OBJECT)

	endpointsRequest := readWriteModel.NewGetEndpointsRequest(
		requestHeader,
		s.endpoint,
		nil,
		nil)

	identifier := endpointsRequest.GetExtensionId()
	expandedNodeId := readWriteModel.NewExpandedNodeId(
		false, //Namespace Uri Specified
		false, //Server Index Specified
		readWriteModel.NewNodeIdFourByte(0, uint16(identifier)),
		nil,
		nil,
	)

	extObject := readWriteModel.NewRootExtensionObject(
		expandedNodeId,
		endpointsRequest,
	)

	buffer := utils.NewWriteBufferByteBased(utils.WithByteOrderForByteBasedBuffer(binary.LittleEndian))
	if err := extObject.SerializeWithWriteBuffer(ctx, buffer); err != nil {
		s.log.Debug().Err(err).Msg("error serializing")
		return
	}

	messageRequest := readWriteModel.NewOpcuaMessageRequest(
		readWriteModel.ChunkType_FINAL,
		readWriteModel.NewSecurityHeader(
			s.channelId.Load(),
			s.tokenId.Load(),
		),
		readWriteModel.NewBinaryPayload(
			readWriteModel.NewSequenceHeader(nextSequenceNumber, nextRequestId),
			buffer.GetBytes(),
		),
	)

	apu := readWriteModel.NewOpcuaAPU(messageRequest)

	requestConsumer := func(transactionId int32) {
		if err := codec.SendRequest(
			ctx,
			apu,
			func(message spi.Message) bool {
				opcuaAPU, ok := message.(readWriteModel.OpcuaAPU)
				if !ok {
					s.log.Debug().Type("type", message).Msg("Not relevant")
					return false
				}
				messagePDU := opcuaAPU.GetMessage()
				messageResponse, ok := messagePDU.(readWriteModel.OpcuaMessageResponse)
				if !ok {
					s.log.Debug().Type("type", messagePDU).Msg("Not relevant")
					return false
				}
				return messageResponse.GetMessage().GetSequenceHeader().GetRequestId() == transactionId
			},
			func(message spi.Message) error {
				opcuaAPU := message.(readWriteModel.OpcuaAPU)
				messagePDU := opcuaAPU.GetMessage()
				messageResponse := messagePDU.(readWriteModel.OpcuaMessageResponse)
				readBuffer := utils.NewReadBufferByteBased(messageResponse.(readWriteModel.BinaryPayload).GetPayload(), utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian))
				extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer[readWriteModel.ExtensionObject](ctx, readBuffer, false)
				if err != nil {
					return errors.Wrap(err, "error parsing")
				}

				if fault, ok := extensionObject.GetBody().(readWriteModel.ServiceFault); ok {
					statusCode := fault.GetResponseHeader().(readWriteModel.ResponseHeader).GetServiceResult().GetStatusCode()
					statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
					s.log.Error().
						Uint32("statusCode", statusCode).
						Stringer("statusCodeByValue", statusCodeByValue).
						Msg("Failed to connect to opc ua server for the following reason")
				} else {
					s.log.Debug().Msg("Got Secure Response Connection Response")
					response := extensionObject.GetBody().(readWriteModel.GetEndpointsResponse)

					endpoints := response.GetEndpoints()
					for _, endpoint := range endpoints {
						endpointDescription := endpoint.(readWriteModel.EndpointDescription)
						if endpointDescription.GetEndpointUrl().GetStringValue() == (s.endpoint.GetStringValue()) && *endpointDescription.GetSecurityPolicyUri().GetStringValue() == (s.securityPolicy) {
							s.log.Info().Str("stringValue", *s.endpoint.GetStringValue()).Msg("Found OPC UA endpoint")
							s.configuration.SenderCertificate = endpointDescription.GetServerCertificate().GetStringValue()
						}
					}

					digest := sha1.Sum(s.configuration.SenderCertificate)
					s.thumbprint = readWriteModel.NewPascalByteString(int32(len(digest)), digest[:])

					go s.onDiscoverCloseSecureChannel(ctx, codec, response)
				}
				return nil
			},
			func(err error) error {
				s.log.Debug().Err(err).Msg("error submitting")
				return nil
			},
			REQUEST_TIMEOUT,
		); err != nil {
			s.log.Debug().Err(err).Msg("a error")
		}
	}
	if err := s.channelTransactionManager.submit(requestConsumer, transactionId); err != nil {
		s.log.Debug().Err(err).Msg("error submitting")
	}
}

func (s *SecureChannel) onDiscoverCloseSecureChannel(ctx context.Context, codec *MessageCodec, response readWriteModel.GetEndpointsResponse) {
	s.log.Trace().Msg("onDiscoverCloseSecureChannel")
	transactionId := s.channelTransactionManager.getTransactionIdentifier()

	requestHeader := readWriteModel.NewRequestHeader(
		s.getAuthenticationToken(),
		s.getCurrentDateTime(),
		0, //RequestHandle
		0,
		NULL_STRING,
		REQUEST_TIMEOUT_LONG,
		NULL_EXTENSION_OBJECT)

	closeSecureChannelRequest := readWriteModel.NewCloseSecureChannelRequest(requestHeader)

	identifier := closeSecureChannelRequest.GetExtensionId()
	expandedNodeId := readWriteModel.NewExpandedNodeId(
		false, //Namespace Uri Specified
		false, //Server Index Specified
		readWriteModel.NewNodeIdFourByte(0, uint16(identifier)),
		nil,
		nil,
	)

	closeRequest := readWriteModel.NewOpcuaCloseRequest(
		readWriteModel.ChunkType_FINAL,
		readWriteModel.NewSecurityHeader(
			s.channelId.Load(),
			s.tokenId.Load(),
		),
		readWriteModel.NewExtensiblePayload(
			readWriteModel.NewSequenceHeader(transactionId, transactionId),
			readWriteModel.NewRootExtensionObject(
				expandedNodeId,
				closeSecureChannelRequest,
			),
		),
	)

	apu := readWriteModel.NewOpcuaAPU(closeRequest)

	requestConsumer := func(transactionId int32) {
		if err := codec.SendRequest(
			ctx,
			apu,
			func(message spi.Message) bool {
				opcuaAPU, ok := message.(readWriteModel.OpcuaAPU)
				if !ok {
					s.log.Debug().Type("type", message).Msg("Not relevant")
					return false
				}
				messagePDU := opcuaAPU.GetMessage()
				openResponse, ok := messagePDU.(readWriteModel.OpcuaMessageResponse)
				if !ok {
					s.log.Debug().Type("type", messagePDU).Msg("Not relevant")
					return false
				}
				return openResponse.GetMessage().GetSequenceHeader().GetRequestId() == transactionId
			},
			func(message spi.Message) error {
				opcuaAPU := message.(readWriteModel.OpcuaAPU)
				messagePDU := opcuaAPU.GetMessage()
				opcuaMessageResponse := messagePDU.(readWriteModel.OpcuaMessageResponse)
				s.log.Trace().Stringer("opcuaMessageResponse", opcuaMessageResponse).Msg("Got close secure channel response")
				return nil
			},
			func(err error) error {
				s.log.Debug().Err(err).Msg("error submitting")
				return nil
			},
			REQUEST_TIMEOUT,
		); err != nil {
			s.log.Debug().Err(err).Msg("a error")
		}
	}
	s.log.Debug().Int32("transactionId", transactionId).Msg("Submitting CloseSecureChannel with id")
	if err := s.channelTransactionManager.submit(requestConsumer, transactionId); err != nil {
		s.log.Debug().Err(err).Msg("error submitting")
	}
}

func (s *SecureChannel) keepAlive() {
	s.keepAliveStateChange.Lock()
	defer s.keepAliveStateChange.Unlock()
	if s.keepAliveIndicator.Load() {
		s.log.Warn().Msg("keepalive already running")
		return
	}
	s.keepAliveWg.Add(1)
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		defer s.keepAliveWg.Done()
		s.keepAliveIndicator.Store(true)
		defer s.keepAliveIndicator.Store(false)
		defer s.log.Info().Msg("ending keepalive")
		ctx := context.Background()
		for (s.codec == nil || s.codec.IsRunning()) && s.keepAliveIndicator.Load() {
			sleepTime := time.Duration(math.Ceil(float64(s.lifetime)*0.75)) * time.Millisecond
			s.log.Trace().Dur("sleepTime", sleepTime).Msg("Sleeping")
			time.Sleep(sleepTime)

			transactionId := s.channelTransactionManager.getTransactionIdentifier()

			requestHeader := readWriteModel.NewRequestHeader(
				s.getAuthenticationToken(),
				s.getCurrentDateTime(),
				0, //RequestHandle
				0,
				NULL_STRING,
				REQUEST_TIMEOUT_LONG,
				NULL_EXTENSION_OBJECT,
			)

			var openSecureChannelRequest readWriteModel.OpenSecureChannelRequest
			if s.isEncrypted {
				openSecureChannelRequest = readWriteModel.NewOpenSecureChannelRequest(
					requestHeader,
					VERSION,
					readWriteModel.SecurityTokenRequestType_securityTokenRequestTypeIssue,
					readWriteModel.MessageSecurityMode_messageSecurityModeSignAndEncrypt,
					readWriteModel.NewPascalByteString(int32(len(s.clientNonce)), s.clientNonce),
					uint32(s.lifetime))
			} else {
				openSecureChannelRequest = readWriteModel.NewOpenSecureChannelRequest(
					requestHeader,
					VERSION,
					readWriteModel.SecurityTokenRequestType_securityTokenRequestTypeIssue,
					readWriteModel.MessageSecurityMode_messageSecurityModeNone,
					NULL_BYTE_STRING,
					uint32(s.lifetime))
			}
			identifier := openSecureChannelRequest.GetExtensionId()
			expandedNodeId := readWriteModel.NewExpandedNodeId(false, //Namespace Uri Specified
				false, //Server Index Specified
				readWriteModel.NewNodeIdFourByte(0, uint16(identifier)),
				nil,
				nil)

			extObject := readWriteModel.NewRootExtensionObject(
				expandedNodeId,
				openSecureChannelRequest,
			)

			buffer := utils.NewWriteBufferByteBased(utils.WithByteOrderForByteBasedBuffer(binary.LittleEndian))
			if err := extObject.SerializeWithWriteBuffer(ctx, buffer); err != nil {
				s.log.Error().Err(err).Msg("error serializing")
				return
			}

			openRequest := readWriteModel.NewOpcuaOpenRequest(
				readWriteModel.ChunkType_FINAL,
				readWriteModel.NewOpenChannelMessageRequest(0,
					readWriteModel.NewPascalString(&s.securityPolicy),
					s.publicCertificate,
					s.thumbprint,
				),
				readWriteModel.NewBinaryPayload(
					readWriteModel.NewSequenceHeader(transactionId, transactionId),
					buffer.GetBytes(),
				),
			)

			var apu readWriteModel.OpcuaAPU

			if s.isEncrypted {
				message, err := s.encryptionHandler.encodeMessage(ctx, openRequest, buffer.GetBytes())
				if err != nil {
					s.log.Error().Err(err).Msg("error encoding")
					return
				}
				apu, err = readWriteModel.OpcuaAPUParse(ctx, message, false, true)
				if err != nil {
					s.log.Error().Err(err).Msg("error parsing")
					return
				}
			} else {
				apu = readWriteModel.NewOpcuaAPU(openRequest)
			}

			requestConsumer := func(transactionId int32) {
				if err := s.codec.SendRequest(
					ctx,
					apu,
					func(message spi.Message) bool {
						opcuaAPU, ok := message.(readWriteModel.OpcuaAPU)
						if !ok {
							s.log.Debug().Type("type", message).Msg("Not relevant")
							return false
						}
						messagePDU := opcuaAPU.GetMessage()
						openResponse, ok := messagePDU.(readWriteModel.OpcuaOpenResponse)
						if !ok {
							s.log.Debug().Type("type", messagePDU).Msg("Not relevant")
							return false
						}
						return openResponse.GetMessage().GetSequenceHeader().GetRequestId() == transactionId
					},
					func(message spi.Message) error {
						opcuaAPU := message.(readWriteModel.OpcuaAPU)
						messagePDU := opcuaAPU.GetMessage()
						opcuaOpenResponse := messagePDU.(readWriteModel.OpcuaOpenResponse)
						readBuffer := utils.NewReadBufferByteBased(opcuaOpenResponse.(readWriteModel.BinaryPayload).GetPayload(), utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian))
						extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer[readWriteModel.ExtensionObject](ctx, readBuffer, false)
						if err != nil {
							return errors.Wrap(err, "error parsing")
						}

						if fault, ok := extensionObject.GetBody().(readWriteModel.ServiceFault); ok {
							statusCode := fault.GetResponseHeader().(readWriteModel.ResponseHeader).GetServiceResult().GetStatusCode()
							statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
							s.log.Error().
								Uint32("statusCode", statusCode).
								Stringer("statusCodeByValue", statusCodeByValue).
								Msg("Failed to connect to opc ua server for the following reason")
						} else {
							s.log.Debug().Msg("Got Secure Response Connection Response")
							openSecureChannelResponse := extensionObject.GetBody().(readWriteModel.OpenSecureChannelResponse)
							token := openSecureChannelResponse.GetSecurityToken().(readWriteModel.ChannelSecurityToken)
							s.tokenId.Store(token.GetTokenId())
							s.channelId.Store(token.GetChannelId())
							s.lifetime = token.GetRevisedLifetime()
						}
						return nil
					},
					func(err error) error {
						s.log.Debug().Err(err).Msg("error submitting")
						return nil
					},
					REQUEST_TIMEOUT,
				); err != nil {
					s.log.Debug().Err(err).Msg("a error")
				}
			}
			s.log.Debug().Int32("transactionId", transactionId).Msg("Submitting OpenSecureChannel with id")
			if err := s.channelTransactionManager.submit(requestConsumer, transactionId); err != nil {
				s.log.Debug().Err(err).Msg("error submitting")
			}
		}
	}()
	return
}

// getRequestHandle returns the next request handle
func (s *SecureChannel) getRequestHandle() uint32 {
	return s.requestHandleGenerator.Add(1) - 1
}

// getAuthenticationToken returns the authentication token for the current connection
func (s *SecureChannel) getAuthenticationToken() readWriteModel.NodeId {
	if s.authenticationToken == nil {
		panic("authenticationToken should be set at this point")
	}
	return readWriteModel.NewNodeId(s.authenticationToken)
}

// getChannelId gets the Channel identifier for the current channel
func (s *SecureChannel) getChannelId() uint32 {
	return s.channelId.Load()
}

// getTokenId gets the Token Identifier
func (s *SecureChannel) getTokenId() uint32 {
	return s.tokenId.Load()
}

// selectEndpoint Selects the endpoint to use based on the connection string provided.
//   - If Discovery is disabled it will use the host address return from the server
//   - @param sessionResponse - The CreateSessionResponse message returned by the server
//   - @throws PlcRuntimeException - If no endpoint with a compatible policy is found raise and error.
func (s *SecureChannel) selectEndpoint(sessionResponse readWriteModel.CreateSessionResponse) {
	// Get a list of the endpoints which match ours.
	var filteredEndpoints []readWriteModel.EndpointDescription
	for _, endpoint := range sessionResponse.GetServerEndpoints() {
		endpointDescription := endpoint.(readWriteModel.EndpointDescription)
		if s.isEndpoint(endpointDescription) {
			filteredEndpoints = append(filteredEndpoints, endpointDescription)
		}
	}

	//Determine if the requested security policy is included in the endpoint
	for _, endpoint := range filteredEndpoints {
		userIdentityTokens := make([]readWriteModel.UserTokenPolicy, len(endpoint.GetUserIdentityTokens()))
		for i, definition := range endpoint.GetUserIdentityTokens() {
			userIdentityTokens[i] = definition.(readWriteModel.UserTokenPolicy)
		}
		s.hasIdentity(userIdentityTokens)
	}

	if s.policyId == nil {
		s.log.Error().Str("endpoint", s.endpoints[0]).Msg("Unable to find endpoint")
		return
	}

	if s.tokenType == 0xffffffff { // TODO: what did we use as undefined
		s.log.Error().Str("endpoint", s.endpoints[0]).Msg("Unable to find Security Policy for endpoint")
		return
	}
}

// isEndpoint checks each component of the return endpoint description against the connection string.
//   - If all are correct then return true.
//   - @param endpoint - EndpointDescription returned from server
//   - @return true if this endpoint matches our configuration
//   - @return error - If the returned endpoint string doesn't match the format expected
func (s *SecureChannel) isEndpoint(endpoint readWriteModel.EndpointDescription) bool {
	// Split up the connection string into its individual segments.
	matches := utils.GetSubgroupMatches(URI_PATTERN, *endpoint.GetEndpointUrl().GetStringValue())
	if len(matches) == 0 {
		s.log.Error().Stringer("endpoint", endpoint).Msg("Endpoint returned from the server doesn't match the format '{protocol-code}:({transport-code})?//{transport-host}(:{transport-port})(/{transport-endpoint})'")
		return false
	}
	s.log.Trace().
		Str("transportHost", matches["transportHost"]).
		Str("transportPort", matches["transportPort"]).
		Str("transportEndpoint", matches["transportEndpoint"]).
		Msg("Using Endpoint")

	if s.configuration.Discovery && !slices.Contains(s.endpoints, matches["transportHost"]) {
		return false
	}

	if s.configuration.Port != matches["transportPort"] {
		return false
	}

	if s.configuration.TransportEndpoint != matches["transportEndpoint"] {
		return false
	}

	if !s.configuration.Discovery {
		s.configuration.Host = matches["transportHost"]
	}

	return true
}

// hasIdentity confirms that a policy that matches the connection string is available from
//   - the returned endpoints. It sets the policyId and tokenType for the policy to use.
//   - @param policies - A list of policies returned with the endpoint description.
func (s *SecureChannel) hasIdentity(policies []readWriteModel.UserTokenPolicy) {
	for _, identityToken := range policies {
		if (identityToken.GetTokenType() == readWriteModel.UserTokenType_userTokenTypeAnonymous) && (s.username == "") {
			s.policyId = identityToken.GetPolicyId()
			s.tokenType = identityToken.GetTokenType()
		} else if (identityToken.GetTokenType() == readWriteModel.UserTokenType_userTokenTypeUserName) && (s.username != "") {
			s.policyId = identityToken.GetPolicyId()
			s.tokenType = identityToken.GetTokenType()
		}
	}
}

// getIdentityToken creates an IdentityToken to authenticate with a server.
//   - @param tokenType      the token type
//   - @param policyId 	 	 the policy id
//   - @return returns an ExtensionObject with an IdentityToken.
func (s *SecureChannel) getIdentityToken(tokenType readWriteModel.UserTokenType, policyId *string) readWriteModel.ExtensionObject {
	switch tokenType {
	case readWriteModel.UserTokenType_userTokenTypeAnonymous:
		//If we aren't using authentication tell the server we would like to log in anonymously
		anonymousIdentityToken := readWriteModel.NewAnonymousIdentityToken(readWriteModel.NewPascalString(policyId))
		extExpandedNodeId := readWriteModel.NewExpandedNodeId(
			false, //Namespace Uri Specified
			false, //Server Index Specified
			readWriteModel.NewNodeIdFourByte(0, uint16(anonymousIdentityToken.GetExtensionId())),
			nil,
			nil,
		)
		return readWriteModel.NewBinaryExtensionObjectWithMask(
			extExpandedNodeId,
			BINARY_ENCODING_MASK,
			anonymousIdentityToken,
		)
	case readWriteModel.UserTokenType_userTokenTypeUserName:
		//Encrypt the password using the server nonce and server public key
		passwordBytes := []byte(s.password)
		encodeableBuffer := new(bytes.Buffer)
		var err error
		err = binary.Write(encodeableBuffer, binary.LittleEndian, len(passwordBytes)+len(s.senderNonce))
		s.log.Debug().Err(err).Msg("write")
		err = binary.Write(encodeableBuffer, binary.LittleEndian, passwordBytes)
		s.log.Debug().Err(err).Msg("write")
		err = binary.Write(encodeableBuffer, binary.LittleEndian, s.senderNonce)
		s.log.Debug().Err(err).Msg("write")
		encodeablePassword := make([]byte, 4+len(passwordBytes)+len(s.senderNonce))
		n, err := encodeableBuffer.Read(encodeablePassword)
		s.log.Debug().Err(err).Int("n", n).Msg("read")
		encryptedPassword, err := s.encryptionHandler.encryptPassword(encodeablePassword)
		if err != nil {
			s.log.Error().Err(err).Msg("error encrypting password")
			return nil
		}
		userNameIdentityToken := readWriteModel.NewUserNameIdentityToken(
			readWriteModel.NewPascalString(policyId),
			readWriteModel.NewPascalString(&s.username),
			readWriteModel.NewPascalByteString(int32(len(encryptedPassword)), encryptedPassword),
			readWriteModel.NewPascalString(utils.ToPtr(PASSWORD_ENCRYPTION_ALGORITHM)),
		)
		extExpandedNodeId := readWriteModel.NewExpandedNodeId(
			false, //Namespace Uri Specified
			false, //Server Index Specified
			readWriteModel.NewNodeIdFourByte(0, uint16(userNameIdentityToken.GetExtensionId())),
			nil,
			nil)
		return readWriteModel.NewBinaryExtensionObjectWithMask(
			extExpandedNodeId,
			BINARY_ENCODING_MASK,
			userNameIdentityToken,
		)
	}
	return nil
}

func (s *SecureChannel) getCurrentDateTime() int64 {
	return (time.Now().UnixMilli() * 10000) + EPOCH_OFFSET
}
