blob: cd369b9784de3d1ed84aaccafcd760d1d66afea1 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.federation;
import java.security.AccessControlContext;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.RemoteHost;
import org.apache.qpid.server.model.RemoteHostAddress;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.HouseKeepingTask;
@ManagedObject( category = false, type = RemoteHost.REMOTE_HOST_TYPE )
class RemoteHostImpl extends AbstractConfiguredObject<RemoteHostImpl> implements RemoteHost<RemoteHostImpl>
{
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteHostImpl.class);
private final VirtualHost<?> _virtualHost;
@ManagedAttributeField
private int _retryPeriod;
@ManagedAttributeField
private boolean _redirectFollowed;
@ManagedAttributeField
private Collection<String> _routableAddresses;
private final AccessControlContext _createConnectionContext;
private final CreateConnectionTask _createConnectionTask;
enum ConnectionState
{
DISCONNECTED,
CONNECTING,
CONNECTED,
STOPPED
}
private ConnectionState _connectionState = ConnectionState.STOPPED;
@ManagedObjectFactoryConstructor
public RemoteHostImpl(Map<String, Object> attributes, VirtualHost<?> virtualHost)
{
super(parentsMap(virtualHost), attributes);
_virtualHost = virtualHost;
_createConnectionContext =
getSystemTaskControllerContext("Create connection " + getName(), _virtualHost.getPrincipal());
_createConnectionTask = new CreateConnectionTask();
}
@Override
public int getRetryPeriod()
{
return _retryPeriod;
}
@Override
public boolean isRedirectFollowed()
{
return _redirectFollowed;
}
@Override
public Collection<String> getRoutableAddresses()
{
return _routableAddresses;
}
@StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED, State.STOPPED}, desiredState = State.ACTIVE)
private ListenableFuture<Void> onActivate()
{
setState(State.ACTIVE);
_failoverIterator = null;
if (_virtualHost.getState() == State.ACTIVE)
{
_createConnectionTask.scheduleNow();
}
else if (_virtualHost.getDesiredState() == State.ACTIVE)
{
_virtualHost.addChangeListener(new AbstractConfigurationChangeListener()
{
@Override
public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
{
if (newState == State.ACTIVE)
{
_createConnectionTask.scheduleNow();
_virtualHost.removeChangeListener(this);
}
else if (object.getDesiredState() != State.ACTIVE)
{
_virtualHost.removeChangeListener(this);
}
}
});
}
return Futures.immediateFuture(null);
}
private Iterator<RemoteHostAddress> _failoverIterator;
private void setConnectionState(ConnectionState connectionState)
{
_connectionState = connectionState;
}
private synchronized void makeConnection()
{
LOGGER.debug("makeConnection called with state: {}, connectionState: {}", getState(), _connectionState);
if(getState() == State.ACTIVE && !EnumSet.of(ConnectionState.CONNECTED, ConnectionState.CONNECTING).contains(_connectionState))
{
if (_failoverIterator == null || !_failoverIterator.hasNext())
{
_failoverIterator = getChildren(RemoteHostAddress.class).iterator();
}
if (_failoverIterator.hasNext())
{
RemoteHostAddress<?> address = _failoverIterator.next();
setConnectionState(ConnectionState.CONNECTING);
boolean connected = _virtualHost.makeConnection(address, new Action<Boolean>()
{
@Override
public void performAction(final Boolean wasConnected)
{
setConnectionState(ConnectionState.DISCONNECTED);
if (wasConnected)
{
_failoverIterator = null;
_createConnectionTask.scheduleNow();
}
else if (_failoverIterator.hasNext())
{
_createConnectionTask.scheduleNow();
}
else
{
_createConnectionTask.schedule(1000L * _retryPeriod);
}
}
});
if (connected)
{
setConnectionState(ConnectionState.CONNECTED);
}
else
{
setConnectionState(ConnectionState.DISCONNECTED);
if (_failoverIterator.hasNext())
{
_createConnectionTask.scheduleNow();
}
else
{
_createConnectionTask.schedule(1000L * _retryPeriod);
}
}
}
}
}
private class CreateConnectionTask extends HouseKeepingTask
{
private final AtomicBoolean _scheduled = new AtomicBoolean();
public CreateConnectionTask()
{
super("Create connection: " + RemoteHostImpl.this.getName(), _virtualHost, _createConnectionContext);
}
@Override
public void execute()
{
_scheduled.set(false);
makeConnection();
}
public void schedule(long delay)
{
if(_scheduled.compareAndSet(false, true))
{
_virtualHost.scheduleTask(delay, this);
}
}
public void scheduleNow()
{
schedule(0L);
}
}
}