blob: 920c4937db63c2dae7770fa5fa9481a8a599cdde [file] [log] [blame]
#ifndef QPID_CLUSTER_CONNECTION_H
#define QPID_CLUSTER_CONNECTION_H
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "types.h"
#include "OutputInterceptor.h"
#include "McastFrameHandler.h"
#include "UpdateReceiver.h"
#include "qpid/RefCounted.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/SecureConnection.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/amqp_0_10/Connection.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionOutputHandler.h"
#include "qpid/sys/SecuritySettings.h"
#include "qpid/framing/SequenceNumber.h"
#include "qpid/framing/FrameDecoder.h"
#include <iosfwd>
namespace qpid {
namespace framing { class AMQFrame; }
namespace broker {
class SemanticState;
struct QueuedMessage;
class TxBuffer;
class TxAccept;
}
namespace cluster {
class Cluster;
class Event;
struct EventFrame;
/** Intercept broker::Connection calls for shadow and local cluster connections. */
class Connection :
public RefCounted,
public sys::ConnectionInputHandler,
public framing::AMQP_AllOperations::ClusterConnectionHandler,
private broker::Connection::ErrorListener
{
public:
/** Local connection. */
Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, MemberId, bool catchUp, bool isLink,
const qpid::sys::SecuritySettings& external);
/** Shadow connection. */
Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id,
const qpid::sys::SecuritySettings& external);
~Connection();
ConnectionId getId() const { return self; }
broker::Connection* getBrokerConnection() { return connection.get(); }
const broker::Connection* getBrokerConnection() const { return connection.get(); }
/** Local connections may be clients or catch-up connections */
bool isLocal() const;
bool isLocalClient() const { return isLocal() && !isCatchUp(); }
/** True for connections that are shadowing remote broker connections */
bool isShadow() const;
/** True if the connection is in "catch-up" mode: building initial broker state. */
bool isCatchUp() const { return catchUp; }
/** True if the connection is a completed shared update connection */
bool isUpdated() const;
Cluster& getCluster() { return cluster; }
// ConnectionInputHandler methods
void received(framing::AMQFrame&);
void closed();
bool doOutput();
void idleOut() { if (connection.get()) connection->idleOut(); }
void idleIn() { if (connection.get()) connection->idleIn(); }
// ConnectionCodec methods - called by IO layer with a read buffer.
size_t decode(const char* buffer, size_t size);
// Called for data delivered from the cluster.
void deliveredFrame(const EventFrame&);
void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position,
uint32_t usedMsgCredit, uint32_t usedByteCredit);
// ==== Used in catch-up mode to build initial state.
//
// State update methods.
void shadowPrepare(const std::string&);
void shadowSetUser(const std::string&);
void sessionState(const framing::SequenceNumber& replayStart,
const framing::SequenceNumber& sendCommandPoint,
const framing::SequenceSet& sentIncomplete,
const framing::SequenceNumber& expected,
const framing::SequenceNumber& received,
const framing::SequenceSet& unknownCompleted,
const SequenceSet& receivedIncomplete,
bool dtxSelected);
void outputTask(uint16_t channel, const std::string& name);
void shadowReady(uint64_t memberId,
uint64_t connectionId,
const std::string& managementId,
const std::string& username,
const std::string& fragment,
uint32_t sendMax);
void membership(const framing::FieldTable&, const framing::FieldTable&,
const framing::SequenceNumber& frameSeq);
void retractOffer();
void deliveryRecord(const std::string& queue,
const framing::SequenceNumber& position,
const std::string& tag,
const framing::SequenceNumber& id,
bool acquired,
bool accepted,
bool cancelled,
bool completed,
bool ended,
bool windowing,
bool enqueued,
uint32_t credit);
void queuePosition(const std::string&, const framing::SequenceNumber&);
void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count);
void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&);
void txStart();
void txAccept(const framing::SequenceSet&);
void txDequeue(const std::string&);
void txEnqueue(const std::string&);
void txPublish(const framing::Array&, bool);
void txEnd();
void accumulatedAck(const framing::SequenceSet&);
// Dtx state
void dtxStart(const std::string& xid,
bool ended,
bool suspended,
bool failed,
bool expired);
void dtxEnd();
void dtxAck();
void dtxBufferRef(const std::string& xid, uint32_t index, bool suspended);
void dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout);
// Encoded exchange replication.
void exchange(const std::string& encoded);
void giveReadCredit(int credit);
void announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid,
bool nodict, const std::string& username,
const std::string& initFrames);
void close();
void abort();
void deliverClose();
OutputInterceptor& getOutput() { return output; }
void addQueueListener(const std::string& queue, uint32_t listener);
void managementSetupState(uint64_t objectNum,
uint16_t bootSequence,
const framing::Uuid&,
const std::string& vendor,
const std::string& product,
const std::string& instance);
void config(const std::string& encoded);
void setSecureConnection ( broker::SecureConnection * sc );
void doCatchupIoCallbacks();
void clock(uint64_t time);
void queueDequeueSincePurgeState(const std::string&, uint32_t);
bool isAnnounced() const { return announced; }
private:
struct NullFrameHandler : public framing::FrameHandler {
void handle(framing::AMQFrame&) {}
};
// Arguments to construct a broker::Connection
struct ConnectionCtor {
sys::ConnectionOutputHandler* out;
broker::Broker& broker;
std::string mgmtId;
qpid::sys::SecuritySettings external;
bool isLink;
uint64_t objectId;
bool shadow;
bool delayManagement;
ConnectionCtor(
sys::ConnectionOutputHandler* out_,
broker::Broker& broker_,
const std::string& mgmtId_,
const qpid::sys::SecuritySettings& external_,
bool isLink_=false,
uint64_t objectId_=0,
bool shadow_=false,
bool delayManagement_=false
) : out(out_), broker(broker_), mgmtId(mgmtId_), external(external_),
isLink(isLink_), objectId(objectId_), shadow(shadow_),
delayManagement(delayManagement_)
{}
std::auto_ptr<broker::Connection> construct() {
return std::auto_ptr<broker::Connection>(
new broker::Connection(
out, broker, mgmtId, external, isLink, objectId,
shadow, delayManagement)
);
}
};
static NullFrameHandler nullFrameHandler;
// Error listener functions
void connectionError(const std::string&);
void sessionError(uint16_t channel, const std::string&);
void init();
bool checkUnsupported(const framing::AMQBody& body);
void deliverDoOutput(uint32_t limit);
bool checkProtocolHeader(const char*& data, size_t size);
void processInitialFrames(const char*& data, size_t size);
boost::shared_ptr<broker::Queue> findQueue(const std::string& qname);
broker::SessionState& sessionState();
broker::SemanticState& semanticState();
broker::QueuedMessage getUpdateMessage();
void closeUpdated();
void setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &);
Cluster& cluster;
ConnectionId self;
bool catchUp;
bool announced;
OutputInterceptor output;
framing::FrameDecoder localDecoder;
ConnectionCtor connectionCtor;
std::auto_ptr<broker::Connection> connection;
framing::SequenceNumber deliverSeq;
framing::ChannelId currentChannel;
boost::shared_ptr<broker::TxBuffer> txBuffer;
boost::shared_ptr<broker::DtxBuffer> dtxBuffer;
broker::DeliveryRecords dtxAckRecords;
broker::DtxWorkRecord* dtxCurrent;
bool expectProtocolHeader;
McastFrameHandler mcastFrameHandler;
UpdateReceiver& updateIn;
qpid::broker::SecureConnection* secureConnection;
std::string initialFrames;
static qpid::sys::AtomicValue<uint64_t> catchUpId;
friend std::ostream& operator<<(std::ostream&, const Connection&);
};
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_CONNECTION_H*/