/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#include <Connection.h>
#include <ClientChannel.h>
#include <ClientMessage.h>
#include <QpidError.h>
#include <iostream>
#include <MethodBodyInstances.h>

using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
using namespace qpid::sys;

// Definition of the static channel id counter shared by all Connection
// instances. openChannel() pre-increments it to obtain the id for each newly
// opened channel; having static storage duration, it starts out as zero.
u_int16_t Connection::channelIdCounter;

/**
 * Builds a connection object in the "closed" state and allocates its
 * Connector. No network activity happens here; see open().
 *
 * @param debug            forwarded unchanged to the Connector
 * @param _max_frame_size  stored as max_frame_size and forwarded to the Connector
 */
Connection::Connection(bool debug, u_int32_t _max_frame_size)
    : max_frame_size(_max_frame_size),
      closed(true),
      // TODO (kpvdr 2006-11-20, AMQP version management): make this class
      // version-aware rather than hard-wiring the version numbers here.
      version(8, 0)
{
    this->connector = new Connector(debug, _max_frame_size);
}

/** Releases the Connector that the constructor allocated. */
Connection::~Connection()
{
    delete this->connector;
}

void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){
    host = _host;
    port = _port;
    connector->setInputHandler(this);
    connector->setTimeoutHandler(this);
    connector->setShutdownHandler(this);
    out = connector->getOutputHandler();
    connector->connect(host, port);
    
    ProtocolInitiation* header = new ProtocolInitiation(8, 0);
    responses.expect();
    connector->init(header);
    responses.receive(method_bodies.connection_start);

    FieldTable props;
    string mechanism("PLAIN");
    string response = ((char)0) + uid + ((char)0) + pwd;
    string locale("en_US");
    responses.expect();
    out->send(new AMQFrame(0, new ConnectionStartOkBody(version, props, mechanism, response, locale)));

    /**
     * Assume for now that further challenges will not be required
    //receive connection.secure
    responses.receive(connection_secure));
    //send connection.secure-ok
    out->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
    **/

    responses.receive(method_bodies.connection_tune);

    ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse());
    out->send(new AMQFrame(0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat())));

    u_int16_t heartbeat = proposal->getHeartbeat();
    connector->setReadTimeout(heartbeat * 2);
    connector->setWriteTimeout(heartbeat);

    //send connection.open
    string capabilities;
    string vhost = virtualhost;
    responses.expect();
    out->send(new AMQFrame(0, new ConnectionOpenBody(version, vhost, capabilities, true)));
    //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true).
    responses.waitForResponse();
    if(responses.validate(method_bodies.connection_open_ok)){
        //ok
    }else if(responses.validate(method_bodies.connection_redirect)){
        //ignore for now
        ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse()));
        std::cout << "Received redirection to " << redirect->getHost() << std::endl;
    }else{
        THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
    }
    
}

void Connection::close(){
    if(!closed){
        u_int16_t code(200);
        string text("Ok");
        u_int16_t classId(0);
        u_int16_t methodId(0);
        
        sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok);
        connector->close();
    }
}

/**
 * Attaches `channel` to this connection under a freshly allocated channel id,
 * sends channel.open, waits for channel.open-ok, applies the channel's QoS
 * settings and finally marks the channel as open.
 *
 * Id allocation advances the shared 16-bit counter, skipping 0 (frames on
 * channel 0 are dispatched to the connection itself by received()) and any id
 * that is still present in the channel map, so a counter wrap-around cannot
 * silently overwrite a live channel.
 *
 * @param channel  the channel to open; its con, id, out and closed members
 *                 are written here
 *
 * Throws a QpidError (PROTOCOL_ERROR) when every non-zero 16-bit id is
 * already in use.
 */
