blob: afd6342b2dd89b5d4f5708295dd75f0dfafa0f7c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tcp
import (
"bufio"
"bytes"
"io"
"math/rand"
"net"
"strconv"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/common/protocol/tcp"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/common/protocol/tcp/codec"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/log"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/tcp/common"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/tcp/conf"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/tcp/utils"
)
// BaseTCPClient bundles the endpoint settings, the user agent and the network
// connection used by a single TCP client instance.
type BaseTCPClient struct {
// clientNo is filled with a random value in [0, 10000) by NewBaseTCPClient.
clientNo int
// host and port identify the endpoint that Open dials.
host string
port int
// useAgent is the user agent handed to the hello package builder in Hello.
useAgent tcp.UserAgent
// conn is assigned by Open; NewBaseTCPClient leaves it at its zero value.
conn net.Conn
}
// NewBaseTCPClient creates a client from the given configuration.
//
// The host, port and user agent are copied from the configuration, and the
// client number is drawn at random from [0, 10000). No connection is opened
// here; call Open for that.
func NewBaseTCPClient(eventMeshTcpClientConfig conf.EventMeshTCPClientConfig) *BaseTCPClient {
	client := new(BaseTCPClient)
	client.clientNo = rand.Intn(10000)
	client.host = eventMeshTcpClientConfig.Host()
	client.port = eventMeshTcpClientConfig.Port()
	client.useAgent = eventMeshTcpClientConfig.UserAgent()
	return client
}
// Open dials the configured host and port over TCP and, on success, stores
// the connection and starts the background read loop.
//
// If the dial fails, the error is logged and Open returns without replacing
// c.conn or starting the reader: a nil connection would otherwise be
// dereferenced inside the read goroutine.
func (c *BaseTCPClient) Open() {
	addr := c.host + ":" + strconv.Itoa(c.port)
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		log.Errorf("Failed to dial %s: %v", addr, err)
		return
	}
	c.conn = conn
	go c.read()
}
// Close shuts the client down. It is a no-op when no connection has been
// stored on the client.
//
// Goodbye is invoked before the socket is closed so that any farewell it
// sends can still reach the peer; a failure to close the socket is logged
// together with its cause.
func (c *BaseTCPClient) Close() {
	if c.conn == nil {
		return
	}
	// Say goodbye while the connection is still usable.
	c.Goodbye()
	if err := c.conn.Close(); err != nil {
		log.Errorf("Failed to close connection: %v", err)
	}
}
// Heartbeat builds a heartbeat package and passes it to IO with a timeout
// argument of 1000. The package returned by IO is discarded.
func (c *BaseTCPClient) Heartbeat() {
	c.IO(utils.BuildHeartBeatPackage(), 1000)
}
// Hello builds a hello package from the client's user agent and passes it to
// IO with a timeout argument of 1000. The package returned by IO is discarded.
func (c *BaseTCPClient) Hello() {
	c.IO(utils.BuildHelloPackage(c.useAgent), 1000)
}
// Reconnect is a placeholder: its body is empty, so calling it currently does
// nothing.
// TODO: implement reconnection.
func (c *BaseTCPClient) Reconnect() {
}
// Goodbye is a placeholder invoked by Close: its body is empty, so nothing is
// sent to the peer yet.
// TODO: implement the goodbye exchange.
func (c *BaseTCPClient) Goodbye() {
}
// IsActive is a placeholder: its body is empty and it has no return value, so
// it cannot report any connection state yet.
// NOTE(review): presumably meant to return a bool — confirm before implementing.
func (c *BaseTCPClient) IsActive() {
}
// read is the receive loop for c.conn. It assembles one line at a time with
// bufio.Reader.ReadLine (joining the chunks of over-long lines) and hands each
// completed line to handleRead on its own goroutine.
//
// One bufio.Reader is kept for the whole loop so that bytes already buffered
// past the end of a line are not thrown away before the next read.
//
// It returns nil once the peer closes the stream (io.EOF), after dispatching
// any bytes collected so far, and returns the underlying error for any other
// read failure.
func (c *BaseTCPClient) read() error {
	reader := bufio.NewReader(c.conn)
	for {
		// A fresh buffer per line: each handleRead goroutine owns its own.
		var buf bytes.Buffer
		for {
			chunk, isPrefix, err := reader.ReadLine()
			if err != nil {
				if err != io.EOF {
					return err
				}
				// End of stream: flush what was collected and stop, rather
				// than looping on EOF forever.
				if buf.Len() > 0 {
					go c.handleRead(&buf)
				}
				return nil
			}
			// ReadLine's slice is only valid until the next read; Write copies it.
			buf.Write(chunk)
			if !isPrefix {
				break
			}
		}
		go c.handleRead(&buf)
	}
}
// handleRead decodes one buffered message received from the server and logs
// the decoded package.
//
// The message is logged at info level: receiving data is the normal case, and
// a panic-level log here would crash on every message.
func (c *BaseTCPClient) handleRead(in *bytes.Buffer) {
	decoded := codec.DecodePackage(in)
	log.Infof("Read from server: %v", decoded)
	// TODO Handle according to the command
}
// write pushes message to c.conn through a temporary buffered writer.
//
// It returns the byte count reported by the buffered Write. If that Write
// fails, its error is returned and no flush is attempted; otherwise the result
// of Flush is returned as the error.
func (c *BaseTCPClient) write(message []byte) (int, error) {
	w := bufio.NewWriter(c.conn)
	written, err := w.Write(message)
	if err != nil {
		return written, err
	}
	return written, w.Flush()
}
// Send encodes message with the codec and writes the resulting bytes to the
// connection.
//
// A write failure is logged together with its cause. It is reported at error
// level rather than fatal so that a failed write does not terminate the
// process that embeds this client.
func (c *BaseTCPClient) Send(message tcp.Package) {
	out := codec.EncodePackage(message)
	if _, err := c.write(out.Bytes()); err != nil {
		log.Errorf("Failed to write to peer: %v", err)
	}
}
// IO sends message and returns the response held by a request context created
// for it.
//
// The context is created from the message's request-context key before the
// message is sent, and its Response is read right after Send returns.
// The timeout parameter is not referenced anywhere in the body.
// NOTE(review): the meaning of the literal 1 passed to NewRequestContext is
// not visible here — confirm against the common package.
func (c *BaseTCPClient) IO(message tcp.Package, timeout int64) tcp.Package {
	reqCtx := common.NewRequestContext(common.GetRequestContextKey(message), message, 1)
	c.Send(message)
	return reqCtx.Response()
}