blob: a42db88f49466ced9f174d28116bd0d99aa6a572 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.directory.server.syncrepl;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import javax.naming.ldap.Control;
import org.apache.directory.server.core.CoreSession;
import org.apache.directory.server.core.DirectoryService;
import org.apache.directory.server.core.entry.DefaultServerEntry;
import org.apache.directory.server.core.entry.ServerModification;
import org.apache.directory.shared.asn1.codec.DecoderException;
import org.apache.directory.shared.ldap.client.api.LdapConnection;
import org.apache.directory.shared.ldap.client.api.exception.LdapException;
import org.apache.directory.shared.ldap.client.api.listeners.IntermediateResponseListener;
import org.apache.directory.shared.ldap.client.api.listeners.SearchListener;
import org.apache.directory.shared.ldap.client.api.messages.BindResponse;
import org.apache.directory.shared.ldap.client.api.messages.IntermediateResponse;
import org.apache.directory.shared.ldap.client.api.messages.LdapResult;
import org.apache.directory.shared.ldap.client.api.messages.SearchRequest;
import org.apache.directory.shared.ldap.client.api.messages.SearchRequestImpl;
import org.apache.directory.shared.ldap.client.api.messages.SearchResultDone;
import org.apache.directory.shared.ldap.client.api.messages.SearchResultEntry;
import org.apache.directory.shared.ldap.client.api.messages.SearchResultReference;
import org.apache.directory.shared.ldap.codec.controls.replication.syncDoneValue.SyncDoneValueControlCodec;
import org.apache.directory.shared.ldap.codec.controls.replication.syncDoneValue.SyncDoneValueControlDecoder;
import org.apache.directory.shared.ldap.codec.controls.replication.syncInfoValue.SyncInfoValueControlCodec;
import org.apache.directory.shared.ldap.codec.controls.replication.syncInfoValue.SyncInfoValueControlDecoder;
import org.apache.directory.shared.ldap.codec.controls.replication.syncStateValue.SyncStateValueControlCodec;
import org.apache.directory.shared.ldap.codec.controls.replication.syncStateValue.SyncStateValueControlDecoder;
import org.apache.directory.shared.ldap.constants.SchemaConstants;
import org.apache.directory.shared.ldap.entry.Entry;
import org.apache.directory.shared.ldap.entry.EntryAttribute;
import org.apache.directory.shared.ldap.entry.Modification;
import org.apache.directory.shared.ldap.entry.ModificationOperation;
import org.apache.directory.shared.ldap.filter.SearchScope;
import org.apache.directory.shared.ldap.message.AliasDerefMode;
import org.apache.directory.shared.ldap.message.ResultCodeEnum;
import org.apache.directory.shared.ldap.message.control.replication.SyncDoneValueControl;
import org.apache.directory.shared.ldap.message.control.replication.SyncRequestValueControl;
import org.apache.directory.shared.ldap.message.control.replication.SyncStateTypeEnum;
import org.apache.directory.shared.ldap.message.control.replication.SyncStateValueControl;
import org.apache.directory.shared.ldap.message.control.replication.SynchronizationModeEnum;
import org.apache.directory.shared.ldap.util.StringTools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* An agent capable of communicate with some LDAP servers.
*
* TODO write test cases
*
* @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
* @version $Rev$, $Date$
*/
public class SyncReplConsumer implements SearchListener, IntermediateResponseListener
{
/** the syncrepl configuration */
private SyncreplConfiguration config;
/** the sync cookie sent by the server */
private byte[] syncCookie;
/** the logger */
private static final Logger LOG = LoggerFactory.getLogger( SyncReplConsumer.class );
/** conection to the syncrepl provider */
private LdapConnection connection;
/** the search request with control */
private SearchRequest searchRequest;
/** the syncrequest control */
private SyncRequestValueControl syncReq;
/** a reference to the directoryService */
private DirectoryService directoryService;
/** the decoder for syncinfovalue control */
private SyncInfoValueControlDecoder decoder = new SyncInfoValueControlDecoder();
/** the cookie file */
private File cookieFile;
/** flag to indicate whether the consumer was diconncted */
private boolean disconnected;
/** the core session */
private CoreSession session;
private SyncDoneValueControlDecoder syncDoneControlDecoder = new SyncDoneValueControlDecoder();
private SyncStateValueControlDecoder syncStateControlDecoder = new SyncStateValueControlDecoder();
/**
* @return the config
*/
public SyncreplConfiguration getConfig()
{
return config;
}
/**
* @param config the config to set
*/
public void setConfig( SyncreplConfiguration config )
{
this.config = config;
}
/**
* A helper method to quickly quit the program
*/
private static void quit( LdapConnection connection ) throws IOException
{
connection.close();
System.exit( 1 );
}
public void init( DirectoryService directoryservice ) throws Exception
{
this.directoryService = directoryservice;
File cookieDir = new File( directoryservice.getWorkingDirectory(), "cookies" );
cookieDir.mkdir();
cookieFile = new File( cookieDir, String.valueOf( config.getReplicaId() ) );
session = directoryService.getAdminSession();
}
public boolean bind()
{
try
{
String providerHost = config.getProviderHost();
int port = config.getPort();
// Create a connection
if( connection == null )
{
connection = new LdapConnection( providerHost, port );
}
// Do a bind
BindResponse bindResponse = connection.bind( config.getBindDn(), config.getCredentials() );
// Check that it' not null and valid
if ( bindResponse == null )
{
LOG.error( "Failed to bind with the given bindDN and credentials", bindResponse );
return false;
}
// Now get the result
LdapResult ldapResult = bindResponse.getLdapResult();
if ( ldapResult.getResultCode() != ResultCodeEnum.SUCCESS )
{
LOG.warn( "Failed to bind on the server : {}", ldapResult );
}
else
{
return true;
}
}
catch ( Exception e )
{
LOG.error( "Failed to bind with the given bindDN and credentials", e );
}
return false;
}
/**
*
* prepares a SearchRequest for syncing DIT content.
*
*/
public void prepareSyncSearchRequest()
{
String baseDn = config.getBaseDn();
searchRequest = new SearchRequestImpl();
searchRequest.setBaseDn( baseDn );
searchRequest.setFilter( config.getFilter() );
searchRequest.setSizeLimit( config.getSearchSizeLimit() );
searchRequest.setTimeLimit( config.getSearchTimeout() );
// the only valid values are NEVER_DEREF_ALIASES and DEREF_FINDING_BASE_OBJ
searchRequest.setDerefAliases( AliasDerefMode.NEVER_DEREF_ALIASES );
searchRequest.setScope( SearchScope.getSearchScope( config.getSearchScope() ) );
searchRequest.setTypesOnly( false );
String attributes = config.getAttributes();
if ( StringTools.isEmpty( attributes ) )
{
searchRequest.addAttributes( SchemaConstants.ALL_USER_ATTRIBUTES );
}
else
{
String[] attrs = attributes.trim().split( "," );
searchRequest.addAttributes( attrs );
}
syncReq = new SyncRequestValueControl();
if ( config.isRefreshPersist() )
{
syncReq.setMode( SynchronizationModeEnum.REFRESH_AND_PERSIST );
}
else
{
syncReq.setMode( SynchronizationModeEnum.REFRESH_ONLY );
}
syncReq.setReloadHint( false );
Control control = new SyncRequestValueControl();
((SyncRequestValueControl)control).setCookie( syncReq.getEncodedValue() );
try
{
searchRequest.add( control );
}
catch( LdapException e )
{
// shouldn't happen
LOG.error( "Failed to add constrol to the search request", e );
}
}
public void handleSearchDone( SearchResultDone searchDone )
{
LOG.debug( "///////////////// handleSearchDone //////////////////" );
Control ctrl = searchDone.getControl( SyncDoneValueControl.CONTROL_OID );
SyncDoneValueControlCodec syncDoneCtrl = null;
try
{
syncDoneCtrl = ( SyncDoneValueControlCodec ) syncDoneControlDecoder.decode( ctrl.getEncodedValue() );
}
catch( Exception e )
{
LOG.error( "Failed to decode the syncDoneControlCodec", e );
}
if ( syncDoneCtrl.getCookie() != null )
{
syncCookie = syncDoneCtrl.getCookie();
LOG.debug( "assigning cookie from sync done value control: " + StringTools.utf8ToString( syncCookie ) );
}
else
{
LOG.info( "cookie in syncdone message is null" );
}
if ( !config.isRefreshPersist() )
{
// Now, switch to refreshAndPresist
config.setRefreshPersist( true );
LOG.debug( "Swithing to RefreshAndPersist" );
try
{
// the below call is required to send the updated cookie
// and refresh mode change (i.e to refreshAndPersist stage)
// cause while the startSync() method sleeps even after the 'sync done'
// message arrives as part of initial searchRequest with 'refreshOnly' mode.
// During this sleep time any 'modifications' happened on the server
// to already fetched entries will be sent as SearchResultEntries with
// SyncState value as 'ADD' which will conflict with the DNs of initially received entries
// TODO thinking of alternative ways to implement this in case of large DITs
// where the modification to entry(ies) can happen before the initial sync is done
doSyncSearch();
}
catch( Exception e )
{
LOG.error( "Failed to send a search request with RefreshAndPersist mode", e );
}
}
LOG.debug( "//////////////// END handleSearchDone//////////////////////" );
}
public void handleSearchReference( SearchResultReference searchRef )
{
LOG.error( "!!!!!!!!!!!!!!!!! TODO handle SearchReference messages !!!!!!!!!!!!!!!!" );
}
public void handleSearchResult( SearchResultEntry syncResult )
{
LOG.debug( "------------- starting handleSearchResult ------------" );
try
{
Entry remoteEntry = syncResult.getEntry();
Control ctrl = syncResult.getControl( SyncStateValueControl.CONTROL_OID );
SyncStateValueControlCodec syncStateCtrl = null;
try
{
syncStateCtrl = ( SyncStateValueControlCodec ) syncStateControlDecoder.decode( ctrl.getEncodedValue() );
}
catch( Exception e )
{
LOG.error( "Failed to decode syncStateControl", e );
}
if ( syncStateCtrl.getCookie() != null )
{
syncCookie = syncStateCtrl.getCookie();
LOG.debug( "assigning the cookie from sync state value control: "
+ StringTools.utf8ToString( syncCookie ) );
}
SyncStateTypeEnum state = syncStateCtrl.getSyncStateType();
LOG.debug( "state name {}", state.name() );
LOG.debug( "entryUUID = {}", UUID.nameUUIDFromBytes( syncStateCtrl.getEntryUUID() ) );
switch ( state )
{
case ADD :
if ( !session.exists( remoteEntry.getDn() ) )
{
LOG.debug( "adding entry with dn {}", remoteEntry.getDn().getUpName() );
LOG.debug( remoteEntry.toString() );
session.add( new DefaultServerEntry( directoryService.getRegistries(), remoteEntry ) );
}
break;
case MODIFY :
modify( remoteEntry );
break;
case DELETE :
LOG.debug( "deleting entry with dn {}", remoteEntry.getDn().getUpName() );
session.delete( remoteEntry.getDn() );
break;
}
}
catch ( Exception e )
{
LOG.error( e.getMessage(), e );
}
LOG.debug( "------------- Ending handleSearchResult ------------" );
}
/**
* {@inheritDoc}
* atm does nothinng except examinig and printing the content of syncinfovalue control
*/
public void handleSyncInfo( byte[] syncinfo )
{
try
{
LOG.debug( "............... inside handleSyncInfo ..............." );
SyncInfoValueControlCodec syncInfoValue = ( SyncInfoValueControlCodec ) decoder.decode( syncinfo );
byte[] cookie = syncInfoValue.getCookie();
if ( cookie != null )
{
LOG.debug( "setting the cookie from the sync info: " + StringTools.utf8ToString( cookie ) );
syncCookie = cookie;
}
List<byte[]> uuidList = syncInfoValue.getSyncUUIDs();
LOG.info( "refreshDeletes: " + syncInfoValue.isRefreshDeletes() );
if( uuidList != null )
{
for( byte[] uuid : uuidList )
{
LOG.info( "uuid: {}", StringTools.utf8ToString( uuid ) );
}
}
// if refreshDeletes set to true then delete all the entries with entryUUID
// present in the syncIdSet
if( syncInfoValue.isRefreshDeletes() && ( uuidList != null ) )
{
for( byte[] uuid : uuidList )
{
// TODO similar to delete based on DN there should be
// a method to delete an Entry based on entryUUID
LOG.debug( "FIXME deleting the entry with entryUUID: {}", UUID.nameUUIDFromBytes( uuid ) );
}
}
LOG.info( "refreshDone: " + syncInfoValue.isRefreshDone() );
}
catch ( DecoderException de )
{
LOG.error( "Failed to handle syncinfo message" );
de.printStackTrace();
}
LOG.debug( ".................... END handleSyncInfo ..............." );
}
/**
* {@inheritDoc}
*/
public void handleSessionClosed()
{
if( disconnected )
{
return;
}
boolean connected = false;
while( !connected )
{
try
{
Thread.sleep( config.getConsumerInterval() );
}
catch( InterruptedException e )
{
LOG.error( "Interrupted while sleeping before trying to reconnect", e );
}
LOG.debug( "Trying to reconnect" );
connected = bind();
}
startSync();
}
/**
* starts the syn operation
*/
public void startSync()
{
// read the cookie if persisted
readCookie();
if( config.isRefreshPersist() )
{
try
{
LOG.debug( "==================== Refresh And Persist ==========" );
doSyncSearch();
}
catch( Exception e )
{
LOG.error( "Failed to sync with refreshAndPersist mode", e );
}
return;
}
// continue till refreshAndPersist mode is not set
while( !config.isRefreshPersist() )
{
LOG.debug( "==================== Refresh Only ==========" );
try
{
if ( ( syncReq.getCookie() == null ) || ( syncReq.getCookie().length == 0 ) )
{
LOG.debug( "First search (no cookie)" );
}
else
{
LOG.debug( "searching with searchRequest, cookie '{}'", StringTools.utf8ToString( syncReq.getCookie() ) );
}
doSyncSearch();
LOG.info( "--------------------- Sleep for a little while ------------------" );
Thread.sleep( config.getConsumerInterval() );
LOG.debug( "--------------------- syncing again ------------------" );
}
catch ( Exception e )
{
LOG.error( "Failed to sync with refresh only mode", e );
}
}
LOG.debug( "**************** Done with the initial sync ***************" );
}
//====================== SearchListener methods ====================================
/* (non-Javadoc)
* @see org.apache.directory.shared.ldap.client.api.listeners.SearchListener#entryFound(org.apache.directory.shared.ldap.client.api.LdapConnection, org.apache.directory.shared.ldap.client.api.messages.SearchResultEntry)
*/
public void entryFound( LdapConnection connection, SearchResultEntry searchResultEntry ) throws LdapException
{
handleSearchResult( searchResultEntry );
}
/* (non-Javadoc)
* @see org.apache.directory.shared.ldap.client.api.listeners.SearchListener#referralFound(org.apache.directory.shared.ldap.client.api.LdapConnection, org.apache.directory.shared.ldap.client.api.messages.SearchResultReference)
*/
public void referralFound( LdapConnection connection, SearchResultReference searchResultReference )
throws LdapException
{
handleSearchReference( searchResultReference );
}
/* (non-Javadoc)
* @see org.apache.directory.shared.ldap.client.api.listeners.SearchListener#searchDone(org.apache.directory.shared.ldap.client.api.LdapConnection, org.apache.directory.shared.ldap.client.api.messages.SearchResultDone)
*/
public void searchDone( LdapConnection connection, SearchResultDone searchResultDone ) throws LdapException
{
handleSearchDone( searchResultDone );
}
/* (non-Javadoc)
* @see org.apache.directory.shared.ldap.client.api.listeners.IntermediateResponseListener#responseReceived(org.apache.directory.shared.ldap.client.api.LdapConnection, org.apache.directory.shared.ldap.client.api.messages.IntermediateResponse)
*/
public void responseReceived( LdapConnection connection, IntermediateResponse intermediateResponse )
{
handleSyncInfo( intermediateResponse.getResponseValue() );
}
/**
* performs a search on connection with updated syncRequest control.
*
* @throws Exception in case of any problems encountered while searching
*/
private void doSyncSearch() throws Exception
{
SyncRequestValueControl syncReq = new SyncRequestValueControl();
if( config.isRefreshPersist() )
{
syncReq.setMode( SynchronizationModeEnum.REFRESH_AND_PERSIST );
}
if ( syncCookie != null )
{
syncReq.setCookie( syncCookie );
}
SyncRequestValueControl syncReqControl = ( SyncRequestValueControl ) searchRequest.getControl( SyncRequestValueControl.CONTROL_OID );
searchRequest.remove( syncReqControl );
searchRequest.add( syncReq );
// Do the search
connection.search( searchRequest, this );
}
public void disconnet()
{
disconnected = true;
try
{
connection.unBind();
LOG.info( "Unbound from the server {}", config.getProviderHost() );
connection.close();
LOG.info( "Connection closed for the server {}", config.getProviderHost() );
connection = null;
// persist the cookie
storeCookie();
}
catch ( Exception e )
{
LOG.error( "Failed to close the connection", e );
}
}
/**
* stores the cookie in a file.
*/
private void storeCookie()
{
if( syncCookie == null )
{
return;
}
try
{
FileOutputStream fout = new FileOutputStream( cookieFile );
fout.write( syncCookie.length );
fout.write( syncCookie );
fout.close();
LOG.debug( "stored the cookie" );
}
catch( Exception e )
{
LOG.error( "Failed to store the cookie", e );
}
}
/**
* read the cookie from a file(if exists).
*/
private void readCookie()
{
try
{
if( cookieFile.exists() && ( cookieFile.length() > 0 ) )
{
FileInputStream fin = new FileInputStream( cookieFile );
syncCookie = new byte[ fin.read() ];
fin.read( syncCookie );
fin.close();
LOG.debug( "read the cookie from file: " + StringTools.utf8ToString( syncCookie ) );
}
}
catch( Exception e )
{
LOG.error( "Failed to read the cookie", e );
}
}
/**
* deletes the cookie file(if exists)
*/
public void deleteCookieFile()
{
if( cookieFile != null && cookieFile.exists() )
{
LOG.debug( "deleting the cookie file" );
cookieFile.delete();
}
}
private void modify( Entry remoteEntry ) throws Exception
{
LOG.debug( "modifying entry with dn {}", remoteEntry.getDn().getUpName() );
Entry localEntry = session.lookup( remoteEntry.getDn() );
List<Modification> mods = new ArrayList<Modification>();
Iterator<EntryAttribute> itr = localEntry.iterator();
while ( itr.hasNext() )
{
EntryAttribute localAttr = itr.next();
String attrId = localAttr.getId();
Modification mod;
EntryAttribute remoteAttr = remoteEntry.get( attrId );
if ( remoteAttr != null ) // would be better if we compare the values also? or will it consume more time?
{
mod = new ServerModification( ModificationOperation.REPLACE_ATTRIBUTE, remoteAttr );
remoteEntry.remove( remoteAttr );
}
else
{
mod = new ServerModification( ModificationOperation.REMOVE_ATTRIBUTE, localAttr );
}
mods.add( mod );
}
if( remoteEntry.size() > 0 )
{
itr = remoteEntry.iterator();
while( itr.hasNext() )
{
mods.add( new ServerModification( ModificationOperation.ADD_ATTRIBUTE, itr.next() ) );
}
}
session.modify( remoteEntry.getDn(), mods );
}
}