/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.directory.mitosis.service;
import org.apache.directory.mitosis.common.CSN;
import org.apache.directory.mitosis.common.Constants;
import org.apache.directory.mitosis.common.DefaultCSN;
import org.apache.directory.mitosis.common.ReplicaId;
import org.apache.directory.mitosis.configuration.ReplicationConfiguration;
import org.apache.directory.mitosis.operation.Operation;
import org.apache.directory.mitosis.operation.OperationFactory;
import org.apache.directory.mitosis.service.protocol.codec.ReplicationServerProtocolCodecFactory;
import org.apache.directory.mitosis.service.protocol.handler.ReplicationClientContextHandler;
import org.apache.directory.mitosis.service.protocol.handler.ReplicationServerContextHandler;
import org.apache.directory.mitosis.service.protocol.handler.ReplicationServerProtocolHandler;
import org.apache.directory.mitosis.store.ReplicationStore;
import org.apache.directory.server.core.DirectoryService;
import org.apache.directory.server.core.entry.ServerEntry;
import org.apache.directory.server.core.entry.ServerSearchResult;
import org.apache.directory.server.core.enumeration.SearchResultFilteringEnumeration;
import org.apache.directory.server.core.interceptor.BaseInterceptor;
import org.apache.directory.server.core.interceptor.Interceptor;
import org.apache.directory.server.core.interceptor.NextInterceptor;
import org.apache.directory.server.core.interceptor.context.AddOperationContext;
import org.apache.directory.server.core.interceptor.context.DeleteOperationContext;
import org.apache.directory.server.core.interceptor.context.EntryOperationContext;
import org.apache.directory.server.core.interceptor.context.GetMatchedNameOperationContext;
import org.apache.directory.server.core.interceptor.context.ListOperationContext;
import org.apache.directory.server.core.interceptor.context.LookupOperationContext;
import org.apache.directory.server.core.interceptor.context.ModifyOperationContext;
import org.apache.directory.server.core.interceptor.context.MoveAndRenameOperationContext;
import org.apache.directory.server.core.interceptor.context.MoveOperationContext;
import org.apache.directory.server.core.interceptor.context.RenameOperationContext;
import org.apache.directory.server.core.interceptor.context.SearchOperationContext;
import org.apache.directory.server.core.invocation.InvocationStack;
import org.apache.directory.server.core.partition.PartitionNexus;
import org.apache.directory.server.schema.registries.AttributeTypeRegistry;
import org.apache.directory.server.schema.registries.Registries;
import org.apache.directory.shared.ldap.constants.SchemaConstants;
import org.apache.directory.shared.ldap.entry.EntryAttribute;
import org.apache.directory.shared.ldap.entry.Value;
import org.apache.directory.shared.ldap.exception.LdapNameNotFoundException;
import org.apache.directory.shared.ldap.filter.ExprNode;
import org.apache.directory.shared.ldap.filter.FilterParser;
import org.apache.directory.shared.ldap.filter.PresenceNode;
import org.apache.directory.shared.ldap.message.AliasDerefMode;
import org.apache.directory.shared.ldap.name.LdapDN;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.filter.LoggingFilter;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.SocketAcceptor;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.naming.NameNotFoundException;
import javax.naming.NamingEnumeration;
import javax.naming.NamingException;
import javax.naming.directory.SearchControls;
import java.net.InetSocketAddress;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
/**
* An {@link Interceptor} that intercepts LDAP operations and propagates the
* changes they cause to other {@link ReplicaId}s, so that the DIT of each
* {@link ReplicaId} in the cluster has the same content without any conflict.
* <p>
* When an operation is invoked, this interceptor transforms it into one or
* more operations that make the requested operation more suitable and robust
* for replication. The transformation simply calls the corresponding factory
* method in {@link OperationFactory}, which returns a new {@link Operation}
* instance.
* <p>
* The newly created {@link Operation} is used for three purposes.
* <ul>
* <li>To perform the requested operation against the local {@link PartitionNexus}
* <li>To store the created {@link Operation} itself to
* {@link ReplicationStore} so that it can be retrieved later by
* {@link ReplicationLogCleanJob} and {@link ReplicationClientContextHandler}
* <li>To transfer itself to other {@link ReplicaId}s via TCP/IP communication
* between {@link ReplicationClientContextHandler} and
* {@link ReplicationServerContextHandler}
* </ul>
* The first two actions (modifying the local DIT and storing the
* {@link Operation} in the {@link ReplicationStore}) are performed
* automatically when the
* {@link Operation#execute(PartitionNexus, ReplicationStore, Registries)}
* method is invoked. {@link ReplicationInterceptor} always calls it instead of
* forwarding the requested operation to the next {@link Interceptor}.
* <p>
* The last action is performed by {@link ReplicationClientContextHandler},
* which handles the TCP/IP connections managed by {@link ClientConnectionManager}.
* <p>
* There are two special attributes in the entries to be replicated:
* <ul>
* <li><tt>entryCSN</tt> - stores the {@link CSN} of the entry. This attribute
* is used to determine whether an incoming operation from another replica is
* still valid. If the local <tt>entryCSN</tt> value is greater than that of
* the incoming operation, there is a conflict, and an appropriate conflict
* resolution mechanism should be engaged.</li>
* <li><tt>entryDeleted</tt> - is <tt>TRUE</tt> if and only if the entry is
* deleted. The entry is not removed immediately by a delete operation
* because the <tt>entryCSN</tt> attribute must be retained for a certain
* amount of time to determine whether an incoming change log entry that
* affects an entry with the same DN is a conflict (a modification of a
* deleted entry) or not (the creation of a new entry). You can purge old
* deleted entries and related change logs in {@link ReplicationStore} by
* calling {@link #purgeAgedData()}, or they will be purged automatically and
* periodically as configured via
* {@link ReplicationConfiguration#setLogMaxAge(int)}.
* Because of this attribute, the <tt>lookup</tt> and <tt>search</tt>
* operations are overridden to ignore entries with <tt>entryDeleted</tt>
* set to <tt>TRUE</tt>.</li>
* </ul>
*
* @org.apache.xbean.XBean
*
* @author The Apache Directory Project (dev@directory.apache.org)
* @version $Rev$, $Date$
*/
public class ReplicationInterceptor extends BaseInterceptor
{
private static final Logger LOG = LoggerFactory.getLogger( ReplicationInterceptor.class );
/** The service name */
public static final String NAME = "replicationService";
private static final String ENTRY_CSN_OID = "1.3.6.1.4.1.18060.0.4.1.2.30";
private static final String ENTRY_DELETED_OID = "1.3.6.1.4.1.18060.0.4.1.2.31";
/**
* The configured name of this interceptor; defaults to the service name, {@link #NAME}.
*/
private String name = NAME;
private DirectoryService directoryService;
private ReplicationConfiguration configuration;
private PartitionNexus nexus;
private OperationFactory operationFactory;
private ReplicationStore store;
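/** The MINA acceptor that accepts incoming replication connections from peer replicas. */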
private IoAcceptor registry;
private final ClientConnectionManager clientConnectionManager = new ClientConnectionManager( this );
private Registries registries;
public ReplicationInterceptor()
{
}
/**
* This interceptor is configurable, so it may be useful to allow several instances in a chain.
*
* @return the configured name of this interceptor
*/
public String getName()
{
return name;
}
public void setName( String name )
{
this.name = name;
}
public ReplicationConfiguration getConfiguration()
{
return configuration;
}
public void setConfiguration( ReplicationConfiguration configuration )
{
this.configuration = configuration;
}
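/**
* Initializes this interceptor: validates the configuration, opens the
* {@link ReplicationStore}, starts the replication networking layer and,
* finally, purges aged data once at startup.
*/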
public void init( DirectoryService directoryService ) throws NamingException
{
configuration.validate();
// Keep references to frequently used components.
this.directoryService = directoryService;
registries = directoryService.getRegistries();
nexus = directoryService.getPartitionNexus();
store = configuration.getStore();
operationFactory = new OperationFactory( directoryService, configuration );
// Initialize store and service
store.open( directoryService, configuration );
boolean serviceStarted = false;
try
{
startNetworking();
serviceStarted = true;
}
catch ( Exception e )
{
throw new ReplicationServiceException( "Failed to initialize MINA ServiceRegistry.", e );
}
finally
{
if ( !serviceStarted )
{
// roll back
store.close();
}
}
purgeAgedData();
}
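/**
* Binds a MINA {@link SocketAcceptor} on the configured server port with the
* replication codec and a logging filter, then starts the
* {@link ClientConnectionManager} which connects to the peer replicas.
*/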
private void startNetworking() throws Exception
{
registry = new SocketAcceptor();
SocketAcceptorConfig config = new SocketAcceptorConfig();
config.setReuseAddress( true );
config.getFilterChain().addLast( "protocol",
new ProtocolCodecFilter( new ReplicationServerProtocolCodecFactory() ) );
config.getFilterChain().addLast( "logger", new LoggingFilter() );
// bind server protocol provider
registry.bind( new InetSocketAddress( configuration.getServerPort() ), new ReplicationServerProtocolHandler(
this ), config );
clientConnectionManager.start( configuration );
}
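/**
* Stops the replication networking layer and closes the {@link ReplicationStore}.
*/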
public void destroy()
{
stopNetworking();
store.close();
}
private void stopNetworking()
{
// Close all open connections, deactivate all filters and unbind the service registry.
try
{
clientConnectionManager.stop();
}
catch ( Exception e )
{
LOG.error( "[Replica-{}] Failed to stop the client connection manager.", configuration.getReplicaId() );
LOG.error( "Stop failure exception: ", e );
}
registry.unbindAll();
}
/**
* Forces this context to send replication data to the peer replica immediately.
*/
public void replicate()
{
LOG.info( "[Replica-{}] Forcing replication...", configuration.getReplicaId() );
this.clientConnectionManager.replicate();
}
/**
* Wake the sleeping (unconnected) replicas.
*/
public void interruptConnectors()
{
LOG.info( "[Replica-{}] Waking sleeping replicas...", configuration.getReplicaId() );
this.clientConnectionManager.interruptConnectors();
}
/**
* Purges old replication logs and the old entries marked as 'deleted'
* (i.e. {@link Constants#ENTRY_DELETED} is <tt>TRUE</tt>). This method
* should be called periodically so that the DIT and the
* {@link ReplicationStore} do not grow without limit.
*
* @see ReplicationConfiguration#setLogMaxAge(int)
* @see ReplicationLogCleanJob
* @throws javax.naming.NamingException on error
*/
public void purgeAgedData() throws NamingException
{
ServerEntry rootDSE = nexus.getRootDSE( null );
EntryAttribute namingContextsAttr = rootDSE.get( SchemaConstants.NAMING_CONTEXTS_AT );
if ( ( namingContextsAttr == null ) || ( namingContextsAttr.size() == 0 ) )
{
throw new NamingException( "No namingContexts attributes in rootDSE." );
}
CSN purgeCSN = new DefaultCSN( System.currentTimeMillis() - configuration.getLogMaxAge() * 1000L * 60L * 60L
* 24L, // convert days to millis
new ReplicaId( "ZZZZZZZZZZZZZZZZ" ), Integer.MAX_VALUE );
ExprNode filter;
try
{
filter = FilterParser.parse( "(&(" + ENTRY_CSN_OID + "<=" + purgeCSN.toOctetString() + ")(" + ENTRY_DELETED_OID
+ "=TRUE))" );
}
catch ( ParseException e )
{
throw ( NamingException ) new NamingException().initCause( e );
}
// Iterate over all context partitions and purge aged data under each of them.
for ( Value<?> namingContext:namingContextsAttr )
{
// Convert attribute value to JNDI name.
LdapDN contextName;
contextName = new LdapDN( (String)namingContext.get() );
contextName.normalize( registries.getAttributeTypeRegistry().getNormalizerMapping() );
LOG.info( "[Replica-{}] Purging aged data under '{}'", configuration.getReplicaId(), contextName );
purgeAgedData( contextName, filter );
}
store.removeLogs( purgeCSN, false );
}
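/**
* Searches the given context for entries matching the purge filter (aged and
* marked as deleted) and deletes every matching entry below the context entry
* from the nexus.
*/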
private void purgeAgedData( LdapDN contextName, ExprNode filter ) throws NamingException
{
SearchControls ctrl = new SearchControls();
ctrl.setSearchScope( SearchControls.SUBTREE_SCOPE );
ctrl.setReturningAttributes( new String[] { "entryCSN", "entryDeleted" } );
NamingEnumeration<ServerSearchResult> e = nexus.search(
new SearchOperationContext( registries, contextName, AliasDerefMode.DEREF_ALWAYS, filter, ctrl ) );
List<LdapDN> names = new ArrayList<LdapDN>();
try
{
while ( e.hasMore() )
{
ServerSearchResult sr = e.next();
LdapDN name = sr.getDn();
if ( name.size() > contextName.size() )
{
names.add( name );
}
}
}
finally
{
e.close();
}
for ( LdapDN name : names )
{
try
{
name.normalize( registries.getAttributeTypeRegistry().getNormalizerMapping() );
ServerEntry entry = nexus.lookup( new LookupOperationContext( registries, name ) );
LOG.info( "[Replica-{}] Purge: " + name + " (" + entry + ')', configuration.getReplicaId() );
nexus.delete( new DeleteOperationContext( registries, name ) );
}
catch ( NamingException ex )
{
LOG.error( "[Replica-{}] Failed to fetch/delete: " + name, configuration.getReplicaId(), ex );
}
}
}
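//
// Write operations: each one is transformed into an Operation by the
// OperationFactory and executed via Operation.execute(), which both applies
// the change to the local DIT and records it in the replication log. The
// request is therefore never forwarded to the next interceptor directly.
//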
public void add( NextInterceptor nextInterceptor, AddOperationContext addContext ) throws NamingException
{
Operation op = operationFactory.newAdd(
addContext.getDn(), addContext.getEntry() );
op.execute( nexus, store, registries );
}
@Override
public void delete( NextInterceptor next, DeleteOperationContext opContext ) throws NamingException
{
Operation op = operationFactory.newDelete( opContext.getDn() );
op.execute( nexus, store, registries );
}
public void modify( NextInterceptor next, ModifyOperationContext modifyContext ) throws NamingException
{
Operation op = operationFactory.newModify( modifyContext );
op.execute( nexus, store, registries );
}
@Override
public void move( NextInterceptor next, MoveOperationContext moveOpContext ) throws NamingException
{
Operation op = operationFactory.newMove( moveOpContext.getDn(), moveOpContext.getParent() );
op.execute( nexus, store, registries );
}
@Override
public void moveAndRename( NextInterceptor next, MoveAndRenameOperationContext moveAndRenameOpContext ) throws NamingException
{
Operation op = operationFactory.newMove( moveAndRenameOpContext.getDn(),
moveAndRenameOpContext.getParent(), moveAndRenameOpContext.getNewRdn(),
moveAndRenameOpContext.getDelOldDn() );
op.execute( nexus, store, registries );
}
@Override
public void rename( NextInterceptor next, RenameOperationContext renameOpContext ) throws NamingException
{
Operation op = operationFactory.newModifyRn( renameOpContext.getDn(), renameOpContext.getNewRdn(), renameOpContext.getDelOldDn() );
op.execute( nexus, store, registries );
}
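/**
* Returns <tt>true</tt> only if the entry exists and is not marked as deleted
* (<tt>entryDeleted</tt> set to <tt>TRUE</tt>).
*/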
public boolean hasEntry( NextInterceptor nextInterceptor, EntryOperationContext entryContext ) throws NamingException
{
// Ask others first.
boolean hasEntry = nextInterceptor.hasEntry( entryContext );
// If the entry exists,
if ( hasEntry )
{
// Check DELETED attribute.
try
{
ServerEntry entry = nextInterceptor.lookup( new LookupOperationContext( registries, entryContext.getDn() ) );
hasEntry = !isDeleted( entry );
}
catch ( NameNotFoundException e )
{
hasEntry = false;
}
}
return hasEntry;
}
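/**
* Makes sure the <tt>entryDeleted</tt> attribute is part of the requested
* attributes, performs the lookup, and rejects the result if the entry is
* marked as deleted.
*/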
public ServerEntry lookup( NextInterceptor nextInterceptor, LookupOperationContext lookupContext ) throws NamingException
{
if ( lookupContext.getAttrsId() != null )
{
boolean found = false;
String[] attrIds = lookupContext.getAttrsIdArray();
// Check whether the 'entryDeleted' attribute is already present in attrIds.
for ( String attrId:attrIds )
{
if ( Constants.ENTRY_DELETED.equals( attrId ) )
{
found = true;
break;
}
}
// If it is not present, add it.
if ( !found )
{
String[] newAttrIds = new String[attrIds.length + 1];
System.arraycopy( attrIds, 0, newAttrIds, 0, attrIds.length );
newAttrIds[attrIds.length] = Constants.ENTRY_DELETED;
lookupContext.setAttrsId( newAttrIds );
}
}
ServerEntry result = nextInterceptor.lookup( lookupContext );
ensureNotDeleted( lookupContext.getDn(), result );
return result;
}
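/**
* Implements <tt>list</tt> as a one-level search and wraps the result in a
* filtering enumeration that hides entries marked as deleted.
*/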
@Override
public NamingEnumeration<ServerSearchResult> list( NextInterceptor nextInterceptor, ListOperationContext opContext ) throws NamingException
{
NamingEnumeration<ServerSearchResult> result = nextInterceptor.search(
new SearchOperationContext(
registries, opContext.getDn(), opContext.getAliasDerefMode(),
new PresenceNode( SchemaConstants.OBJECT_CLASS_AT_OID ),
new SearchControls() ) );
return new SearchResultFilteringEnumeration( result, new SearchControls(), InvocationStack.getInstance().peek(),
Constants.DELETED_ENTRIES_FILTER, "List replication filter" );
}
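/**
* Adds <tt>entryDeleted</tt> to the returning attributes when an explicit
* attribute list is requested, then filters entries marked as deleted out of
* the search results.
*/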
@Override
public NamingEnumeration<ServerSearchResult> search( NextInterceptor nextInterceptor, SearchOperationContext opContext ) throws NamingException
{
SearchControls searchControls = opContext.getSearchControls();
if ( searchControls.getReturningAttributes() != null )
{
String[] oldAttrIds = searchControls.getReturningAttributes();
String[] newAttrIds = new String[oldAttrIds.length + 1];
System.arraycopy( oldAttrIds, 0, newAttrIds, 0, oldAttrIds.length );
newAttrIds[oldAttrIds.length] = Constants.ENTRY_DELETED.toLowerCase();
searchControls.setReturningAttributes( newAttrIds );
}
NamingEnumeration<ServerSearchResult> result = nextInterceptor.search(
new SearchOperationContext( registries, opContext.getDn(), opContext.getAliasDerefMode(), opContext.getFilter(), searchControls ) );
return new SearchResultFilteringEnumeration( result, searchControls, InvocationStack.getInstance().peek(),
Constants.DELETED_ENTRIES_FILTER, "Search Replication filter" );
}
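/**
* Throws an {@link LdapNameNotFoundException} if the given entry is marked as
* deleted.
*/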
private void ensureNotDeleted( LdapDN name, ServerEntry entry ) throws NamingException
{
if ( isDeleted( entry ) )
{
LdapNameNotFoundException e = new LdapNameNotFoundException( "Deleted entry: " + name.getUpName() );
e.setResolvedName( nexus.getMatchedName( new GetMatchedNameOperationContext( registries, name ) ) );
throw e;
}
}
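/**
* Returns <tt>true</tt> if the entry is <tt>null</tt> or contains
* <tt>entryDeleted: TRUE</tt>.
*/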
private boolean isDeleted( ServerEntry entry ) throws NamingException
{
if ( entry == null )
{
return true;
}
return entry.contains( Constants.ENTRY_DELETED, "TRUE" );
}
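/**
* @return the {@link DirectoryService} this interceptor is attached to
*/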
public DirectoryService getDirectoryService()
{
return directoryService;
}
}