| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.directory.server.ldap.replication; |
| |
| |
| import org.apache.directory.server.core.event.DirectoryListener; |
| import org.apache.directory.server.core.event.EventType; |
| import org.apache.directory.server.core.interceptor.context.AddOperationContext; |
| import org.apache.directory.server.core.interceptor.context.DeleteOperationContext; |
| import org.apache.directory.server.core.interceptor.context.ModifyOperationContext; |
| import org.apache.directory.server.core.interceptor.context.MoveAndRenameOperationContext; |
| import org.apache.directory.server.core.interceptor.context.MoveOperationContext; |
| import org.apache.directory.server.core.interceptor.context.RenameOperationContext; |
| import org.apache.directory.server.i18n.I18n; |
| import org.apache.directory.server.ldap.LdapSession; |
| import org.apache.directory.shared.ldap.codec.controls.replication.syncStateValue.SyncStateValueDecorator; |
| import org.apache.directory.shared.ldap.codec.controls.replication.syncmodifydn.SyncModifyDnDecorator; |
| import org.apache.directory.shared.ldap.message.control.replication.SyncModifyDnType; |
| import org.apache.directory.shared.ldap.message.control.replication.SyncStateTypeEnum; |
| import org.apache.directory.shared.ldap.model.constants.SchemaConstants; |
| import org.apache.directory.shared.ldap.model.entry.Entry; |
| import org.apache.directory.shared.ldap.model.exception.LdapInvalidAttributeValueException; |
| import org.apache.directory.shared.ldap.model.message.AbandonListener; |
| import org.apache.directory.shared.ldap.model.message.AbandonableRequest; |
| import org.apache.directory.shared.ldap.model.message.SearchRequest; |
| import org.apache.directory.shared.ldap.model.message.SearchResultEntry; |
| import org.apache.directory.shared.ldap.model.message.SearchResultEntryImpl; |
| import org.apache.directory.shared.util.Strings; |
| import org.apache.mina.core.future.WriteFuture; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
| /** |
| * |
| * modeled after PersistentSearchListener |
| * NOTE: doco is missing at many parts. Will be added once the functionality is satisfactory |
| * |
| * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a> |
| */ |
| public class SyncReplSearchListener implements DirectoryListener, AbandonListener |
| { |
| private static final Logger LOG = LoggerFactory.getLogger( SyncReplSearchListener.class ); |
| |
| private LdapSession session; |
| private SearchRequest req; |
| |
| private volatile boolean pushInRealTime; |
| |
| private final ReplicaEventLog clientMsgLog; |
| |
| |
| SyncReplSearchListener( LdapSession session, SearchRequest req, ReplicaEventLog clientMsgLog, |
| boolean pushInRealTime ) |
| { |
| this.pushInRealTime = pushInRealTime; |
| setSession( session ); |
| setReq( req ); |
| |
| this.clientMsgLog = clientMsgLog; |
| } |
| |
| |
| public void setSession( LdapSession session ) |
| { |
| this.session = session; |
| } |
| |
| |
| public void setReq( SearchRequest req ) |
| { |
| this.req = req; |
| if ( req != null ) |
| { |
| req.addAbandonListener( this ); |
| } |
| } |
| |
| |
| public void abandon() throws Exception |
| { |
| if ( session != null ) |
| { |
| // must abandon the operation |
| session.getCoreSession().getDirectoryService().getEventService().removeListener( this ); |
| } |
| |
| /* |
| * From RFC 2251 Section 4.11: |
| * |
| * In the event that a server receives an Abandon Request on a Search |
| * operation in the midst of transmitting responses to the Search, that |
| * server MUST cease transmitting entry responses to the abandoned |
| * request immediately, and MUST NOT send the SearchResultDone. Of |
| * course, the server MUST ensure that only properly encoded LDAPMessage |
| * PDUs are transmitted. |
| * |
| * SO DON'T SEND BACK ANYTHING!!!!! |
| */ |
| } |
| |
| |
| public void requestAbandoned( AbandonableRequest req ) |
| { |
| try |
| { |
| abandon(); |
| } |
| catch ( Exception e ) |
| { |
| LOG.error( I18n.err( I18n.ERR_164 ), e ); |
| } |
| } |
| |
| |
| public void entryAdded( AddOperationContext addContext ) |
| { |
| Entry entry = addContext.getEntry(); |
| |
| LOG.debug( "sending added entry {}", entry.getDn() ); |
| |
| try |
| { |
| if ( pushInRealTime ) |
| { |
| |
| SearchResultEntry respEntry = new SearchResultEntryImpl( req.getMessageId() ); |
| respEntry.setObjectName( entry.getDn() ); |
| respEntry.setEntry( entry ); |
| |
| SyncStateValueDecorator syncAdd = new SyncStateValueDecorator( |
| session.getLdapServer().getDirectoryService().getLdapCodecService() ); |
| syncAdd.setSyncStateType( SyncStateTypeEnum.ADD ); |
| syncAdd |
| .setEntryUUID( Strings.uuidToBytes(entry.get(SchemaConstants.ENTRY_UUID_AT).getString()) ); |
| syncAdd.setCookie( getCookie( entry ) ); |
| respEntry.addControl( syncAdd ); |
| |
| WriteFuture future = session.getIoSession().write( respEntry ); |
| handleWriteFuture( future, entry, EventType.ADD, null ); |
| } |
| else |
| { |
| clientMsgLog.log( EventType.ADD, addContext.getEntry() ); |
| } |
| } |
| catch ( LdapInvalidAttributeValueException e ) |
| { |
| // shouldn't happen |
| LOG.error( e.getMessage(), e ); |
| } |
| } |
| |
| |
| public void entryDeleted( DeleteOperationContext deleteContext ) |
| { |
| sendDeletedEntry( deleteContext.getEntry() ); |
| } |
| |
| |
| private void sendDeletedEntry( Entry entry ) |
| { |
| LOG.debug( "sending deleted entry {}", entry.getDn() ); |
| |
| try |
| { |
| if ( pushInRealTime ) |
| { |
| SearchResultEntry respEntry = new SearchResultEntryImpl( req.getMessageId() ); |
| respEntry.setObjectName( entry.getDn() ); |
| respEntry.setEntry( entry ); |
| |
| SyncStateValueDecorator syncDelete = new SyncStateValueDecorator( |
| session.getLdapServer().getDirectoryService().getLdapCodecService() ); |
| syncDelete.setSyncStateType( SyncStateTypeEnum.DELETE ); |
| syncDelete.setEntryUUID( Strings.uuidToBytes(entry.get(SchemaConstants.ENTRY_UUID_AT) |
| .getString()) ); |
| syncDelete.setCookie( getCookie( entry ) ); |
| respEntry.addControl( syncDelete ); |
| |
| WriteFuture future = session.getIoSession().write( respEntry ); |
| |
| handleWriteFuture( future, entry, EventType.DELETE, null ); |
| } |
| else |
| { |
| clientMsgLog.log( EventType.DELETE, entry ); |
| } |
| } |
| catch ( LdapInvalidAttributeValueException e ) |
| { |
| // shouldn't happen |
| LOG.error( e.getMessage(), e ); |
| } |
| } |
| |
| |
| public void entryModified( ModifyOperationContext modifyContext ) |
| { |
| Entry alteredEntry = modifyContext.getAlteredEntry(); |
| |
| LOG.debug( "sending modified entry {}", alteredEntry.getDn() ); |
| |
| try |
| { |
| if ( pushInRealTime ) |
| { |
| |
| SearchResultEntry respEntry = new SearchResultEntryImpl( req.getMessageId() ); |
| respEntry.setObjectName( modifyContext.getDn() ); |
| respEntry.setEntry( alteredEntry ); |
| |
| SyncStateValueDecorator syncModify = new SyncStateValueDecorator( |
| session.getLdapServer().getDirectoryService().getLdapCodecService() ); |
| syncModify.setSyncStateType( SyncStateTypeEnum.MODIFY ); |
| syncModify.setEntryUUID( Strings.uuidToBytes(alteredEntry.get(SchemaConstants.ENTRY_UUID_AT) |
| .getString()) ); |
| syncModify.setCookie( getCookie( alteredEntry ) ); |
| respEntry.addControl( syncModify ); |
| |
| WriteFuture future = session.getIoSession().write( respEntry ); |
| |
| // store altered entry cause that holds the updated CSN |
| handleWriteFuture( future, alteredEntry, EventType.MODIFY, null ); |
| } |
| else |
| { |
| clientMsgLog.log( EventType.MODIFY, modifyContext.getAlteredEntry() ); |
| } |
| } |
| catch ( Exception e ) |
| { |
| LOG.error( e.getMessage(), e ); |
| } |
| } |
| |
| |
| public void entryMoved( MoveOperationContext moveContext ) |
| { |
| Entry entry = moveContext.getEntry(); |
| |
| LOG.debug( "sending moved entry {}", entry.getDn() ); |
| |
| try |
| { |
| if ( !moveContext.getNewSuperior().isChildOf( clientMsgLog.getSearchCriteria().getBase() ) ) |
| { |
| sendDeletedEntry( moveContext.getEntry() ); |
| return; |
| } |
| |
| SyncModifyDnDecorator modDnControl = new SyncModifyDnDecorator( |
| session.getLdapServer().getDirectoryService().getLdapCodecService(), |
| SyncModifyDnType.MOVE ); |
| modDnControl.setEntryDn( moveContext.getDn().getNormName() ); |
| modDnControl.setNewSuperiorDn( moveContext.getNewSuperior().getNormName() ); |
| |
| if ( pushInRealTime ) |
| { |
| SearchResultEntry respEntry = new SearchResultEntryImpl( req.getMessageId() ); |
| respEntry.setObjectName( moveContext.getDn() ); |
| respEntry.setEntry( entry ); |
| |
| SyncStateValueDecorator syncModify = new SyncStateValueDecorator( |
| session.getLdapServer().getDirectoryService().getLdapCodecService() ); |
| syncModify.setSyncStateType( SyncStateTypeEnum.MODDN ); |
| syncModify.setEntryUUID( Strings.uuidToBytes(entry.get(SchemaConstants.ENTRY_UUID_AT) |
| .getString()) ); |
| syncModify.setCookie( getCookie( entry ) ); |
| respEntry.addControl( syncModify ); |
| respEntry.addControl( modDnControl ); |
| |
| WriteFuture future = session.getIoSession().write( respEntry ); |
| |
| handleWriteFuture( future, entry, null, modDnControl ); |
| } |
| else |
| { |
| clientMsgLog.log( new ReplicaEventMessage( modDnControl, moveContext.getEntry() ) ); |
| } |
| } |
| catch ( Exception e ) |
| { |
| LOG.error( e.getMessage(), e ); |
| } |
| } |
| |
| |
| public void entryMovedAndRenamed( MoveAndRenameOperationContext moveAndRenameContext ) |
| { |
| |
| LOG.debug( "sending moveAndRenamed entry {}", moveAndRenameContext.getDn() ); |
| |
| try |
| { |
| if ( !moveAndRenameContext.getNewSuperiorDn().isChildOf( clientMsgLog.getSearchCriteria().getBase() ) ) |
| { |
| sendDeletedEntry( moveAndRenameContext.getEntry() ); |
| return; |
| } |
| |
| SyncModifyDnDecorator modDnControl = new SyncModifyDnDecorator( |
| session.getLdapServer().getDirectoryService().getLdapCodecService(), |
| SyncModifyDnType.MOVEANDRENAME ); |
| modDnControl.setEntryDn( moveAndRenameContext.getDn().getNormName() ); |
| modDnControl.setNewSuperiorDn( moveAndRenameContext.getNewSuperiorDn().getNormName() ); |
| modDnControl.setNewRdn( moveAndRenameContext.getNewRdn().getNormName() ); |
| modDnControl.setDeleteOldRdn( moveAndRenameContext.getDeleteOldRdn() ); |
| |
| if ( pushInRealTime ) |
| { |
| Entry alteredEntry = moveAndRenameContext.getModifiedEntry(); |
| |
| SearchResultEntry respEntry = new SearchResultEntryImpl( req.getMessageId() ); |
| respEntry.setObjectName( moveAndRenameContext.getModifiedEntry().getDn() ); |
| respEntry.setEntry( alteredEntry ); |
| |
| SyncStateValueDecorator syncModify = new SyncStateValueDecorator( |
| session.getLdapServer().getDirectoryService().getLdapCodecService() ); |
| syncModify.setSyncStateType( SyncStateTypeEnum.MODDN ); |
| syncModify.setEntryUUID( Strings.uuidToBytes(alteredEntry.get(SchemaConstants.ENTRY_UUID_AT) |
| .getString()) ); |
| syncModify.setCookie( getCookie( alteredEntry ) ); |
| respEntry.addControl( syncModify ); |
| respEntry.addControl( modDnControl ); |
| |
| WriteFuture future = session.getIoSession().write( respEntry ); |
| |
| handleWriteFuture( future, alteredEntry, null, modDnControl ); |
| } |
| else |
| { |
| clientMsgLog.log( new ReplicaEventMessage( modDnControl, moveAndRenameContext.getEntry() ) ); |
| } |
| } |
| catch ( Exception e ) |
| { |
| LOG.error( e.getMessage(), e ); |
| } |
| } |
| |
| |
| public void entryRenamed( RenameOperationContext renameContext ) |
| { |
| Entry entry = renameContext.getEntry(); |
| |
| LOG.debug( "sending renamed entry {}", entry.getDn() ); |
| |
| try |
| { |
| SyncModifyDnDecorator modDnControl = new SyncModifyDnDecorator( |
| session.getLdapServer().getDirectoryService().getLdapCodecService() ); |
| modDnControl.setModDnType( SyncModifyDnType.RENAME ); |
| modDnControl.setEntryDn( renameContext.getDn().getName() ); |
| modDnControl.setNewRdn( renameContext.getNewRdn().getName() ); |
| modDnControl.setDeleteOldRdn( renameContext.getDeleteOldRdn() ); |
| |
| if ( pushInRealTime ) |
| { |
| SearchResultEntry respEntry = new SearchResultEntryImpl( req.getMessageId() ); |
| respEntry.setObjectName( entry.getDn() ); |
| respEntry.setEntry( entry ); |
| |
| SyncStateValueDecorator syncModify = new SyncStateValueDecorator( |
| session.getLdapServer().getDirectoryService().getLdapCodecService() ); |
| syncModify.setSyncStateType( SyncStateTypeEnum.MODDN ); |
| syncModify.setEntryUUID( Strings.uuidToBytes(entry.get(SchemaConstants.ENTRY_UUID_AT) |
| .getString()) ); |
| syncModify.setCookie( getCookie( renameContext.getModifiedEntry() ) ); |
| respEntry.addControl( syncModify ); |
| respEntry.addControl( modDnControl ); |
| |
| WriteFuture future = session.getIoSession().write( respEntry ); |
| |
| handleWriteFuture( future, renameContext.getModifiedEntry(), null, modDnControl ); |
| } |
| else |
| { |
| clientMsgLog.log( new ReplicaEventMessage( modDnControl, renameContext.getEntry() ) ); |
| } |
| } |
| catch ( Exception e ) |
| { |
| LOG.error( e.getMessage(), e ); |
| } |
| } |
| |
| |
| public boolean isPushInRealTime() |
| { |
| return pushInRealTime; |
| } |
| |
| |
| public void setPushInRealTime( boolean pushInRealTime ) |
| { |
| this.pushInRealTime = pushInRealTime; |
| } |
| |
| |
| private byte[] getCookie( Entry entry ) throws LdapInvalidAttributeValueException |
| { |
| String csn = entry.get( SchemaConstants.ENTRY_CSN_AT ).getString(); |
| return Strings.getBytesUtf8(clientMsgLog.getId() + SyncReplProvider.REPLICA_ID_DELIM + csn); |
| } |
| |
| |
| private void handleWriteFuture( WriteFuture future, Entry entry, EventType event, SyncModifyDnDecorator modDnControl ) |
| { |
| future.awaitUninterruptibly(); |
| if ( !future.isWritten() ) |
| { |
| LOG.error( "Failed to write to the consumer {}", clientMsgLog.getId() ); |
| LOG.error( "", future.getException() ); |
| |
| // set realtime push to false, will be set back to true when the client |
| // comes back and sends another request this flag will be set to true |
| pushInRealTime = false; |
| |
| if ( modDnControl != null ) |
| { |
| clientMsgLog.log( new ReplicaEventMessage( modDnControl, entry ) ); |
| } |
| else |
| { |
| clientMsgLog.log( event, entry ); |
| } |
| } |
| } |
| } |