void Connection::openChannel(Channel* channel){
    // Ids 1..65535 are usable; bail out rather than loop forever when full.
    if(channels.size() >= 0xFFFF){
        THROW_QPID_ERROR(PROTOCOL_ERROR, "No free channel id available");
    }
    do{
        ++channelIdCounter;
    }while(channelIdCounter == 0 || channels.count(channelIdCounter) != 0);

    channel->con = this;
    channel->id = channelIdCounter;
    channel->out = out;
    channels[channel->id] = channel;
    //now send frame to open channel and wait for response
    string oob;
    channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok);
    channel->setQos();
    channel->closed = false;
}

/**
 * Closes `channel` normally: delegates to the five-argument overload with
 * reply code 200, reply text "Ok" and class/method id 0.
 */
void Connection::closeChannel(Channel* channel)
{
    const u_int16_t okCode = 200;
    const u_int16_t noClass = 0;
    const u_int16_t noMethod = 0;
    string okText("Ok");   // the overload takes a non-const reference

    closeChannel(channel, okCode, okText, noClass, noMethod);
}

/**
 * Closes `channel` with an explicit reply: cancels everything on the channel,
 * flags it closed, sends channel.close and waits for channel.close-ok, then
 * detaches the channel from this connection.
 *
 * @param channel   channel to close
 * @param code      reply code placed in channel.close
 * @param text      reply text placed in channel.close
 * @param classId   class id placed in channel.close
 * @param methodId  method id placed in channel.close
 */
void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId)
{
    channel->cancelAll();
    channel->closed = true;

    // Close handshake with the peer.
    AMQFrame* closeFrame = new AMQFrame(channel->id, new ChannelCloseBody(version, code, text, classId, methodId));
    channel->sendAndReceive(closeFrame, method_bodies.channel_close_ok);

    // Detach the channel and drop it from the channel map.
    channel->out = 0;
    channel->con = 0;
    removeChannel(channel);
}

/**
 * Forgets `channel`: erases its id from the channel map and resets the
 * channel's id, output handler and connection back-pointer to 0.
 * No frame is sent.
 */
void Connection::removeChannel(Channel* channel)
{
    // Erase first: the lookup key is the id that is cleared just below.
    channels.erase(channel->id);

    channel->id = 0;
    channel->con = 0;
    channel->out = 0;
}

/**
 * Entry point for every incoming frame.
 *
 * Frames on channel 0 are handled by the connection itself. Any other frame
 * is routed to the channel registered under its id; a QpidError raised while
 * that channel handles the body is turned into a channel close via
 * channelException(). A frame for an id with no registered channel raises a
 * 504 connection error.
 *
 * The channel is looked up with find() rather than operator[]: operator[]
 * would insert a null entry for every unknown id, and shutdown() later
 * dereferences every entry in the map.
 *
 * @param frame  the received frame
 */
void Connection::received(AMQFrame* frame){
    const u_int16_t channelId = frame->getChannel();

    if(channelId == 0){
        this->handleBody(frame->getBody());
        return;
    }

    iterator found = channels.find(channelId);
    Channel* channel = (found == channels.end()) ? 0 : found->second;
    if(channel == 0){
        error(504, "Unknown channel");
        return;
    }

    try{
        channel->handleBody(frame->getBody());
    }catch(qpid::QpidError& e){
        // Caught by reference: avoids a copy and any slicing of derived errors.
        channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e);
    }
}

/**
 * Handles a method body that arrived on channel 0.
 *
 * - If a synchronous request is waiting for its reply, the body is handed
 *   to the response handler.
 * - Otherwise a connection.close from the server is logged and the
 *   connector is closed.
 * - Anything else is logged and reported as a 504 connection error.
 */
void Connection::handleMethod(AMQMethodBody::shared_ptr body)
{
    if (responses.isWaiting()) {
        responses.signalResponse(body);
        return;
    }

    if (method_bodies.connection_close.match(body.get())) {
        // Server-initiated close: report the reason and close the socket.
        ConnectionCloseBody* closeRequest = dynamic_cast<ConnectionCloseBody*>(body.get());
        std::cout << "Connection closed by server: " << closeRequest->getReplyCode() << ":" << closeRequest->getReplyText() << std::endl;
        connector->close();
        return;
    }

    std::cout << "Unhandled method for connection: " << *body << std::endl;
    error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId());
}
    
