blob: a6d1b901db66dca631fa75e2fde670fd092503b5 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package etch
import "net"
import "os"
import "fmt"
// Connection describes a socket-backed transport: three unexported socket
// lifecycle operations plus the embedded TransportData interface (declared
// elsewhere in the package).
type Connection interface {
// openSocket reports whether the socket is ready for use.
openSocket() bool
// readSocket reads incoming data from the socket.
readSocket()
// closeSocket closes the socket.
closeSocket()
TransportData
}
// TcpConnection is a TCP-backed transport. Unlike the Java binding, this
// binding does not distinguish between TcpConnection and TcpTransport.
//
// NOTE: NewTcpConnection builds this struct with a positional composite
// literal, so the field order below must not change without updating it.
type TcpConnection struct {
// address is the string resolved and dialed on the client path of openSocket.
address string
// session receives status notifications and incoming data.
session SessionData
// status is the connection state; it starts as DOWN and becomes UP once the socket is open.
status int
// isServer selects the server path in openSocket, which reuses an existing socket instead of dialing.
isServer bool
// connection is the underlying TCP socket: supplied at construction or set by openSocket when dialing.
connection *net.TCPConn
// buf is the buffer readSocket fills with received bytes and hands to the session.
buf *FlexBuffer
}
// NewTcpConnection builds a TcpConnection in the DOWN state with a fresh
// FlexBuffer.
//
// address is the endpoint dialed on the client path, isServer selects the
// server path in openSocket, session is the receiver of notifications and
// data, and socket is an optional already-open connection (may be nil).
func NewTcpConnection(address string, isServer bool, session SessionData, socket *net.TCPConn) *TcpConnection {
	return &TcpConnection{
		address:    address,
		session:    session,
		status:     DOWN,
		isServer:   isServer,
		connection: socket,
		buf:        NewFlexBuffer(),
	}
}
// openSocket brings the connection up and reports whether it succeeded.
//
// Server side: a socket supplied at construction is used as is and the
// status is set to UP (no session notification is sent from here).
// Client side: the configured address is resolved and dialed; on success the
// status becomes UP and the session is notified with UP.
//
// It returns false when the address cannot be resolved or the dial fails.
func (tcpc *TcpConnection) openSocket() bool {
	if tcpc.isServer {
		if tcpc.connection != nil {
			Log("SERVER CONNECTION, GOING UP DIRECTLY\n")
			tcpc.status = UP
		}
		// NOTE(review): a server connection without a socket still reports
		// success while staying DOWN, exactly as before — confirm intended.
		return true
	}

	// Declared with its explicit type so the file's "os" import stays in use.
	var er os.Error
	addr, er := net.ResolveTCPAddr("tcp", tcpc.address)
	if er != nil {
		Log("TcpConnection: error resolving address " + tcpc.address + ": " + fmt.Sprint(er) + "\n")
		return false
	}

	tcpc.connection, er = net.DialTCP("tcp", nil, addr)
	if er != nil {
		Log("TcpConnection: error with client socket " + fmt.Sprint(er) + "\n")
		return false
	}

	// Only touch the socket once the dial is known to have succeeded.
	// A failure to set the linger option is not fatal, so it is only logged.
	if lingerEr := tcpc.connection.SetLinger(0); lingerEr != nil {
		Log("TcpConnection: could not set linger: " + fmt.Sprint(lingerEr) + "\n")
	}

	Log("TcpConnection: open socket client success, before notify, connection is " + fmt.Sprint(tcpc.connection) + " \n")
	tcpc.status = UP
	tcpc.GetSession().SessionNotify(UP)
	Log("TcpConnection: open socket client success\n")
	return true
}
// readSocket reads from the connection until the status becomes DOWN,
// forwarding every non-empty chunk to the session.
//
// Each chunk is copied into tcpc.buf, the buffer is rewound, handed to the
// session, and then reset for the next read. A read error marks the
// connection DOWN, which ends the loop; bytes returned together with the
// error are still delivered first, but an empty read is never forwarded.
func (tcpc *TcpConnection) readSocket() {
	buffer := make([]byte, 8192)
	for tcpc.status != DOWN {
		Log("TcpConnection in readSocket\n")
		n, er := tcpc.connection.Read(buffer)
		Log("TcpConnection in after readSocket: " + fmt.Sprint(n) + "\n")

		// Process whatever was read before looking at the error, and skip
		// the session callback entirely when there is nothing to deliver.
		if n > 0 {
			tcpc.buf.PutBytes(buffer[0:n])
			tcpc.buf.Index = 0
			Log("TcpConnection: got " + fmt.Sprint(n) + " bytes data " + fmt.Sprint(tcpc.buf.avail()) + " " + fmt.Sprint(tcpc.buf) + "\n")
			tcpc.GetSession().(SessionData).SessionData(nil, tcpc.buf)
			// Reset the buffer so the next chunk starts from a clean state.
			tcpc.buf.Index = 0
			tcpc.buf.Length = 0
		}

		if er != nil {
			Log("TcpConnection readSocket error: " + fmt.Sprint(er) + "\n")
			tcpc.status = DOWN
		}
	}
}
// closeSocket marks the connection DOWN, closes the underlying socket if
// there is one, and notifies the session with DOWN.
//
// The session is notified even when no socket was ever opened, so a stop
// request always results in a DOWN notification.
func (tcpc *TcpConnection) closeSocket() {
	Log("TcpConnection CLOSING SOCKET\n")
	// Record the state change here instead of relying on the read loop
	// noticing a failed read after the socket has been closed.
	tcpc.status = DOWN
	if tcpc.connection != nil {
		if er := tcpc.connection.Close(); er != nil {
			Log("TcpConnection: error while closing socket: " + fmt.Sprint(er) + "\n")
		}
	}
	tcpc.GetSession().SessionNotify(DOWN)
	Log("TcpConnection CLOSING SOCKET DOWN, session notify sent\n")
}
// GetSession returns the session currently attached to this connection.
func (tcpc *TcpConnection) GetSession() Session {
	var current Session = tcpc.session
	return current
}
// SetSession attaches sess to this connection.
//
// sess must also implement SessionData; the single-value type assertion
// panics otherwise.
func (tcpc *TcpConnection) SetSession(sess Session) {
	data := sess.(SessionData)
	tcpc.session = data
}
// TransportData writes the bytes available in buf to the socket.
//
// recipient is not used. A write error or a short write is logged; nothing
// is reported back to the caller.
func (tcpc *TcpConnection) TransportData(recipient interface{}, buf *FlexBuffer) {
	Log("TcpConnection sending data\n")

	// Drain the buffer into a slice sized to what it currently holds.
	pending := buf.avail()
	payload := make([]byte, pending)
	buf.ReadBytes(&payload)
	Log("TcpConnection: sending: #" + fmt.Sprint(len(payload)) + " " + fmt.Sprint(payload) + "\n")

	written, er := tcpc.connection.Write(payload)
	if er == nil && written == int(pending) {
		return
	}
	Log("TcpConnection " + fmt.Sprint(tcpc.isServer) + " : error while sending with " + fmt.Sprint(tcpc.connection) +
		" Error was: " + fmt.Sprint(er) + ", sent " + fmt.Sprint(written) + "/" + fmt.Sprint(pending) + " bytes\n")
}
// TransportQuery is not implemented for TCP connections and always returns
// nil. TODO: the Java binding allows querying addresses here.
func (tcpc *TcpConnection) TransportQuery(obj interface{}) interface{} {
	return nil
}
// TransportControl reacts to a control event.
//
// START launches start in a new goroutine; STOP runs stop synchronously.
// Any other ctrl value is only logged. value is not used.
func (tcpc *TcpConnection) TransportControl(ctrl interface{}, value interface{}) {
	Log("TcpConnection: transportControl: " + fmt.Sprint(ctrl) + "\n")
	switch ctrl {
	case START:
		Log("TcpConnection: transportControl: " + fmt.Sprint(ctrl) + "\n")
		// Opening the socket and reading from it must not block the caller.
		go tcpc.start()
	case STOP:
		tcpc.stop()
	}
}
// TransportNotify is a no-op: the TCP connection ignores notify events.
func (tcpc *TcpConnection) TransportNotify(value interface{}) {
}
// start opens the socket and, if that succeeds, runs the read loop until it
// ends. It is launched in its own goroutine by TransportControl on START.
func (tcpc *TcpConnection) start() {
	if opened := tcpc.openSocket(); opened {
		tcpc.readSocket()
	}
}
// stop logs the request and closes the socket via closeSocket.
func (tcpc *TcpConnection) stop() {
	Log("TcpConnection: stopping\n")
	tcpc.closeSocket()
}