blob: acecbbf17b389560352ef02a439a89e52682bef3 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.proton.engine.impl;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.Open;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.ProtonJConnection;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.reactor.Reactor;
public class ConnectionImpl extends EndpointImpl implements ProtonJConnection
{
public static final int MAX_CHANNELS = 65535;
private List<SessionImpl> _sessions = new ArrayList<SessionImpl>();
private EndpointImpl _transportTail;
private EndpointImpl _transportHead;
private int _maxChannels = MAX_CHANNELS;
private LinkNode<SessionImpl> _sessionHead;
private LinkNode<SessionImpl> _sessionTail;
private LinkNode<LinkImpl> _linkHead;
private LinkNode<LinkImpl> _linkTail;
private DeliveryImpl _workHead;
private DeliveryImpl _workTail;
private TransportImpl _transport;
private DeliveryImpl _transportWorkHead;
private DeliveryImpl _transportWorkTail;
private int _transportWorkSize = 0;
private String _localContainerId = "";
private String _localHostname;
private String _remoteContainer;
private String _remoteHostname;
private Symbol[] _offeredCapabilities;
private Symbol[] _desiredCapabilities;
private Symbol[] _remoteOfferedCapabilities;
private Symbol[] _remoteDesiredCapabilities;
private Map<Symbol, Object> _properties;
private Map<Symbol, Object> _remoteProperties;
private Object _context;
private CollectorImpl _collector;
private Reactor _reactor;
private static final Symbol[] EMPTY_SYMBOL_ARRAY = new Symbol[0];
/**
* Application code should use {@link org.apache.qpid.proton.engine.Connection.Factory#create()} instead.
*/
public ConnectionImpl()
{
}
@Override
public SessionImpl session()
{
SessionImpl session = new SessionImpl(this);
_sessions.add(session);
return session;
}
void freeSession(SessionImpl session)
{
_sessions.remove(session);
}
protected LinkNode<SessionImpl> addSessionEndpoint(SessionImpl endpoint)
{
LinkNode<SessionImpl> node;
if(_sessionHead == null)
{
node = _sessionHead = _sessionTail = LinkNode.newList(endpoint);
}
else
{
node = _sessionTail = _sessionTail.addAtTail(endpoint);
}
return node;
}
void removeSessionEndpoint(LinkNode<SessionImpl> node)
{
LinkNode<SessionImpl> prev = node.getPrev();
LinkNode<SessionImpl> next = node.getNext();
if(_sessionHead == node)
{
_sessionHead = next;
}
if(_sessionTail == node)
{
_sessionTail = prev;
}
node.remove();
}
protected LinkNode<LinkImpl> addLinkEndpoint(LinkImpl endpoint)
{
LinkNode<LinkImpl> node;
if(_linkHead == null)
{
node = _linkHead = _linkTail = LinkNode.newList(endpoint);
}
else
{
node = _linkTail = _linkTail.addAtTail(endpoint);
}
return node;
}
void removeLinkEndpoint(LinkNode<LinkImpl> node)
{
LinkNode<LinkImpl> prev = node.getPrev();
LinkNode<LinkImpl> next = node.getNext();
if(_linkHead == node)
{
_linkHead = next;
}
if(_linkTail == node)
{
_linkTail = prev;
}
node.remove();
}
@Override
public Session sessionHead(final EnumSet<EndpointState> local, final EnumSet<EndpointState> remote)
{
if(_sessionHead == null)
{
return null;
}
else
{
LinkNode.Query<SessionImpl> query = new EndpointImplQuery<SessionImpl>(local, remote);
LinkNode<SessionImpl> node = query.matches(_sessionHead) ? _sessionHead : _sessionHead.next(query);
return node == null ? null : node.getValue();
}
}
@Override
public Link linkHead(EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
{
if(_linkHead == null)
{
return null;
}
else
{
LinkNode.Query<LinkImpl> query = new EndpointImplQuery<LinkImpl>(local, remote);
LinkNode<LinkImpl> node = query.matches(_linkHead) ? _linkHead : _linkHead.next(query);
return node == null ? null : node.getValue();
}
}
@Override
protected ConnectionImpl getConnectionImpl()
{
return this;
}
@Override
void postFinal() {
put(Event.Type.CONNECTION_FINAL, this);
}
@Override
void doFree() {
List<SessionImpl> sessions = new ArrayList<SessionImpl>(_sessions);
for(Session session : sessions) {
session.free();
}
_sessions = null;
}
void modifyEndpoints() {
if (_sessions != null) {
for (SessionImpl ssn: _sessions) {
ssn.modifyEndpoints();
}
}
if (!freed) {
modified();
}
}
void handleOpen(Open open)
{
// TODO - store state
setRemoteState(EndpointState.ACTIVE);
setRemoteHostname(open.getHostname());
setRemoteContainer(open.getContainerId());
setRemoteDesiredCapabilities(open.getDesiredCapabilities());
setRemoteOfferedCapabilities(open.getOfferedCapabilities());
setRemoteProperties(open.getProperties());
put(Event.Type.CONNECTION_REMOTE_OPEN, this);
}
EndpointImpl getTransportHead()
{
return _transportHead;
}
EndpointImpl getTransportTail()
{
return _transportTail;
}
void addModified(EndpointImpl endpoint)
{
if(_transportTail == null)
{
endpoint.setTransportNext(null);
endpoint.setTransportPrev(null);
_transportHead = _transportTail = endpoint;
}
else
{
_transportTail.setTransportNext(endpoint);
endpoint.setTransportPrev(_transportTail);
_transportTail = endpoint;
_transportTail.setTransportNext(null);
}
}
void removeModified(EndpointImpl endpoint)
{
if(_transportHead == endpoint)
{
_transportHead = endpoint.transportNext();
}
else
{
endpoint.transportPrev().setTransportNext(endpoint.transportNext());
}
if(_transportTail == endpoint)
{
_transportTail = endpoint.transportPrev();
}
else
{
endpoint.transportNext().setTransportPrev(endpoint.transportPrev());
}
}
@Override
public int getMaxChannels()
{
return _maxChannels;
}
public String getLocalContainerId()
{
return _localContainerId;
}
@Override
public void setLocalContainerId(String localContainerId)
{
_localContainerId = localContainerId;
}
@Override
public DeliveryImpl getWorkHead()
{
return _workHead;
}
@Override
public void setContainer(String container)
{
_localContainerId = container;
}
@Override
public String getContainer()
{
return _localContainerId;
}
@Override
public void setHostname(String hostname)
{
_localHostname = hostname;
}
@Override
public String getRemoteContainer()
{
return _remoteContainer;
}
@Override
public String getRemoteHostname()
{
return _remoteHostname;
}
@Override
public void setOfferedCapabilities(Symbol[] capabilities)
{
_offeredCapabilities = capabilities;
}
@Override
public void setDesiredCapabilities(Symbol[] capabilities)
{
_desiredCapabilities = capabilities;
}
@Override
public Symbol[] getRemoteOfferedCapabilities()
{
return _remoteOfferedCapabilities == null ? EMPTY_SYMBOL_ARRAY : _remoteOfferedCapabilities;
}
@Override
public Symbol[] getRemoteDesiredCapabilities()
{
return _remoteDesiredCapabilities == null ? EMPTY_SYMBOL_ARRAY : _remoteDesiredCapabilities;
}
Symbol[] getOfferedCapabilities()
{
return _offeredCapabilities;
}
Symbol[] getDesiredCapabilities()
{
return _desiredCapabilities;
}
void setRemoteOfferedCapabilities(Symbol[] remoteOfferedCapabilities)
{
_remoteOfferedCapabilities = remoteOfferedCapabilities;
}
void setRemoteDesiredCapabilities(Symbol[] remoteDesiredCapabilities)
{
_remoteDesiredCapabilities = remoteDesiredCapabilities;
}
Map<Symbol, Object> getProperties()
{
return _properties;
}
@Override
public void setProperties(Map<Symbol, Object> properties)
{
_properties = properties;
}
@Override
public Map<Symbol, Object> getRemoteProperties()
{
return _remoteProperties;
}
void setRemoteProperties(Map<Symbol, Object> remoteProperties)
{
_remoteProperties = remoteProperties;
}
@Override
public String getHostname()
{
return _localHostname;
}
void setRemoteContainer(String remoteContainerId)
{
_remoteContainer = remoteContainerId;
}
void setRemoteHostname(String remoteHostname)
{
_remoteHostname = remoteHostname;
}
DeliveryImpl getWorkTail()
{
return _workTail;
}
void removeWork(DeliveryImpl delivery)
{
if (!delivery._work) return;
DeliveryImpl next = delivery.getWorkNext();
DeliveryImpl prev = delivery.getWorkPrev();
if (prev != null) {
prev.setWorkNext(next);
}
if (next != null) {
next.setWorkPrev(prev);
}
delivery.setWorkNext(null);
delivery.setWorkPrev(null);
if(_workHead == delivery)
{
_workHead = next;
}
if(_workTail == delivery)
{
_workTail = prev;
}
delivery._work = false;
}
void addWork(DeliveryImpl delivery)
{
if (delivery._work) return;
delivery.setWorkNext(null);
delivery.setWorkPrev(_workTail);
if (_workTail != null) {
_workTail.setWorkNext(delivery);
}
_workTail = delivery;
if (_workHead == null) {
_workHead = delivery;
}
delivery._work = true;
}
public Iterator<DeliveryImpl> getWorkSequence()
{
return new WorkSequence(_workHead);
}
void setTransport(TransportImpl transport)
{
_transport = transport;
}
@Override
public TransportImpl getTransport()
{
return _transport;
}
private static class WorkSequence implements Iterator<DeliveryImpl>
{
private DeliveryImpl _next;
public WorkSequence(DeliveryImpl workHead)
{
_next = workHead;
}
@Override
public boolean hasNext()
{
return _next != null;
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
@Override
public DeliveryImpl next()
{
DeliveryImpl next = _next;
if(next != null)
{
_next = next.getWorkNext();
}
return next;
}
}
DeliveryImpl getTransportWorkHead()
{
return _transportWorkHead;
}
int getTransportWorkSize() {
return _transportWorkSize;
}
public void removeTransportWork(DeliveryImpl delivery)
{
if (!delivery._transportWork) return;
DeliveryImpl next = delivery.getTransportWorkNext();
DeliveryImpl prev = delivery.getTransportWorkPrev();
if (prev != null) {
prev.setTransportWorkNext(next);
}
if (next != null) {
next.setTransportWorkPrev(prev);
}
delivery.setTransportWorkNext(null);
delivery.setTransportWorkPrev(null);
if(_transportWorkHead == delivery)
{
_transportWorkHead = next;
}
if(_transportWorkTail == delivery)
{
_transportWorkTail = prev;
}
delivery._transportWork = false;
_transportWorkSize--;
}
void addTransportWork(DeliveryImpl delivery)
{
modified();
if (delivery._transportWork) return;
delivery.setTransportWorkNext(null);
delivery.setTransportWorkPrev(_transportWorkTail);
if (_transportWorkTail != null) {
_transportWorkTail.setTransportWorkNext(delivery);
}
_transportWorkTail = delivery;
if (_transportWorkHead == null) {
_transportWorkHead = delivery;
}
delivery._transportWork = true;
_transportWorkSize++;
}
void workUpdate(DeliveryImpl delivery)
{
if(delivery != null)
{
if(!delivery.isSettled() &&
(delivery.isReadable() ||
delivery.isWritable() ||
delivery.isUpdated()))
{
addWork(delivery);
}
else
{
removeWork(delivery);
}
}
}
@Override
public Object getContext()
{
return _context;
}
@Override
public void setContext(Object context)
{
_context = context;
}
@Override
public void collect(Collector collector)
{
_collector = (CollectorImpl) collector;
put(Event.Type.CONNECTION_INIT, this);
LinkNode<SessionImpl> ssn = _sessionHead;
while (ssn != null) {
put(Event.Type.SESSION_INIT, ssn.getValue());
ssn = ssn.getNext();
}
LinkNode<LinkImpl> lnk = _linkHead;
while (lnk != null) {
put(Event.Type.LINK_INIT, lnk.getValue());
lnk = lnk.getNext();
}
}
EventImpl put(Event.Type type, Object context)
{
if (_collector != null) {
return _collector.put(type, context);
} else {
return null;
}
}
@Override
void localOpen()
{
put(Event.Type.CONNECTION_LOCAL_OPEN, this);
}
@Override
void localClose()
{
put(Event.Type.CONNECTION_LOCAL_CLOSE, this);
}
@Override
public Reactor getReactor() {
return _reactor;
}
public void setReactor(Reactor reactor) {
_reactor = reactor;
}
}