/**
 * A header body on channel 0 is not valid; it is reported as a 504
 * connection error. The body itself is ignored.
 */
void Connection::handleHeader(AMQHeaderBody::shared_ptr /*body*/)
{
    const int replyCode = 504;
    error(replyCode, "Channel error: received header body with channel 0.");
}
    
/**
 * A content body on channel 0 is not valid; it is reported as a 504
 * connection error. The body itself is ignored.
 */
void Connection::handleContent(AMQContentBody::shared_ptr /*body*/)
{
    const int replyCode = 504;
    error(replyCode, "Channel error: received content body with channel 0.");
}
    
/** Heartbeat bodies need no handling here; the body is ignored. */
void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/)
{
    // Intentionally empty.
}

/**
 * Synchronous request/response on the connection.
 *
 * @param frame  frame to send through the output handler
 * @param body   method body passed to the response handler as the expected
 *               reply
 */
void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body)
{
    // Announce that a response is expected before the request goes out.
    this->responses.expect();
    this->out->send(frame);
    this->responses.receive(body);
}

/**
 * Raises a connection-level exception: logs it, sends connection.close with
 * the given details, waits for connection.close-ok and closes the connector.
 *
 * @param code      reply code sent in connection.close
 * @param msg       reply text sent in connection.close
 * @param classid   class id of the offending method (logged only if non-zero)
 * @param methodid  method id of the offending method (logged only if non-zero)
 *
 * NOTE(review): this blocks until close-ok is received. received() calls it
 * directly, so confirm that the reply can still be delivered when this runs
 * on the thread that dispatches incoming frames.
 */
void Connection::error(int code, const string& msg, int classid, int methodid){
    // Separate the code from the text; previously they ran together
    // (e.g. "504Unknown channel").
    std::cout << "Connection exception generated: " << code << ": " << msg;
    if(classid || methodid){
        std::cout << " [" << methodid << ":" << classid << "]";
    }
    std::cout << std::endl;
    sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok);
    connector->close();
}

/**
 * Converts a QpidError raised while a channel was handling a frame into a
 * channel close.
 *
 * @param channel  the channel that raised the error; it gets closed
 * @param method   the method body being handled, or 0 if the frame body was
 *                 not a method; when present its class/method ids are sent
 *                 with the close
 * @param e        the error that was caught
 *
 * Reply code: the offset of e.code above PROTOCOL_ERROR is used when it is a
 * three-digit value (presumably errors are raised as PROTOCOL_ERROR + <AMQP
 * reply code> - confirm against the throw sites); otherwise 500 is used.
 * The previous `e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500`
 * could only ever yield 0 or 500, so a reply code of 0 was sent for plain
 * protocol errors and any encoded reply code was lost.
 */
void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){
    std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.location.file << ":" << e.location.line << ")" << std::endl;
    const int offset = e.code - PROTOCOL_ERROR;
    const int code = (offset >= 100 && offset <= 999) ? offset : 500;
    string msg = e.msg;
    if(method == 0){
        closeChannel(channel, code, msg);
    }else{
        closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId());
    }
}

void Connection::idleIn(){
    std::cout << "Connection timed out due to abscence of heartbeat." << std::endl;
    connector->close();
}

void Connection::idleOut(){
    out->send(new AMQFrame(0, new AMQHeartbeatBody()));
}

/**
 * Shutdown callback: flags the connection as closed and stops every channel
 * held in the channel map.
 *
 * Null map entries are skipped so that an empty slot cannot crash the
 * shutdown path.
 */
void Connection::shutdown(){
    closed = true;
    //close all channels
    for(iterator i = channels.begin(); i != channels.end(); ++i){
        if(i->second != 0){
            i->second->stop();
        }
    }
}
