blob: cce0110ea6f9259522406defd8eca838a98eaf53 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import java.net.URI;
import java.util.Hashtable;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import javax.naming.CommunicationException;
import javax.naming.Context;
import javax.naming.NamingEnumeration;
import javax.naming.directory.Attributes;
import javax.naming.directory.DirContext;
import javax.naming.directory.InitialDirContext;
import javax.naming.directory.SearchControls;
import javax.naming.directory.SearchResult;
import javax.naming.event.EventDirContext;
import javax.naming.event.NamespaceChangeListener;
import javax.naming.event.NamingEvent;
import javax.naming.event.NamingExceptionEvent;
import javax.naming.event.ObjectChangeListener;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.URISupport.CompositeData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Creates dynamic network connectors from entries listed in a directory
 * server using the LDAP v3 protocol as defined in RFC 2251. The entries
 * listed in the directory server must implement the ipHost and ipService
 * objectClasses as defined in RFC 2307.
 * <p>
 * Every matching directory entry is identified by its name (the "UUID") and
 * is converted into a <code>static:(protocol://address:port)</code> connector
 * URI. Several entries may resolve to the same URI; in that case a single
 * connector is shared and reference counted, and it is only stopped once the
 * last entry referring to it has been removed.
 *
 * @author Trevor Pounds
 * @see <a href="http://www.faqs.org/rfcs/rfc2251.html">RFC 2251</a>
 * @see <a href="http://www.faqs.org/rfcs/rfc2307.html">RFC 2307</a>
 *
 * @org.apache.xbean.XBean element="ldapNetworkConnector"
 */
public class LdapNetworkConnector
    extends NetworkConnector
    implements NamespaceChangeListener,
               ObjectChangeListener
{
    private static final Logger LOG = LoggerFactory.getLogger(LdapNetworkConnector.class);

    // force returned entries to implement the ipHost and ipService object classes (RFC 2307)
    private static final String REQUIRED_OBJECT_CLASS_FILTER = "(&(objectClass=ipHost)(objectClass=ipService))";

    // delay used for the first reconnect attempt, doubled on every further failure
    private static final long INITIAL_RECONNECT_DELAY = 1000; /* 1 sec */

    // connection
    private URI[] availableURIs = null;
    private int availableURIsIndex = 0;
    private String base = null;
    private boolean failover = false;
    private long curReconnectDelay = INITIAL_RECONNECT_DELAY;
    private long maxReconnectDelay = 30000; /* 30 sec */

    // authentication
    private String user = null;
    private String password = null;
    private boolean anonymousAuthentication = false;

    // search
    private final SearchControls searchControls = new SearchControls(/* ONELEVEL_SCOPE */);
    private String searchFilter = REQUIRED_OBJECT_CLASS_FILTER;
    private boolean searchEventListener = false;

    // connector management
    // connector URI -> running connector
    private final Map<URI, NetworkConnector> connectorMap = new ConcurrentHashMap<URI, NetworkConnector>();
    // connector URI -> number of directory entries currently referring to it
    private final Map<URI, Integer> referenceMap = new ConcurrentHashMap<URI, Integer>();
    // directory entry name -> connector URI it resolved to
    private final Map<String, URI> uuidMap = new ConcurrentHashMap<String, URI>();

    // local context
    private DirContext context = null;
    // context holding the persistent search listener (only set while the listener is registered)
    private EventDirContext eventContext = null;

    // currently in use URI
    private URI ldapURI = null;

    /**
     * Advances to, and returns, the next URI of the configured list. The
     * list is walked round-robin; note that every call moves the position
     * forward, so the very first call returns the second configured URI
     * when more than one URI is available.
     *
     * @return next URI from the configured list
     * @throws IllegalStateException if no URI has been configured
     */
    public URI getUri()
    {
        if(availableURIs == null || availableURIs.length == 0)
        { throw new IllegalStateException("ERR: no LDAP server URI configured"); }

        // keep the index inside [0, length) so it can never overflow into a negative array index
        availableURIsIndex = (availableURIsIndex + 1) % availableURIs.length;
        return availableURIs[availableURIsIndex];
    }

    /**
     * Sets the LDAP server URI. A composite URI using the
     * <code>failover</code> scheme enables failover between all of its
     * component URIs, any other URI is used as the single server URI.
     *
     * @param _uri LDAP server URI
     * @throws Exception if the URI cannot be parsed
     */
    public void setUri(URI _uri)
        throws Exception
    {
        CompositeData data = URISupport.parseComposite(_uri);
        if("failover".equals(data.getScheme()))
        {
            availableURIs = data.getComponents();
            failover = true;
        }
        else
        {
            availableURIs = new URI[]{ _uri };
            // reset in case a failover URI had been configured before
            failover = false;
        }
    }

    /**
     * Sets the base LDAP dn used for lookup operations.
     *
     * @param _base LDAP base dn
     */
    public void setBase(String _base)
    { base = _base; }

    /**
     * Sets the LDAP user for access credentials.
     *
     * @param _user LDAP dn of user
     */
    public void setUser(String _user)
    { user = _user; }

    /**
     * Sets the LDAP password for access credentials.
     *
     * @param _password user password
     */
    public void setPassword(String _password)
    { password = _password; }

    /**
     * Sets LDAP anonymous authentication access credentials.
     *
     * @param _anonymousAuthentication set to true to use anonymous authentication
     */
    public void setAnonymousAuthentication(boolean _anonymousAuthentication)
    { anonymousAuthentication = _anonymousAuthentication; }

    /**
     * Sets the LDAP search scope.
     *
     * @param _searchScope LDAP JNDI search scope, one of
     *        <code>OBJECT_SCOPE</code>, <code>ONELEVEL_SCOPE</code> or
     *        <code>SUBTREE_SCOPE</code>
     * @throws IllegalArgumentException if the scope name is unknown or null
     */
    public void setSearchScope(String _searchScope)
        throws Exception
    {
        int scope;
        // constant-first comparisons so that a null argument ends up in the error branch
        if("OBJECT_SCOPE".equals(_searchScope))
        { scope = SearchControls.OBJECT_SCOPE; }
        else if("ONELEVEL_SCOPE".equals(_searchScope))
        { scope = SearchControls.ONELEVEL_SCOPE; }
        else if("SUBTREE_SCOPE".equals(_searchScope))
        { scope = SearchControls.SUBTREE_SCOPE; }
        else
        { throw new IllegalArgumentException("ERR: unknown LDAP search scope specified: " + _searchScope); }
        searchControls.setSearchScope(scope);
    }

    /**
     * Sets the LDAP search filter as defined in RFC 2254. The given
     * expression is wrapped in parentheses and AND-ed with the filter
     * requiring the ipHost and ipService object classes.
     *
     * @param _searchFilter LDAP search filter
     * @see <a href="http://www.faqs.org/rfcs/rfc2254.html">RFC 2254</a>
     */
    public void setSearchFilter(String _searchFilter)
    { searchFilter = "(&" + REQUIRED_OBJECT_CLASS_FILTER + "(" + _searchFilter + "))"; }

    /**
     * Enables/disables a persistent search to the LDAP server as defined
     * in draft-ietf-ldapext-psearch-03.txt (2.16.840.1.113730.3.4.3).
     *
     * @param _searchEventListener enable = true, disable = false (default)
     * @see <a href="http://www.ietf.org/proceedings/01mar/I-D/draft-ietf-ldapext-psearch-03.txt">draft-ietf-ldapext-psearch-03.txt</a>
     */
    public void setSearchEventListener(boolean _searchEventListener)
    { searchEventListener = _searchEventListener; }

    /**
     * Starts the connector: connects to the LDAP server, adds a network
     * connector for every entry matching the search and, when enabled,
     * registers this object as persistent search listener. Without a
     * persistent search the LDAP connection is closed again before
     * returning.
     *
     * @throws IllegalStateException if no URI is configured, or if
     *         credentials are required but user or password is missing
     * @throws Exception if connecting (without failover), searching or
     *         starting one of the connectors fails
     */
    public void start()
        throws Exception
    {
        LOG.info("connecting...");

        Hashtable<String, String> env = new Hashtable<String, String>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
        this.ldapURI = getUri();
        LOG.debug(" URI [{}]", this.ldapURI);
        env.put(Context.PROVIDER_URL, this.ldapURI.toString());
        if(anonymousAuthentication)
        {
            LOG.debug(" login credentials [anonymous]");
            env.put(Context.SECURITY_AUTHENTICATION, "none");
        }
        else
        {
            // Hashtable rejects null values, fail with a meaningful message instead
            if(user == null || password == null)
            { throw new IllegalStateException("ERR: LDAP user and password must be set unless anonymous authentication is enabled"); }
            LOG.debug(" login credentials [{}:******]", user);
            env.put(Context.SECURITY_PRINCIPAL, user);
            env.put(Context.SECURITY_CREDENTIALS, password);
        }

        context = connect(env);

        try
        {
            // add connectors from search results
            LOG.info("searching for network connectors...");
            LOG.debug(" base [{}]", base);
            LOG.debug(" filter [{}]", searchFilter);
            LOG.debug(" scope [{}]", searchControls.getSearchScope());
            NamingEnumeration<SearchResult> results = context.search(base, searchFilter, searchControls);
            try
            {
                while(results.hasMore())
                { addConnector(results.next()); }
            }
            finally
            { closeQuietly(results); }

            // register persistent search event listener
            if(searchEventListener)
            {
                LOG.info("registering persistent search listener...");
                eventContext = (EventDirContext)context.lookup("");
                eventContext.addNamingListener(base, searchFilter, searchControls, this);
            }
            else // otherwise close context (i.e. connection as it is no longer needed)
            { closeContexts(); }
        }
        catch(Exception err)
        {
            // do not leak the LDAP connection when the search or the registration failed
            closeContextsQuietly();
            throw err;
        }
    }

    /**
     * Stops the connector: stops every managed network connector, forgets
     * all bookkeeping and releases the LDAP context(s). All cleanup steps
     * are attempted even if one of them fails; the first failure is
     * rethrown afterwards and any further failure is logged.
     *
     * @throws Exception first failure encountered during the cleanup
     */
    public void stop()
        throws Exception
    {
        LOG.info("stopping context...");
        Exception firstFailure = null;
        for(NetworkConnector connector : connectorMap.values())
        {
            try
            { connector.stop(); }
            catch(Exception err)
            { firstFailure = recordFailure(firstFailure, err); }
        }
        connectorMap.clear();
        referenceMap.clear();
        uuidMap.clear();
        try
        { closeContexts(); }
        catch(Exception err)
        { firstFailure = recordFailure(firstFailure, err); }
        if(firstFailure != null)
        { throw firstFailure; }
    }

    /**
     * Returns the class name, the connector name and the LDAP URI currently
     * in use (<code>null</code> as long as the connector was never started).
     */
    @Override
    public String toString()
    {
        // plain concatenation tolerates a still unset ldapURI
        return this.getClass().getName() + getName() + "[" + ldapURI + "]";
    }

    /**
     * Adds the connector described by the given search result. An entry
     * whose name is already known is ignored; an entry resolving to the URI
     * of an already running connector only increases the reference count of
     * that connector.
     *
     * @param result search result of connector to add
     * @throws Exception if the entry cannot be converted into a URI or the
     *         connector cannot be created or started
     */
    protected synchronized void addConnector(SearchResult result)
        throws Exception
    {
        String uuid = toUUID(result);
        if(uuidMap.containsKey(uuid))
        {
            LOG.warn("connector already regsitered for UUID [{}]", uuid);
            return;
        }

        URI connectorURI = toURI(result);
        if(connectorMap.containsKey(connectorURI))
        {
            int referenceCount = referenceMap.get(connectorURI) + 1;
            LOG.warn("connector reference added for URI [{}], UUID [{}], total reference(s) [{}]",
                     new Object[]{ connectorURI, uuid, referenceCount });
            referenceMap.put(connectorURI, referenceCount);
            uuidMap.put(uuid, connectorURI);
            return;
        }

        // FIXME: disable JMX listing of LDAP managed connectors, we will
        // want to map/manage these differently in the future
        // boolean useJMX = getBrokerService().isUseJmx();
        // getBrokerService().setUseJmx(false);
        NetworkConnector connector = getBrokerService().addNetworkConnector(connectorURI);
        // getBrokerService().setUseJmx(useJMX);

        // propagate std connector properties that may have been set via XML
        connector.setDynamicOnly(isDynamicOnly());
        connector.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
        connector.setNetworkTTL(getNetworkTTL());
        connector.setConduitSubscriptions(isConduitSubscriptions());
        connector.setExcludedDestinations(getExcludedDestinations());
        connector.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations());
        connector.setDuplex(isDuplex());

        // XXX: set in the BrokerService.startAllConnectors method and is
        // required to prevent remote broker exceptions upon connection
        connector.setLocalUri(getBrokerService().getVmConnectorURI());
        connector.setBrokerName(getBrokerService().getBrokerName());
        connector.setDurableDestinations(getBrokerService().getBroker().getDurableDestinations());

        // start network connector
        connectorMap.put(connectorURI, connector);
        referenceMap.put(connectorURI, 1);
        uuidMap.put(uuid, connectorURI);
        connector.start();
        LOG.info("connector added with URI [{}]", connectorURI);
    }

    /**
     * Removes the reference the given search result holds on its connector
     * and stops the connector once no reference is left.
     *
     * @param result search result of connector to remove
     * @throws Exception if stopping the connector fails
     */
    protected synchronized void removeConnector(SearchResult result)
        throws Exception
    {
        String uuid = toUUID(result);
        URI connectorURI = uuidMap.get(uuid);
        if(connectorURI == null)
        {
            LOG.warn("connector not regsitered for UUID [{}]", uuid);
            return;
        }
        if(!connectorMap.containsKey(connectorURI))
        {
            LOG.warn("connector not regisitered for URI [{}]", connectorURI);
            return;
        }

        int referenceCount = referenceMap.get(connectorURI) - 1;
        uuidMap.remove(uuid);
        LOG.debug("connector referenced removed for URI [{}], UUID [{}], remaining reference(s) [{}]",
                  new Object[]{ connectorURI, uuid, referenceCount });
        if(referenceCount > 0)
        {
            referenceMap.put(connectorURI, referenceCount);
            return;
        }

        // last reference is gone: drop the counter too instead of keeping a stale zero entry
        referenceMap.remove(connectorURI);
        // NOTE(review): the connector was created through BrokerService.addNetworkConnector
        // and is only stopped here, it is not unregistered from the broker - confirm intended
        NetworkConnector connector = connectorMap.remove(connectorURI);
        connector.stop();
        LOG.info("connector removed with URI [{}]", connectorURI);
    }

    /**
     * Converts a search result into a connector URI of the form
     * <code>static:(protocol://address:port)</code>, built from the
     * <code>ipserviceprotocol</code>, <code>iphostnumber</code> and
     * <code>ipserviceport</code> attributes of the entry.
     *
     * @param result search result to convert to URI
     * @return connector URI of the entry
     * @throws IllegalArgumentException if one of the attributes is missing
     * @throws Exception if an attribute value cannot be read or the
     *         resulting URI is malformed
     */
    protected URI toURI(SearchResult result)
        throws Exception
    {
        Attributes attributes = result.getAttributes();
        String address = requiredAttribute(attributes, "iphostnumber");
        String port = requiredAttribute(attributes, "ipserviceport");
        String protocol = requiredAttribute(attributes, "ipserviceprotocol");
        URI connectorURI = new URI("static:(" + protocol + "://" + address + ":" + port + ")");
        LOG.debug("retrieved URI from SearchResult [{}]", connectorURI);
        return connectorURI;
    }

    /**
     * Converts a search result into the UUID the entry is tracked by, which
     * is the full name of the entry within its namespace.
     *
     * @param result search result to convert to UUID
     * @return UUID of the entry
     */
    protected String toUUID(SearchResult result)
    {
        String uuid = result.getNameInNamespace();
        LOG.debug("retrieved UUID from SearchResult [{}]", uuid);
        return uuid;
    }

    /**
     * Invoked when an entry has been added during a persistent search.
     */
    public void objectAdded(NamingEvent event)
    {
        LOG.debug("entry added");
        try
        { addConnector((SearchResult)event.getNewBinding()); }
        catch(Exception err)
        { LOG.error("ERR: caught unexpected exception", err); }
    }

    /**
     * Invoked when an entry has been removed during a persistent search.
     */
    public void objectRemoved(NamingEvent event)
    {
        LOG.debug("entry removed");
        try
        { removeConnector((SearchResult)event.getOldBinding()); }
        catch(Exception err)
        { LOG.error("ERR: caught unexpected exception", err); }
    }

    /**
     * Invoked when an entry has been renamed during a persistent search.
     * The connector keeps running, only the UUID it is tracked by changes.
     */
    public void objectRenamed(NamingEvent event)
    {
        LOG.debug("entry renamed");
        // XXX: getNameInNamespace method does not seem to work properly,
        // but getName seems to provide the result we want
        String uuidOld = event.getOldBinding().getName();
        String uuidNew = event.getNewBinding().getName();
        // same lock as addConnector/removeConnector so the remove/put pair is seen as one step
        synchronized(this)
        {
            URI connectorURI = uuidMap.remove(uuidOld);
            if(connectorURI == null)
            {
                // ConcurrentHashMap rejects null values, there is nothing to rename
                LOG.warn("connector not registered for renamed UUID [{}]", uuidOld);
                return;
            }
            uuidMap.put(uuidNew, connectorURI);
            LOG.debug("connector reference renamed for URI [{}], Old UUID [{}], New UUID [{}]",
                      new Object[]{ connectorURI, uuidOld, uuidNew });
        }
    }

    /**
     * Invoked when an entry has been changed during a persistent search.
     * The entry is removed and added again so that changed connection
     * attributes are picked up.
     */
    public void objectChanged(NamingEvent event)
    {
        LOG.debug("entry changed");
        try
        {
            SearchResult result = (SearchResult)event.getNewBinding();
            removeConnector(result);
            addConnector(result);
        }
        catch(Exception err)
        { LOG.error("ERR: caught unexpected exception", err); }
    }

    /**
     * Invoked when an exception has occurred during a persistent search.
     */
    public void namingExceptionThrown(NamingExceptionEvent event)
    { LOG.error("ERR: caught unexpected exception", event.getException()); }

    /**
     * Opens the LDAP context. With failover enabled, communication errors
     * are retried forever against the next configured URI using an
     * exponentially growing delay (capped by maxReconnectDelay); without
     * failover the communication error is rethrown.
     */
    private DirContext connect(Hashtable<String, String> env)
        throws Exception
    {
        while(true)
        {
            try
            {
                DirContext connected = new InitialDirContext(env);
                // a later reconnect starts again with the short delay
                curReconnectDelay = INITIAL_RECONNECT_DELAY;
                return connected;
            }
            catch(CommunicationException err)
            {
                if(!failover)
                { throw err; }
                this.ldapURI = getUri();
                LOG.error("connection error [{}], failover connection to [{}]", env.get(Context.PROVIDER_URL), this.ldapURI);
                env.put(Context.PROVIDER_URL, this.ldapURI.toString());
                try
                { Thread.sleep(curReconnectDelay); }
                catch(InterruptedException interrupted)
                {
                    // keep the interrupt visible to the caller's thread before giving up
                    Thread.currentThread().interrupt();
                    throw interrupted;
                }
                curReconnectDelay = Math.min(curReconnectDelay * 2, maxReconnectDelay);
            }
        }
    }

    /**
     * Removes the persistent search listener (if registered) and closes the
     * LDAP context(s); does nothing when no context is open.
     */
    private void closeContexts()
        throws Exception
    {
        // detach the fields first so that a failing close is never retried on a dead context
        EventDirContext listenerContext = eventContext;
        DirContext searchContext = context;
        eventContext = null;
        context = null;
        try
        {
            if(listenerContext != null)
            {
                try
                { listenerContext.removeNamingListener(this); }
                finally
                { listenerContext.close(); }
            }
        }
        finally
        {
            if(searchContext != null)
            { searchContext.close(); }
        }
    }

    /** Same as closeContexts but only logs a failure, used while another exception is propagating. */
    private void closeContextsQuietly()
    {
        try
        { closeContexts(); }
        catch(Exception err)
        { LOG.debug("ignoring failure while closing LDAP context", err); }
    }

    /** Closes the search results, only logging a failure so it cannot mask the search outcome. */
    private static void closeQuietly(NamingEnumeration<?> results)
    {
        try
        { results.close(); }
        catch(Exception err)
        { LOG.debug("ignoring failure while closing LDAP search results", err); }
    }

    /** Keeps the first failure for rethrowing and logs every further one. */
    private static Exception recordFailure(Exception firstFailure, Exception err)
    {
        if(firstFailure == null)
        { return err; }
        LOG.error("ERR: caught unexpected exception", err);
        return firstFailure;
    }

    /** Reads the first value of an attribute that must be present on the entry. */
    private static String requiredAttribute(Attributes attributes, String name)
        throws Exception
    {
        // fully qualified: javax.naming.directory.Attribute is not among the file's imports
        javax.naming.directory.Attribute attribute = attributes.get(name);
        if(attribute == null)
        { throw new IllegalArgumentException("ERR: search result is missing required attribute [" + name + "]"); }
        return (String)attribute.get();
    }
}