blob: b34e66fd940c37b6a5350d2f7cf432c0b1215d99 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <iostream>
#include <QpidError.h>
#include <sys/Time.h>
#include "Connector.h"
using namespace qpid::sys;
using namespace qpid::client;
using namespace qpid::framing;
using qpid::QpidError;
Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, bool _debug, u_int32_t buffer_size) :
debug(_debug),
receive_buffer_size(buffer_size),
send_buffer_size(buffer_size),
version(pVersion),
closed(true),
lastIn(0), lastOut(0),
timeout(0),
idleIn(0), idleOut(0),
timeoutHandler(0),
shutdownHandler(0),
inbuf(receive_buffer_size),
outbuf(send_buffer_size){ }
Connector::~Connector(){ }
void Connector::connect(const std::string& host, int port){
socket = Socket::createTcp();
socket.connect(host, port);
closed = false;
receiver = Thread(this);
}
void Connector::init(ProtocolInitiation* header){
writeBlock(header);
delete header;
}
void Connector::close(){
closed = true;
socket.close();
receiver.join();
}
void Connector::setInputHandler(InputHandler* handler){
input = handler;
}
void Connector::setShutdownHandler(ShutdownHandler* handler){
shutdownHandler = handler;
}
OutputHandler* Connector::getOutputHandler(){
return this;
}
void Connector::send(AMQFrame* frame){
writeBlock(frame);
if(debug) std::cout << "SENT: " << *frame << std::endl;
delete frame;
}
void Connector::writeBlock(AMQDataBlock* data){
Mutex::ScopedLock l(writeLock);
data->encode(outbuf);
//transfer data to wire
outbuf.flip();
writeToSocket(outbuf.start(), outbuf.available());
outbuf.clear();
}
void Connector::writeToSocket(char* data, size_t available){
size_t written = 0;
while(written < available && !closed){
ssize_t sent = socket.send(data + written, available-written);
if(sent > 0) {
lastOut = now() * TIME_MSEC;
written += sent;
}
}
}
void Connector::handleClosed(){
closed = true;
socket.close();
if(shutdownHandler) shutdownHandler->shutdown();
}
void Connector::checkIdle(ssize_t status){
if(timeoutHandler){
Time t = now() * TIME_MSEC;
if(status == Socket::SOCKET_TIMEOUT) {
if(idleIn && (t - lastIn > idleIn)){
timeoutHandler->idleIn();
}
}else if(status == Socket::SOCKET_EOF){
handleClosed();
}else{
lastIn = t;
}
if(idleOut && (t - lastOut > idleOut)){
timeoutHandler->idleOut();
}
}
}
void Connector::setReadTimeout(u_int16_t t){
idleIn = t * 1000;//t is in secs
if(idleIn && (!timeout || idleIn < timeout)){
timeout = idleIn;
setSocketTimeout();
}
}
void Connector::setWriteTimeout(u_int16_t t){
idleOut = t * 1000;//t is in secs
if(idleOut && (!timeout || idleOut < timeout)){
timeout = idleOut;
setSocketTimeout();
}
}
void Connector::setSocketTimeout(){
socket.setTimeout(timeout*TIME_MSEC);
}
void Connector::setTimeoutHandler(TimeoutHandler* handler){
timeoutHandler = handler;
}
void Connector::run(){
try{
while(!closed){
ssize_t available = inbuf.available();
if(available < 1){
THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
}
ssize_t received = socket.recv(inbuf.start(), available);
checkIdle(received);
if(!closed && received > 0){
inbuf.move(received);
inbuf.flip();//position = 0, limit = total data read
AMQFrame frame(version);
while(frame.decode(inbuf)){
if(debug) std::cout << "RECV: " << frame << std::endl;
input->received(&frame);
}
//need to compact buffer to preserve any 'extra' data
inbuf.compact();
}
}
}catch(QpidError error){
std::cout << "Error [" << error.code << "] " << error.msg
<< " (" << error.location.file << ":" << error.location.line
<< ")" << std::endl;
handleClosed();
}
}