| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.directory.server.replication; |
| |
| |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileOutputStream; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.directory.api.ldap.codec.api.LdapApiService; |
| import org.apache.directory.api.ldap.codec.api.LdapApiServiceFactory; |
| import org.apache.directory.api.ldap.extras.controls.SyncDoneValue; |
| import org.apache.directory.api.ldap.extras.controls.SyncInfoValue; |
| import org.apache.directory.api.ldap.extras.controls.SyncModifyDnType; |
| import org.apache.directory.api.ldap.extras.controls.SyncRequestValue; |
| import org.apache.directory.api.ldap.extras.controls.SyncStateTypeEnum; |
| import org.apache.directory.api.ldap.extras.controls.SyncStateValue; |
| import org.apache.directory.api.ldap.extras.controls.SynchronizationModeEnum; |
| import org.apache.directory.api.ldap.extras.controls.syncrepl_impl.SyncInfoValueDecorator; |
| import org.apache.directory.api.ldap.extras.controls.syncrepl_impl.SyncRequestValueDecorator; |
| import org.apache.directory.api.ldap.model.constants.SchemaConstants; |
| import org.apache.directory.api.ldap.model.cursor.EntryCursor; |
| import org.apache.directory.api.ldap.model.entry.Attribute; |
| import org.apache.directory.api.ldap.model.entry.DefaultAttribute; |
| import org.apache.directory.api.ldap.model.entry.DefaultModification; |
| import org.apache.directory.api.ldap.model.entry.Entry; |
| import org.apache.directory.api.ldap.model.entry.Modification; |
| import org.apache.directory.api.ldap.model.entry.ModificationOperation; |
| import org.apache.directory.api.ldap.model.exception.LdapException; |
| import org.apache.directory.api.ldap.model.filter.AndNode; |
| import org.apache.directory.api.ldap.model.filter.EqualityNode; |
| import org.apache.directory.api.ldap.model.filter.ExprNode; |
| import org.apache.directory.api.ldap.model.filter.NotNode; |
| import org.apache.directory.api.ldap.model.filter.OrNode; |
| import org.apache.directory.api.ldap.model.filter.PresenceNode; |
| import org.apache.directory.api.ldap.model.message.IntermediateResponse; |
| import org.apache.directory.api.ldap.model.message.Response; |
| import org.apache.directory.api.ldap.model.message.ResultCodeEnum; |
| import org.apache.directory.api.ldap.model.message.SearchRequest; |
| import org.apache.directory.api.ldap.model.message.SearchRequestImpl; |
| import org.apache.directory.api.ldap.model.message.SearchResultDone; |
| import org.apache.directory.api.ldap.model.message.SearchResultEntry; |
| import org.apache.directory.api.ldap.model.message.SearchResultReference; |
| import org.apache.directory.api.ldap.model.message.SearchScope; |
| import org.apache.directory.api.ldap.model.name.Dn; |
| import org.apache.directory.api.ldap.model.name.Rdn; |
| import org.apache.directory.api.ldap.model.schema.AttributeType; |
| import org.apache.directory.api.ldap.model.schema.AttributeTypeOptions; |
| import org.apache.directory.api.ldap.model.schema.SchemaManager; |
| import org.apache.directory.api.util.Strings; |
| import org.apache.directory.ldap.client.api.ConnectionClosedEventListener; |
| import org.apache.directory.ldap.client.api.LdapNetworkConnection; |
| import org.apache.directory.ldap.client.api.future.SearchFuture; |
| import org.apache.directory.server.core.api.DirectoryService; |
| import org.apache.directory.server.core.api.filtering.EntryFilteringCursor; |
| import org.apache.directory.server.ldap.LdapProtocolUtils; |
| import org.apache.directory.server.ldap.replication.ReplicationConsumerConfig; |
| import org.apache.directory.server.ldap.replication.SyncReplConfiguration; |
| import org.apache.directory.server.ldap.replication.consumer.ReplicationConsumer; |
| import org.apache.directory.server.ldap.replication.consumer.ReplicationStatusEnum; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
| /** |
| * |
| * Implementation of syncrepl slave a.k.a consumer. |
| * |
| * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a> |
| */ |
| public class MockSyncReplConsumer implements ConnectionClosedEventListener, ReplicationConsumer |
| { |
| /** the logger */ |
| private static final Logger LOG = LoggerFactory.getLogger( MockSyncReplConsumer.class ); |
| |
| /** A dedicated logger for the consumer */ |
| private static final Logger CONSUMER_LOG = LoggerFactory.getLogger( "CONSUMER_LOG" ); |
| |
| /** The codec */ |
| private LdapApiService ldapCodecService = LdapApiServiceFactory.getSingleton(); |
| |
| /** the syncrepl configuration */ |
| private SyncReplConfiguration config; |
| |
| /** A field used to tell the thread it should stop */ |
| private volatile boolean stop = false; |
| |
| /** A mutex used to make the thread sleeping for a moment */ |
| private final Object mutex = new Object(); |
| |
| /** the sync cookie sent by the server */ |
| private byte[] syncCookie; |
| |
| /** connection to the syncrepl provider */ |
| private LdapNetworkConnection connection; |
| |
| /** the search request with control */ |
| private SearchRequest searchRequest; |
| |
| /** the schema manager */ |
| private SchemaManager schemaManager; |
| |
| /** the cookie file */ |
| private File cookieFile; |
| |
| /** flag to indicate whether the consumer was disconnected */ |
| private boolean disconnected; |
| |
| /** The number of added entries */ |
| private AtomicInteger nbAdded = new AtomicInteger( 0 ); |
| |
| private File cookieDir; |
| |
| public static String COOKIES_DIR_NAME = "cookies"; |
| |
| /** attributes on which modification should be ignored */ |
| private static final String[] MOD_IGNORE_AT = new String[] |
| { |
| SchemaConstants.ENTRY_UUID_AT, |
| SchemaConstants.ENTRY_CSN_AT, |
| SchemaConstants.MODIFIERS_NAME_AT, |
| SchemaConstants.MODIFY_TIMESTAMP_AT, |
| SchemaConstants.CREATE_TIMESTAMP_AT, |
| SchemaConstants.CREATORS_NAME_AT, |
| SchemaConstants.ENTRY_PARENT_ID_AT |
| }; |
| |
| /** A thread used to refresh in refreshOnly mode */ |
| private RefresherThread refreshThread; |
| |
| /** the cookie that was saved last time */ |
| private byte[] lastSavedCookie; |
| |
| /** The (entrtyUuid=*) filter */ |
| private static final PresenceNode ENTRY_UUID_PRESENCE_FILTER = new PresenceNode( SchemaConstants.ENTRY_UUID_AT ); |
| |
| /** The set used for search attributes, containing only the entryUuid AT */ |
| private static final Set<AttributeTypeOptions> ENTRY_UUID_ATOP_SET = new HashSet<AttributeTypeOptions>(); |
| |
| private List<Modification> cookieModLst; |
| |
| /** AttributeTypes used for replication */ |
| private static AttributeType COOKIE_AT_TYPE; |
| private static AttributeType ENTRY_UUID_AT; |
| |
| |
| /** |
| * @return the config |
| */ |
| public SyncReplConfiguration getConfig() |
| { |
| return config; |
| } |
| |
| |
| /** |
| * Init the replication service |
| * @param directoryservice The directory service |
| */ |
| public void init( DirectoryService directoryService ) throws Exception |
| { |
| this.schemaManager = directoryService.getSchemaManager(); |
| ENTRY_UUID_AT = schemaManager.lookupAttributeTypeRegistry( SchemaConstants.ENTRY_UUID_AT ); |
| COOKIE_AT_TYPE = schemaManager.lookupAttributeTypeRegistry( SchemaConstants.ADS_REPL_COOKIE ); |
| |
| ENTRY_UUID_ATOP_SET.add( new AttributeTypeOptions( ENTRY_UUID_AT ) ); |
| |
| Attribute cookieAttr = new DefaultAttribute( COOKIE_AT_TYPE ); |
| |
| Modification cookieMod = new DefaultModification( ModificationOperation.REPLACE_ATTRIBUTE, cookieAttr ); |
| cookieModLst = new ArrayList<Modification>( 1 ); |
| cookieModLst.add( cookieMod ); |
| |
| cookieDir = new File( System.getProperty( "java.io.tmpdir" ) + "/" + COOKIES_DIR_NAME ); |
| cookieDir.mkdirs(); |
| |
| prepareSyncSearchRequest(); |
| } |
| |
| |
| /** |
| * Connect to the remote server. Note that a SyncRepl consumer will be connected to only |
| * one remote server |
| * |
| * @return true if the connections have been successful. |
| */ |
| public boolean connect() |
| { |
| try |
| { |
| String providerHost = config.getRemoteHost(); |
| int port = config.getRemotePort(); |
| |
| // Create a connection |
| if ( connection == null ) |
| { |
| connection = new LdapNetworkConnection( providerHost, port ); |
| connection.setTimeOut( -1L ); |
| |
| if ( config.isUseTls() ) |
| { |
| connection.getConfig().setTrustManagers( config.getTrustManager() ); |
| connection.startTls(); |
| } |
| |
| connection.addConnectionClosedEventListener( this ); |
| } |
| |
| // Try to connect |
| if ( connection.connect() ) |
| { |
| CONSUMER_LOG.info( "Consumer {} connected to producer {}", config.getReplicaId(), config.getProducer() ); |
| |
| // Do a bind |
| try |
| { |
| connection.bind( config.getReplUserDn(), Strings.utf8ToString( config.getReplUserPassword() ) ); |
| disconnected = false; |
| |
| return true; |
| } |
| catch ( LdapException le ) |
| { |
| CONSUMER_LOG.warn( "Failed to bind to the producer {} with the given bind Dn {}", config.getProducer(), config.getReplUserDn() ); |
| LOG.warn( "Failed to bind to the server with the given bind Dn {}", config.getReplUserDn() ); |
| LOG.warn( "", le ); |
| disconnected = true; |
| } |
| } |
| else |
| { |
| CONSUMER_LOG.warn( "Consumer {} cannot connect to producer {}", config.getReplicaId(), config.getProducer() ); |
| disconnected = true; |
| |
| return false; |
| } |
| } |
| catch ( Exception e ) |
| { |
| CONSUMER_LOG.error( "Failed to connect to the server {}, cause : {}", config.getProducer(), e.getMessage() ); |
| LOG.error( "Failed to connect to the server {}, cause : {}", config.getProducer(), e.getMessage() ); |
| disconnected = true; |
| } |
| |
| return false; |
| } |
| |
| |
| /** |
| * |
| * prepares a SearchRequest for syncing DIT content. |
| * |
| */ |
| public void prepareSyncSearchRequest() throws LdapException |
| { |
| String baseDn = config.getBaseDn(); |
| |
| searchRequest = new SearchRequestImpl(); |
| |
| searchRequest.setBase( new Dn( baseDn ) ); |
| searchRequest.setFilter( config.getFilter() ); |
| searchRequest.setSizeLimit( config.getSearchSizeLimit() ); |
| searchRequest.setTimeLimit( config.getSearchTimeout() ); |
| |
| searchRequest.setDerefAliases( config.getAliasDerefMode() ); |
| searchRequest.setScope( config.getSearchScope() ); |
| searchRequest.setTypesOnly( false ); |
| |
| searchRequest.addAttributes( config.getAttributes() ); |
| } |
| |
| |
| public ResultCodeEnum handleSearchDone( SearchResultDone searchDone ) |
| { |
| LOG.debug( "///////////////// handleSearchDone //////////////////" ); |
| |
| SyncDoneValue ctrl = ( SyncDoneValue ) searchDone.getControls().get( SyncDoneValue.OID ); |
| |
| if ( ( ctrl != null ) && ( ctrl.getCookie() != null ) ) |
| { |
| syncCookie = ctrl.getCookie(); |
| LOG.debug( "assigning cookie from sync done value control: " + Strings.utf8ToString( syncCookie ) ); |
| storeCookie(); |
| } |
| |
| LOG.debug( "//////////////// END handleSearchDone//////////////////////" ); |
| |
| return searchDone.getLdapResult().getResultCode(); |
| } |
| |
| |
| public void handleSearchReference( SearchResultReference searchRef ) |
| { |
| // this method won't be called cause the provider will serve the referrals as |
| // normal entry objects due to the usage of ManageDsaITControl in the search request |
| } |
| |
| |
| public void handleSearchResult( SearchResultEntry syncResult ) |
| { |
| LOG.debug( "------------- starting handleSearchResult ------------" ); |
| |
| SyncStateValue syncStateCtrl = ( SyncStateValue ) syncResult.getControl( SyncStateValue.OID ); |
| |
| try |
| { |
| Entry remoteEntry = syncResult.getEntry(); |
| |
| if ( syncStateCtrl.getCookie() != null ) |
| { |
| syncCookie = syncStateCtrl.getCookie(); |
| LOG.debug( "assigning the cookie from sync state value control: " |
| + Strings.utf8ToString( syncCookie ) ); |
| } |
| |
| SyncStateTypeEnum state = syncStateCtrl.getSyncStateType(); |
| |
| LOG.debug( "state name {}", state.name() ); |
| |
| // check to avoid conversion of UUID from byte[] to String |
| if ( LOG.isDebugEnabled() ) |
| { |
| LOG.debug( "entryUUID = {}", Strings.uuidToString( syncStateCtrl.getEntryUUID() ) ); |
| } |
| |
| switch ( state ) |
| { |
| case ADD: |
| LOG.debug( "adding entry with dn {}, {}", remoteEntry.getDn().getName(), remoteEntry ); |
| nbAdded.getAndIncrement(); |
| break; |
| |
| case MODIFY: |
| LOG.debug( "modifying entry with dn {}", remoteEntry.getDn().getName() ); |
| modify( remoteEntry ); |
| break; |
| |
| case MODDN: |
| String entryUuid = Strings.uuidToString( syncStateCtrl.getEntryUUID() ); |
| applyModDnOperation( remoteEntry, entryUuid ); |
| |
| break; |
| |
| case DELETE: |
| LOG.debug( "deleting entry with dn {}", remoteEntry.getDn().getName() ); |
| // incase of a MODDN operation resulting in a branch to be moved out of scope |
| // ApacheDS replication provider sends a single delete event on the Dn of the moved branch |
| // so the branch needs to be recursively deleted here |
| deleteRecursive( remoteEntry.getDn(), null ); |
| break; |
| |
| case PRESENT: |
| LOG.debug( "entry present {}", remoteEntry ); |
| break; |
| } |
| |
| // store the cookie only if the above operation was successful |
| if ( syncStateCtrl.getCookie() != null ) |
| { |
| storeCookie(); |
| } |
| } |
| catch ( Exception e ) |
| { |
| LOG.error( e.getMessage(), e ); |
| } |
| |
| LOG.debug( "------------- Ending handleSearchResult ------------" ); |
| } |
| |
| |
| /** |
| * {@inheritDoc} |
| */ |
| public void handleSyncInfo( IntermediateResponse syncInfoResp ) |
| { |
| try |
| { |
| LOG.debug( "............... inside handleSyncInfo ..............." ); |
| |
| byte[] syncInfoBytes = syncInfoResp.getResponseValue(); |
| |
| if ( syncInfoBytes == null ) |
| { |
| return; |
| } |
| |
| SyncInfoValueDecorator decorator = new SyncInfoValueDecorator( ldapCodecService ); |
| SyncInfoValue syncInfoValue = ( SyncInfoValue ) decorator.decode( syncInfoBytes ); |
| |
| byte[] cookie = syncInfoValue.getCookie(); |
| |
| int replicaId = -1; |
| |
| if ( cookie != null ) |
| { |
| LOG.debug( "setting the cookie from the sync info: " + Strings.utf8ToString( cookie ) ); |
| syncCookie = cookie; |
| |
| String cookieString = Strings.utf8ToString( syncCookie ); |
| replicaId = LdapProtocolUtils.getReplicaId( cookieString ); |
| } |
| |
| LOG.info( "refreshDeletes: " + syncInfoValue.isRefreshDeletes() ); |
| |
| List<byte[]> uuidList = syncInfoValue.getSyncUUIDs(); |
| |
| // if refreshDeletes set to true then delete all the entries with entryUUID |
| // present in the syncIdSet |
| if ( syncInfoValue.isRefreshDeletes() ) |
| { |
| deleteEntries( uuidList, false ); |
| } |
| else |
| { |
| deleteEntries( uuidList, true ); |
| } |
| |
| LOG.info( "refreshDone: " + syncInfoValue.isRefreshDone() ); |
| |
| storeCookie(); |
| } |
| catch ( Exception de ) |
| { |
| LOG.error( "Failed to handle syncinfo message", de ); |
| } |
| |
| LOG.debug( ".................... END handleSyncInfo ..............." ); |
| } |
| |
| |
| /** |
| * {@inheritDoc} |
| */ |
| public void connectionClosed() |
| { |
| if ( disconnected ) |
| { |
| return; |
| } |
| |
| boolean connected = false; |
| |
| while ( !connected ) |
| { |
| try |
| { |
| Thread.sleep( config.getRefreshInterval() ); |
| } |
| catch ( InterruptedException e ) |
| { |
| LOG.error( "Interrupted while sleeping before trying to reconnect", e ); |
| } |
| |
| LOG.debug( "Trying to reconnect" ); |
| connected = connect(); |
| } |
| |
| startSync(); |
| } |
| |
| |
| /** |
| * starts the synchronization operation |
| */ |
| public ReplicationStatusEnum startSync() |
| { |
| // read the cookie if persisted |
| readCookie(); |
| |
| if ( config.isRefreshNPersist() ) |
| { |
| try |
| { |
| LOG.debug( "==================== Refresh And Persist ==========" ); |
| |
| return doSyncSearch( SynchronizationModeEnum.REFRESH_AND_PERSIST, false ); |
| } |
| catch ( Exception e ) |
| { |
| LOG.error( "Failed to sync with refreshAndPersist mode", e ); |
| return ReplicationStatusEnum.DISCONNECTED; |
| } |
| } |
| else |
| { |
| return doRefreshOnly(); |
| } |
| } |
| |
| private ReplicationStatusEnum doRefreshOnly() |
| { |
| while ( !stop ) |
| { |
| LOG.debug( "==================== Refresh Only ==========" ); |
| |
| try |
| { |
| doSyncSearch( SynchronizationModeEnum.REFRESH_ONLY, false ); |
| |
| LOG.info( "--------------------- Sleep for a little while ------------------" ); |
| mutex.wait( config.getRefreshInterval() ); |
| LOG.debug( "--------------------- syncing again ------------------" ); |
| |
| } |
| catch ( InterruptedException ie ) |
| { |
| LOG.warn( "refresher thread interrupted" ); |
| return ReplicationStatusEnum.DISCONNECTED; |
| } |
| catch ( Exception e ) |
| { |
| LOG.error( "Failed to sync with refresh only mode", e ); |
| return ReplicationStatusEnum.DISCONNECTED; |
| } |
| } |
| |
| return ReplicationStatusEnum.STOPPED; |
| } |
| |
| |
| /** |
| * {@inheritDoc} |
| */ |
| public void setConfig( ReplicationConsumerConfig config ) |
| { |
| this.config = ( SyncReplConfiguration ) config; |
| } |
| |
| |
| /** |
| * {@inheritDoc} |
| */ |
| public void ping() |
| { |
| boolean connected = !disconnected; |
| |
| if ( disconnected ) |
| { |
| connected = connect(); |
| } |
| |
| if ( connected ) |
| { |
| CONSUMER_LOG.debug( "PING : The consumer {} is alive", config.getReplicaId() ); |
| |
| try |
| { |
| Entry baseDn = connection.lookup( config.getBaseDn(), "1.1" ); |
| |
| if ( baseDn == null ) |
| { |
| // Cannot get the entry : this is bad, but possible |
| CONSUMER_LOG.debug( "Cannot fetch '{}' from provider for consumer {}", config.getBaseDn(), config.getReplicaId() ); |
| } |
| else |
| { |
| CONSUMER_LOG.debug( "Fetched '{}' from provider for consumer {}", config.getBaseDn(), config.getReplicaId() ); |
| } |
| } |
| catch ( LdapException le ) |
| { |
| // Error : we must disconnect |
| disconnect(); |
| } |
| } |
| else |
| { |
| CONSUMER_LOG.debug( "PING : The consumer {} cannot be connected", config.getReplicaId() ); |
| } |
| } |
| |
| |
| /** |
| * {@inheritDoc} |
| */ |
| public boolean connect( boolean now ) |
| { |
| boolean connected = false; |
| |
| if ( now ) |
| { |
| connected = connect(); |
| } |
| |
| while ( !connected ) |
| { |
| try |
| { |
| // try to establish a connection for every 5 seconds |
| Thread.sleep( 5000 ); |
| } |
| catch ( InterruptedException e ) |
| { |
| LOG.warn( "Consumer {} Interrupted while trying to reconnect to the provider {}", |
| config.getReplicaId(), config.getProducer() ); |
| } |
| |
| connected = connect(); |
| } |
| |
| // TODO : we may have cases were we get here with the connected flag to false. With the above |
| // code, thi sis not possible |
| |
| return connected; |
| } |
| |
| |
| /** |
| * {@inheritDoc} |
| */ |
| public void stop() |
| { |
| if ( !disconnected ) |
| { |
| disconnect(); |
| nbAdded.getAndSet( 0 ); |
| } |
| } |
| |
| |
| /** |
| * {@inheritDoc} |
| */ |
| public String getId() |
| { |
| return String.valueOf( getConfig().getReplicaId() ); |
| } |
| |
| |
| /** |
| * performs a search on connection with updated syncRequest control. |
| * |
| * @throws Exception in case of any problems encountered while searching |
| */ |
| private ReplicationStatusEnum doSyncSearch( SynchronizationModeEnum syncType, boolean reloadHint ) throws Exception |
| { |
| SyncRequestValue syncReq = new SyncRequestValueDecorator( ldapCodecService ); |
| |
| syncReq.setMode( syncType ); |
| syncReq.setReloadHint( reloadHint ); |
| |
| if ( syncCookie != null ) |
| { |
| LOG.debug( "searching with searchRequest, cookie '{}'", Strings.utf8ToString( syncCookie ) ); |
| syncReq.setCookie( syncCookie ); |
| } |
| |
| searchRequest.addControl( syncReq ); |
| |
| // Do the search |
| SearchFuture sf = connection.searchAsync( searchRequest ); |
| |
| Response resp = sf.get(); |
| |
| while ( !( resp instanceof SearchResultDone ) && !sf.isCancelled() && !disconnected ) |
| { |
| if ( resp instanceof SearchResultEntry ) |
| { |
| handleSearchResult( ( SearchResultEntry ) resp ); |
| } |
| else if ( resp instanceof SearchResultReference ) |
| { |
| handleSearchReference( ( SearchResultReference ) resp ); |
| } |
| else if ( resp instanceof IntermediateResponse ) |
| { |
| handleSyncInfo( ( IntermediateResponse ) resp ); |
| } |
| |
| resp = sf.get(); |
| } |
| |
| ResultCodeEnum resultCode = handleSearchDone( ( SearchResultDone ) resp ); |
| |
| LOG.debug( "sync operation returned result code {}", resultCode ); |
| |
| if ( resultCode == ResultCodeEnum.NO_SUCH_OBJECT ) |
| { |
| // log the error and handle it appropriately |
| LOG.warn( "given replication base Dn {} is not found on provider", config.getBaseDn() ); |
| |
| LOG.warn( "disconnecting the consumer running in refreshAndPersist mode from the provider" ); |
| disconnect(); |
| |
| return ReplicationStatusEnum.DISCONNECTED; |
| } |
| else if ( resultCode == ResultCodeEnum.E_SYNC_REFRESH_REQUIRED ) |
| { |
| LOG.info( "unable to perform the content synchronization cause E_SYNC_REFRESH_REQUIRED" ); |
| |
| try |
| { |
| deleteRecursive( new Dn( config.getBaseDn() ), null ); |
| } |
| catch ( Exception e ) |
| { |
| LOG |
| .error( |
| "Failed to delete the replica base as part of handling E_SYNC_REFRESH_REQUIRED, disconnecting the consumer", |
| e ); |
| disconnect(); |
| } |
| |
| removeCookie(); |
| |
| return ReplicationStatusEnum.REFRESH_REQUIRED; |
| } |
| else |
| { |
| return ReplicationStatusEnum.DISCONNECTED; |
| } |
| } |
| |
| |
| public void disconnect() |
| { |
| disconnected = true; |
| |
| if ( connection == null ) |
| { |
| return; |
| } |
| |
| if ( connection.isConnected() ) |
| { |
| try |
| { |
| if ( refreshThread != null ) |
| { |
| refreshThread.stopRefreshing(); |
| } |
| |
| connection.unBind(); |
| LOG.info( "Unbound from the server {}", config.getRemoteHost() ); |
| |
| connection.close(); |
| LOG.info( "Connection closed for the server {}", config.getRemoteHost() ); |
| |
| connection = null; |
| } |
| catch ( Exception e ) |
| { |
| LOG.error( "Failed to close the connection", e ); |
| } |
| finally |
| { |
| // persist the cookie |
| storeCookie(); |
| |
| // reset the cookie |
| syncCookie = null; |
| } |
| } |
| } |
| |
| |
| /** |
| * stores the cookie. |
| */ |
| private void storeCookie() |
| { |
| if ( syncCookie == null ) |
| { |
| return; |
| } |
| |
| if ( lastSavedCookie != null && Arrays.equals( syncCookie, lastSavedCookie ) ) |
| { |
| return; |
| } |
| |
| try |
| { |
| if( cookieFile == null ) |
| { |
| cookieFile = new File( cookieDir, String.valueOf( LdapProtocolUtils.getReplicaId( new String( syncCookie ) ) ) ); |
| } |
| |
| FileOutputStream fout = new FileOutputStream( cookieFile ); |
| fout.write( syncCookie.length ); |
| fout.write( syncCookie ); |
| fout.close(); |
| |
| lastSavedCookie = new byte[syncCookie.length]; |
| System.arraycopy( syncCookie, 0, lastSavedCookie, 0, syncCookie.length ); |
| |
| LOG.debug( "stored the cookie" ); |
| } |
| catch ( Exception e ) |
| { |
| LOG.error( "Failed to store the cookie", e ); |
| } |
| } |
| |
| |
| /** |
| * read the cookie |
| */ |
| private void readCookie() |
| { |
| try |
| { |
| if ( ( cookieFile != null ) && cookieFile.exists() && ( cookieFile.length() > 0 ) ) |
| { |
| FileInputStream fin = new FileInputStream( cookieFile ); |
| syncCookie = new byte[fin.read()]; |
| fin.read( syncCookie ); |
| fin.close(); |
| |
| lastSavedCookie = new byte[syncCookie.length]; |
| System.arraycopy( syncCookie, 0, lastSavedCookie, 0, syncCookie.length ); |
| |
| LOG.debug( "read the cookie from file: " + Strings.utf8ToString( syncCookie ) ); |
| } |
| } |
| catch ( Exception e ) |
| { |
| LOG.error( "Failed to read the cookie", e ); |
| } |
| } |
| |
| |
| /** |
| * deletes the cookie and resets the syncCookie to null |
| */ |
| public void removeCookie() |
| { |
| if ( cookieFile.exists() && ( cookieFile.length() > 0 ) ) |
| { |
| boolean deleted = cookieFile.delete(); |
| LOG.info( "deleted cookie file {}", deleted ); |
| } |
| |
| LOG.info( "resetting sync cookie" ); |
| |
| syncCookie = null; |
| lastSavedCookie = null; |
| } |
| |
| |
| private void applyModDnOperation( Entry remoteEntry, String entryUuid ) throws Exception |
| { |
| LOG.debug( "MODDN for entry {}, new entry : {}", entryUuid, remoteEntry ); |
| |
| // First, compute the MODDN type |
| SyncModifyDnType modDnType = null; |
| |
| try |
| { |
| // Retrieve locally the moved or renamed entry |
| String filter = "(entryUuid=" + entryUuid + ")"; |
| EntryCursor cursor = connection.search( Dn.ROOT_DSE, filter, SearchScope.SUBTREE, |
| SchemaConstants.ALL_ATTRIBUTES_ARRAY ); |
| |
| Entry localEntry = cursor.get(); |
| |
| // Compute the DN, parentDn and Rdn for both entries |
| Dn localDn = localEntry.getDn(); |
| Dn remoteDn = remoteEntry.getDn(); |
| |
| Dn localParentDn = localDn.getParent(); |
| Dn remoteParentDn = remoteDn.getParent(); |
| |
| Rdn localRdn = localDn.getRdn(); |
| Rdn remoteRdn = remoteDn.getRdn(); |
| |
| // If the RDN are equals, it's a MOVE |
| if ( localRdn.equals( remoteRdn ) ) |
| { |
| modDnType = SyncModifyDnType.MOVE; |
| } |
| |
| // If the parentDn are equals, it's a RENAME |
| if ( localParentDn.equals( remoteParentDn ) ) |
| { |
| modDnType = SyncModifyDnType.RENAME; |
| } |
| |
| // Otherwise, it's a MOVE and RENAME |
| if ( modDnType == null ) |
| { |
| modDnType = SyncModifyDnType.MOVE_AND_RENAME; |
| } |
| |
| // Check if the OldRdn has been deleted |
| boolean deleteOldRdn = remoteEntry.contains( localRdn.getNormType(), localRdn.getNormValue() ); |
| |
| switch ( modDnType ) |
| { |
| case MOVE: |
| LOG.debug( "moving {} to the new parent {}", localDn, remoteParentDn ); |
| |
| break; |
| |
| case RENAME: |
| LOG.debug( "renaming the Dn {} with new Rdn {} and deleteOldRdn flag set to {}", new String[] |
| { localDn.getName(), remoteRdn.getName(), String.valueOf( deleteOldRdn ) } ); |
| |
| break; |
| |
| case MOVE_AND_RENAME: |
| LOG.debug( |
| "moveAndRename on the Dn {} with new newParent Dn {}, new Rdn {} and deleteOldRdn flag set to {}", |
| new String[] |
| { |
| localDn.getName(), |
| remoteParentDn.getName(), |
| remoteRdn.getName(), |
| String.valueOf( deleteOldRdn ) } ); |
| |
| break; |
| } |
| } |
| catch ( Exception e ) |
| { |
| throw e; |
| } |
| } |
| |
| |
| private void modify( Entry remoteEntry ) throws Exception |
| { |
| /* |
| Entry localEntry = session.lookup( remoteEntry.getDn() ); |
| |
| remoteEntry.removeAttributes( MOD_IGNORE_AT ); |
| |
| List<Modification> mods = new ArrayList<Modification>(); |
| Iterator<Attribute> itr = localEntry.iterator(); |
| |
| while ( itr.hasNext() ) |
| { |
| Attribute localAttr = itr.next(); |
| String attrId = localAttr.getId(); |
| Modification mod; |
| Attribute remoteAttr = remoteEntry.get( attrId ); |
| |
| if ( remoteAttr != null ) // would be better if we compare the values also? or will it consume more time? |
| { |
| mod = new DefaultModification( ModificationOperation.REPLACE_ATTRIBUTE, remoteAttr ); |
| remoteEntry.remove( remoteAttr ); |
| } |
| else |
| { |
| mod = new DefaultModification( ModificationOperation.REMOVE_ATTRIBUTE, localAttr ); |
| } |
| |
| mods.add( mod ); |
| } |
| |
| if ( remoteEntry.size() > 0 ) |
| { |
| itr = remoteEntry.iterator(); |
| while ( itr.hasNext() ) |
| { |
| mods.add( new DefaultModification( ModificationOperation.ADD_ATTRIBUTE, itr.next() ) ); |
| } |
| } |
| |
| session.modify( remoteEntry.getDn(), mods ); |
| */ |
| } |
| |
| |
| /** |
| * deletes the entries having the UUID given in the list |
| * |
| * @param uuidList the list of UUIDs |
| * @throws Exception in case of any problems while deleting the entries |
| */ |
| public void deleteEntries( List<byte[]> uuidList, boolean isRefreshPresent ) throws Exception |
| { |
| if ( uuidList == null || uuidList.isEmpty() ) |
| { |
| return; |
| } |
| |
| for ( byte[] uuid : uuidList ) |
| { |
| LOG.info( "uuid: {}", Strings.uuidToString( uuid ) ); |
| } |
| |
| // if it is refreshPresent list then send all the UUIDs for |
| // filtering, otherwise breaking the list will cause the |
| // other present entries to be deleted from DIT |
| if ( isRefreshPresent ) |
| { |
| LOG.debug( "refresh present syncinfo list has {} UUIDs", uuidList.size() ); |
| _deleteEntries_( uuidList, isRefreshPresent ); |
| return; |
| } |
| |
| int NODE_LIMIT = 10; |
| |
| int count = uuidList.size() / NODE_LIMIT; |
| |
| int startIndex = 0; |
| int i = 0; |
| for ( ; i < count; i++ ) |
| { |
| startIndex = i * NODE_LIMIT; |
| _deleteEntries_( uuidList.subList( startIndex, startIndex + NODE_LIMIT ), isRefreshPresent ); |
| } |
| |
| if ( ( uuidList.size() % NODE_LIMIT ) != 0 ) |
| { |
| // remove the remaining entries |
| if ( count > 0 ) |
| { |
| startIndex = i * NODE_LIMIT; |
| } |
| _deleteEntries_( uuidList.subList( startIndex, uuidList.size() ), isRefreshPresent ); |
| } |
| } |
| |
| |
| /** |
| * do not call this method directly, instead call deleteEntries() |
| * |
| * @param limitedUuidList a list of UUIDs whose size is less than or equal to #NODE_LIMIT (node limit applies only for refreshDeletes list) |
| * @param isRefreshPresent a flag indicating the type of entries present in the UUID list |
| */ |
| private void _deleteEntries_( List<byte[]> limitedUuidList, boolean isRefreshPresent ) throws Exception |
| { |
| ExprNode filter = null; |
| int size = limitedUuidList.size(); |
| if ( size == 1 ) |
| { |
| String uuid = Strings.uuidToString( limitedUuidList.get( 0 ) ); |
| filter = new EqualityNode<String>( SchemaConstants.ENTRY_UUID_AT, |
| new org.apache.directory.api.ldap.model.entry.StringValue( uuid ) ); |
| if ( isRefreshPresent ) |
| { |
| filter = new NotNode( filter ); |
| } |
| } |
| else |
| { |
| if ( isRefreshPresent ) |
| { |
| filter = new AndNode(); |
| } |
| else |
| { |
| filter = new OrNode(); |
| } |
| |
| for ( int i = 0; i < size; i++ ) |
| { |
| String uuid = Strings.uuidToString( limitedUuidList.get( i ) ); |
| ExprNode uuidEqNode = new EqualityNode<String>( SchemaConstants.ENTRY_UUID_AT, |
| new org.apache.directory.api.ldap.model.entry.StringValue( uuid ) ); |
| |
| if ( isRefreshPresent ) |
| { |
| uuidEqNode = new NotNode( uuidEqNode ); |
| ( ( AndNode ) filter ).addNode( uuidEqNode ); |
| } |
| else |
| { |
| ( ( OrNode ) filter ).addNode( uuidEqNode ); |
| } |
| } |
| } |
| |
| Dn dn = new Dn( schemaManager, config.getBaseDn() ); |
| |
| /* |
| LOG.debug( "selecting entries to be deleted using filter {}", filter.toString() ); |
| EntryFilteringCursor cursor = session.search( dn, SearchScope.SUBTREE, filter, |
| AliasDerefMode.NEVER_DEREF_ALIASES, ENTRY_UUID_ATOP_SET ); |
| cursor.beforeFirst(); |
| |
| while ( cursor.next() ) |
| { |
| Entry entry = cursor.get(); |
| deleteRecursive( entry.getDn(), null ); |
| } |
| |
| cursor.close(); |
| */ |
| } |
| |
| /** |
| * A Thread implementation for synchronizing the DIT in refreshOnly mode |
| */ |
| private class RefresherThread extends Thread |
| { |
| |
| public RefresherThread() |
| { |
| setDaemon( true ); |
| } |
| |
| |
| public void run() |
| { |
| while ( !stop ) |
| { |
| LOG.debug( "==================== Refresh Only ==========" ); |
| |
| try |
| { |
| doSyncSearch( SynchronizationModeEnum.REFRESH_ONLY, false ); |
| |
| LOG.info( "--------------------- Sleep for a little while ------------------" ); |
| mutex.wait( config.getRefreshInterval() ); |
| LOG.debug( "--------------------- syncing again ------------------" ); |
| |
| } |
| catch ( InterruptedException ie ) |
| { |
| LOG.warn( "refresher thread interrupted" ); |
| } |
| catch ( Exception e ) |
| { |
| LOG.error( "Failed to sync with refresh only mode", e ); |
| } |
| } |
| } |
| |
| |
| public void stopRefreshing() |
| { |
| stop = true; |
| |
| // just in case if it is sleeping, wake up the thread |
| mutex.notify(); |
| } |
| } |
| |
| |
| /** |
| * removes all child entries present under the given Dn and finally the Dn itself |
| * |
| * Working: |
| * This is a recursive function which maintains a Map<Dn,Cursor>. |
| * The way the cascade delete works is by checking for children for a |
| * given Dn(i.e opening a search cursor) and if the cursor is empty |
| * then delete the Dn else for each entry's Dn present in cursor call |
| * deleteChildren() with the Dn and the reference to the map. |
| * |
| * The reason for opening a search cursor is based on an assumption |
| * that an entry *might* contain children, consider the below DIT fragment |
| * |
| * parent |
| * / \ |
| * child1 child2 |
| * / \ |
| * grand21 grand22 |
| * |
| * The below method works better in the case where the tree depth is >1 |
| * |
| * In the case of passing a non-null DeleteListener, the return value will always be null, cause the |
| * operation is treated as asynchronous and response result will be sent using the listener callback |
| * |
| * @param rootDn the Dn which will be removed after removing its children |
| * @param map a map to hold the Cursor related to a Dn |
| * @throws Exception If the Dn is not valid or if the deletion failed |
| */ |
| private void deleteRecursive( Dn rootDn, Map<Dn, EntryFilteringCursor> cursorMap ) throws Exception |
| { |
| LOG.debug( "searching for {}", rootDn.getName() ); |
| EntryFilteringCursor cursor = null; |
| |
| try |
| { |
| if ( cursorMap == null ) |
| { |
| cursorMap = new HashMap<Dn, EntryFilteringCursor>(); |
| } |
| |
| /* |
| cursor = cursorMap.get( rootDn ); |
| |
| if ( cursor == null ) |
| { |
| cursor = session.search( rootDn, SearchScope.ONELEVEL, ENTRY_UUID_PRESENCE_FILTER, |
| AliasDerefMode.NEVER_DEREF_ALIASES, ENTRY_UUID_ATOP_SET ); |
| cursor.beforeFirst(); |
| LOG.debug( "putting cursor for {}", rootDn.getName() ); |
| cursorMap.put( rootDn, cursor ); |
| } |
| |
| if ( !cursor.next() ) // if this is a leaf entry's Dn |
| { |
| LOG.debug( "deleting {}", rootDn.getName() ); |
| cursorMap.remove( rootDn ); |
| cursor.close(); |
| session.delete( rootDn ); |
| } |
| else |
| { |
| do |
| { |
| Entry entry = cursor.get(); |
| |
| deleteRecursive( entry.getDn(), cursorMap ); |
| } |
| while ( cursor.next() ); |
| |
| cursorMap.remove( rootDn ); |
| cursor.close(); |
| LOG.debug( "deleting {}", rootDn.getName() ); |
| session.delete( rootDn ); |
| } |
| */ |
| } |
| catch ( Exception e ) |
| { |
| String msg = "Failed to delete child entries under the Dn " + rootDn.getName(); |
| LOG.error( msg, e ); |
| throw e; |
| } |
| } |
| |
| |
| /** |
| * @return the nbAdded |
| */ |
| public int getNbAdded() |
| { |
| return nbAdded.get(); |
| } |
| |
| |
| /** |
| * @return the nbAdded |
| */ |
| public void resetNbAdded() |
| { |
| nbAdded.getAndSet( 0 ); |
| } |
| } |