blob: 78f768add2f7ede7159133efab69d848ba10c8f8 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package client
import (
import (
import (
_ ""
// defaultBufferSize is the tcp read default buffer size
const defaultBufferSize = 1024 * 1024 * 4
// TelnetClient maintain a connection to target
type TelnetClient struct {
tcpAddr string
responseTimeout time.Duration
protocolName string
requestList []*protocol.Request
conn *net.TCPConn
proto protocol.Protocol
sequence atomic.Uint64
pendingResponses *sync.Map
waitNum atomic.Uint64
// NewTelnetClient create a new tcp connection, and create default request
func NewTelnetClient(host string, port int, protocolName, interfaceID, version, group, method string, reqPkg interface{}, timeout int) (*TelnetClient, error) {
tcpAddr := net.JoinHostPort(host, strconv.Itoa(port))
resolved := resolveTCPAddr(tcpAddr)
conn, err := net.DialTCP("tcp", nil, resolved)
if err != nil {
return nil, err
log.Printf("connected to %s:%d\n", host, port)
log.Printf("try calling interface:%s.%s\n", interfaceID, method)
log.Printf("with protocol:%s\n\n", protocolName)
proto := common.GetProtocol(protocolName)
return &TelnetClient{
tcpAddr: tcpAddr,
conn: conn,
responseTimeout: time.Duration(timeout) * time.Millisecond, // default timeout
protocolName: protocolName,
pendingResponses: &sync.Map{},
proto: proto,
requestList: []*protocol.Request{
InterfaceID: interfaceID,
Version: version,
Method: method,
Group: group,
Params: []interface{}{reqPkg},
}, nil
func resolveTCPAddr(addr string) *net.TCPAddr {
resolved, error := net.ResolveTCPAddr("tcp", addr)
if nil != error {
log.Fatalf("Error occured while resolving TCP address \"%v\": %v\n", addr, error)
return resolved
// ProcessRequests send all requests
func (t *TelnetClient) ProcessRequests(userPkg interface{}) {
for i := range t.requestList {
t.processSingleRequest(t.requestList[i], userPkg)
// addPendingResponse add a response @model to pending queue
// once the rsp got, the model will be used.
func (t *TelnetClient) addPendingResponse(model interface{}) uint64 {
seqId := t.sequence.Load()
t.pendingResponses.Store(seqId, model)
return seqId
// removePendingResponse delete item from pending queue by @seq
func (t *TelnetClient) removePendingResponse(seq uint64) {
if t.pendingResponses == nil {
if _, ok := t.pendingResponses.Load(seq); ok {
// processSingleRequest call one req.
func (t *TelnetClient) processSingleRequest(req *protocol.Request, userPkg interface{}) {
// proto create package procedure
req.ID = t.sequence.Load()
inputData, err := t.proto.Write(req)
if err != nil {
log.Fatalln("error: handler.Writer err = ", err)
startTime := time.Now()
// init rsp Package and add to pending queue
seqId := t.addPendingResponse(userPkg)
defer t.removePendingResponse(seqId)
requestDataChannel := make(chan []byte)
responseDataChannel := make(chan []byte)
// start data transfer procedure
go t.readInputData(string(inputData), requestDataChannel)
go t.readServerData(t.conn, responseDataChannel)
timeAfter := time.After(t.responseTimeout)
for {
select {
case <-timeAfter:
log.Println("request timeout to:", t.tcpAddr)
case request := <-requestDataChannel:
if _, err := t.conn.Write(request); nil != err {
log.Fatalf("Error occured while writing to TCP socket: %v\n", err)
case response := <-responseDataChannel:
rspPkg, _, err := t.proto.Read(response, t.pendingResponses)
if err != nil {
log.Fatalln("Error with protocol Read(): ", err)
log.Printf("After %dms , Got Rsp:", time.Since(startTime).Milliseconds())
if t.waitNum.Sub(0) == 0 {
func (t *TelnetClient) readInputData(inputData string, toSent chan<- []byte) {
toSent <- []byte(inputData)
func (t *TelnetClient) readServerData(connection *net.TCPConn, received chan<- []byte) {
buffer := make([]byte, defaultBufferSize)
var err error
var n int
for nil == err {
n, err = connection.Read(buffer)
received <- buffer[:n]
func (t *TelnetClient) assertEOF(error error) {
if "EOF" != error.Error() {
log.Fatalf("Error occured while operating on TCP socket: %v\n", error)
// Destroy close the tcp conn
func (t *TelnetClient) Destroy() {