| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| package org.apache.directory.server.ldap.replication; |
| |
| |
| import static java.lang.Math.min; |
| import static org.apache.directory.server.ldap.LdapServer.NO_SIZE_LIMIT; |
| import static org.apache.directory.server.ldap.LdapServer.NO_TIME_LIMIT; |
| |
| import java.io.File; |
| import java.net.InetSocketAddress; |
| import java.net.URI; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.activemq.ActiveMQConnection; |
| import org.apache.activemq.ActiveMQConnectionFactory; |
| import org.apache.activemq.broker.BrokerService; |
| import org.apache.directory.server.core.DirectoryService; |
| import org.apache.directory.server.core.entry.ClonedServerEntry; |
| import org.apache.directory.server.core.event.EventType; |
| import org.apache.directory.server.core.event.NotificationCriteria; |
| import org.apache.directory.server.core.filtering.EntryFilteringCursor; |
| import org.apache.directory.server.i18n.I18n; |
| import org.apache.directory.server.ldap.LdapServer; |
| import org.apache.directory.server.ldap.LdapSession; |
| import org.apache.directory.server.ldap.handlers.SearchAbandonListener; |
| import org.apache.directory.server.ldap.handlers.SearchTimeLimitingMonitor; |
| import org.apache.directory.shared.ldap.model.message.controls.ManageDsaIT; |
| import org.apache.directory.shared.ldap.codec.controls.replication.syncDoneValue.SyncDoneValueDecorator; |
| import org.apache.directory.shared.ldap.codec.controls.replication.syncInfoValue.ISyncInfoValue; |
| import org.apache.directory.shared.ldap.codec.controls.replication.syncInfoValue.SyncInfoValueDecorator; |
| import org.apache.directory.shared.ldap.codec.controls.replication.syncRequestValue.ISyncRequestValue; |
| import org.apache.directory.shared.ldap.codec.controls.replication.syncStateValue.SyncStateValueDecorator; |
| import org.apache.directory.shared.ldap.codec.controls.replication.syncmodifydn.SyncModifyDnDecorator; |
| import org.apache.directory.shared.ldap.message.control.replication.SyncStateTypeEnum; |
| import org.apache.directory.shared.ldap.message.control.replication.SynchronizationInfoEnum; |
| import org.apache.directory.shared.ldap.message.control.replication.SynchronizationModeEnum; |
| import org.apache.directory.shared.ldap.model.constants.SchemaConstants; |
| import org.apache.directory.shared.ldap.model.csn.Csn; |
| import org.apache.directory.shared.ldap.model.entry.Entry; |
| import org.apache.directory.shared.ldap.model.entry.EntryAttribute; |
| import org.apache.directory.shared.ldap.model.entry.StringValue; |
| import org.apache.directory.shared.ldap.model.entry.Value; |
| import org.apache.directory.shared.ldap.model.exception.LdapException; |
| import org.apache.directory.shared.ldap.model.exception.LdapURLEncodingException; |
| import org.apache.directory.shared.ldap.model.filter.AndNode; |
| import org.apache.directory.shared.ldap.model.filter.EqualityNode; |
| import org.apache.directory.shared.ldap.model.filter.ExprNode; |
| import org.apache.directory.shared.ldap.model.filter.GreaterEqNode; |
| import org.apache.directory.shared.ldap.model.filter.LdapURL; |
| import org.apache.directory.shared.ldap.model.filter.LessEqNode; |
| import org.apache.directory.shared.ldap.model.filter.OrNode; |
| import org.apache.directory.shared.ldap.model.filter.PresenceNode; |
| import org.apache.directory.shared.ldap.model.filter.SearchScope; |
| import org.apache.directory.shared.ldap.model.message.IntermediateResponse; |
| import org.apache.directory.shared.ldap.model.message.IntermediateResponseImpl; |
| import org.apache.directory.shared.ldap.model.message.LdapResult; |
| import org.apache.directory.shared.ldap.model.message.ReferralImpl; |
| import org.apache.directory.shared.ldap.model.message.Response; |
| import org.apache.directory.shared.ldap.model.message.ResultCodeEnum; |
| import org.apache.directory.shared.ldap.model.message.SearchRequest; |
| import org.apache.directory.shared.ldap.model.message.SearchResultDone; |
| import org.apache.directory.shared.ldap.model.message.SearchResultEntry; |
| import org.apache.directory.shared.ldap.model.message.SearchResultEntryImpl; |
| import org.apache.directory.shared.ldap.model.message.SearchResultReference; |
| import org.apache.directory.shared.ldap.model.message.SearchResultReferenceImpl; |
| import org.apache.directory.shared.ldap.model.schema.AttributeType; |
| import org.apache.directory.shared.util.Strings; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
| /** |
| * NOTE: doco is missing at many parts. Will be added once the functionality is satisfactory |
| * |
| * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a> |
| */ |
| @SuppressWarnings("unchecked") |
| public class SyncReplProvider implements ReplicationProvider |
| { |
| |
| public static final String REPLICA_ID_DELIM = ";"; |
| |
| private static final Logger LOG = LoggerFactory.getLogger( SyncReplProvider.class ); |
| |
| private boolean initialized = false; |
| |
| private DirectoryService dirService; |
| |
| /** The reference on the Ldap server instance */ |
| protected LdapServer ldapServer; |
| |
| private AttributeType objectClassAttributeType; |
| |
| private Map<Integer, ReplicaEventLog> replicaLogMap = new HashMap<Integer, ReplicaEventLog>(); |
| |
| private BrokerService brokerService; |
| |
| private ActiveMQConnection amqConnection; |
| |
| private File syncReplData; |
| |
| private AtomicInteger replicaCount = new AtomicInteger( 0 ); |
| |
| private ReplicaDitStoreUtil replicaUtil; |
| |
| |
| public SyncReplProvider() |
| { |
| } |
| |
| |
| public void init( LdapServer server ) |
| { |
| if ( initialized ) |
| { |
| LOG.warn( "syncrepl provider was already initialized" ); |
| return; |
| } |
| try |
| { |
| LOG.info( "initializing the syncrepl provider" ); |
| |
| this.ldapServer = server; |
| this.dirService = server.getDirectoryService(); |
| |
| File workDir = dirService.getInstanceLayout().getLogDirectory(); |
| syncReplData = new File( workDir, "syncrepl-data" ); |
| if ( !syncReplData.exists() ) |
| { |
| syncReplData.mkdirs(); |
| } |
| |
| String path = syncReplData.getPath(); |
| |
| brokerService = new BrokerService(); |
| brokerService.setUseJmx( false ); |
| brokerService.setPersistent( true ); |
| brokerService.setDataDirectory( path ); |
| |
| URI vmConnectorUri = new URI( "vm://localhost" ); |
| brokerService.setVmConnectorURI( vmConnectorUri ); |
| |
| brokerService.start(); |
| ActiveMQConnectionFactory amqFactory = new ActiveMQConnectionFactory( vmConnectorUri.toString() ); |
| amqFactory.setObjectMessageSerializationDefered( false ); |
| |
| amqConnection = ( ActiveMQConnection ) amqFactory.createConnection(); |
| amqConnection.start(); |
| |
| // set the static reference to SchemaManager |
| ReplicaEventMessage.setSchemaManager( dirService.getSchemaManager() ); |
| |
| replicaUtil = new ReplicaDitStoreUtil( dirService ); |
| |
| loadReplicaInfo(); |
| |
| registerPersistentSearches(); |
| |
| Thread consumerInfoUpdateThread = new Thread( createConsumerInfoUpdateTask() ); |
| consumerInfoUpdateThread.setDaemon( true ); |
| consumerInfoUpdateThread.start(); |
| |
| initialized = true; |
| LOG.info( "syncrepl provider initialized successfully" ); |
| } |
| catch ( Exception e ) |
| { |
| LOG.error( "Failed to initialize the log files required by the syncrepl provider", e ); |
| throw new RuntimeException( e ); |
| } |
| } |
| |
| |
| public void stop() |
| { |
| try |
| { |
| brokerService.stop(); |
| amqConnection.close(); |
| } |
| catch ( Exception e ) |
| { |
| LOG.warn( "Failed to close the message queue connection", e ); |
| } |
| |
| initialized = false; |
| } |
| |
| |
| public void handleSyncRequest( LdapSession session, SearchRequest req ) throws LdapException |
| { |
| try |
| { |
| ISyncRequestValue syncControl = ( ISyncRequestValue ) req.getControls().get( |
| ISyncRequestValue.OID ); |
| |
| // cookie is in the format <replicaId>;<Csn value> |
| byte[] cookieBytes = syncControl.getCookie(); |
| String cookieString = Strings.utf8ToString(cookieBytes); |
| |
| if ( cookieBytes == null ) |
| { |
| doInitialRefresh( session, req ); |
| } |
| else |
| { |
| LOG.warn( "search request received with the cookie {}", Strings.utf8ToString(cookieBytes) ); |
| if ( !isValidCookie( cookieString ) ) |
| { |
| LOG.error( "received a invalid cookie {} from the consumer with session {}", cookieString, session ); |
| sendESyncRefreshRequired( session, req ); |
| } |
| else |
| { |
| ReplicaEventLog clientMsgLog = getReplicaEventLog( cookieString ); |
| if ( clientMsgLog == null ) |
| { |
| LOG.warn( "received a valid cookie {} but there is no event log associated with this replica", |
| cookieString ); |
| sendESyncRefreshRequired( session, req ); |
| } |
| else |
| { |
| doContentUpdate( session, req, clientMsgLog ); |
| } |
| } |
| } |
| } |
| catch ( Exception e ) |
| { |
| LOG.error( "Failed to handle the syncrepl request", e ); |
| |
| LdapException le = new LdapException( e.getMessage() ); |
| le.initCause( e ); |
| |
| throw le; |
| } |
| } |
| |
| |
| private String sendContentFromLog( LdapSession session, SearchRequest req, ReplicaEventLog clientMsgLog ) |
| throws Exception |
| { |
| // do the search from the log |
| String lastSentCsn = clientMsgLog.getLastSentCsn(); |
| |
| ReplicaEventLogCursor cursor = clientMsgLog.getCursor(); |
| while ( cursor.next() ) |
| { |
| ReplicaEventMessage message = cursor.get(); |
| Entry entry = message.getEntry(); |
| LOG.debug( "received message from the queue {}", entry ); |
| |
| lastSentCsn = entry.get( SchemaConstants.ENTRY_CSN_AT ).getString(); |
| |
| EventType event = message.getEventType(); |
| |
| // if event type is null, then it is a MODDN operation |
| if ( event == null ) |
| { |
| sendSearchResultEntry( session, req, entry, message.getModDnControl() ); |
| } |
| else |
| { |
| SyncStateTypeEnum syncStateType = null; |
| if ( event == EventType.ADD || event == EventType.MODIFY ) |
| { |
| syncStateType = SyncStateTypeEnum.ADD; |
| } |
| else if ( event == EventType.DELETE ) |
| { |
| syncStateType = SyncStateTypeEnum.DELETE; |
| } |
| |
| sendSearchResultEntry( session, req, entry, syncStateType ); |
| } |
| } |
| cursor.close(); |
| |
| return lastSentCsn; |
| } |
| |
| |
| private void doContentUpdate( LdapSession session, SearchRequest req, ReplicaEventLog replicaLog ) |
| throws Exception |
| { |
| boolean refreshNPersist = isRefreshNPersist( req ); |
| |
| // if this method is called with refreshAndPersist |
| // means the client was offline after it initiated a persistent synch session |
| // we need to update the handler's session |
| if ( refreshNPersist ) |
| { |
| SyncReplSearchListener handler = replicaLog.getPersistentListener(); |
| handler.setReq( req ); |
| handler.setSession( session ); |
| } |
| |
| String lastSentCsn = sendContentFromLog( session, req, replicaLog ); |
| |
| byte[] cookie = Strings.getBytesUtf8(replicaLog.getId() + REPLICA_ID_DELIM + lastSentCsn); |
| |
| if ( refreshNPersist ) |
| { |
| IntermediateResponse intermResp = new IntermediateResponseImpl( req.getMessageId() ); |
| intermResp.setResponseName( ISyncInfoValue.OID ); |
| |
| SyncInfoValueDecorator syncInfo = new SyncInfoValueDecorator( ldapServer.getDirectoryService() |
| .getLdapCodecService(), |
| SynchronizationInfoEnum.NEW_COOKIE ); |
| syncInfo.setCookie( cookie ); |
| intermResp.setResponseValue( syncInfo.getValue() ); |
| |
| session.getIoSession().write( intermResp ); |
| |
| replicaLog.getPersistentListener().setPushInRealTime( refreshNPersist ); |
| } |
| else |
| { |
| SearchResultDone searchDoneResp = ( SearchResultDone ) req.getResultResponse(); |
| searchDoneResp.getLdapResult().setResultCode( ResultCodeEnum.SUCCESS ); |
| SyncDoneValueDecorator syncDone = new SyncDoneValueDecorator( |
| ldapServer.getDirectoryService().getLdapCodecService() ); |
| syncDone.setCookie( cookie ); |
| searchDoneResp.addControl( syncDone ); |
| |
| session.getIoSession().write( searchDoneResp ); |
| } |
| |
| replicaLog.setLastSentCsn( lastSentCsn ); |
| } |
| |
| |
| private void doInitialRefresh( LdapSession session, SearchRequest req ) throws Exception |
| { |
| |
| String originalFilter = req.getFilter().toString(); |
| InetSocketAddress address = ( InetSocketAddress ) session.getIoSession().getRemoteAddress(); |
| String hostName = address.getAddress().getHostName(); |
| |
| ExprNode modifiedFilter = modifyFilter( session, req ); |
| |
| String contextCsn = dirService.getContextCsn(); |
| |
| boolean refreshNPersist = isRefreshNPersist( req ); |
| |
| // first register a persistent search handler before starting the initial content refresh |
| // this is to log all the operations happen on DIT during initial content refresh |
| |
| ReplicaEventLog replicaLog = createRelicaEventLog( hostName, originalFilter ); |
| |
| replicaLog.setRefreshNPersist( refreshNPersist ); |
| |
| // modify the filter to include the context Csn |
| GreaterEqNode csnGeNode = new GreaterEqNode( SchemaConstants.ENTRY_CSN_AT, new StringValue( contextCsn ) ); |
| ExprNode postInitContentFilter = new AndNode( modifiedFilter, csnGeNode ); |
| req.setFilter( postInitContentFilter ); |
| |
| // now we process entries forever as they change |
| LOG.info( "starting persistent search for the client {}", replicaLog ); |
| |
| // irrespective of the sync mode set the 'isRealtimePush' to false initially so that we can |
| // store the modifications in the queue and later if it is a persist mode |
| // we push this queue's content and switch to realtime mode |
| SyncReplSearchListener handler = new SyncReplSearchListener( session, req, replicaLog, false ); |
| replicaLog.setPersistentListener( handler ); |
| |
| // compose notification criteria and add the listener to the event |
| // service using that notification criteria to determine which events |
| // are to be delivered to the persistent search issuing client |
| NotificationCriteria criteria = new NotificationCriteria(); |
| criteria.setAliasDerefMode( req.getDerefAliases() ); |
| criteria.setBase( req.getBase() ); |
| criteria.setFilter( req.getFilter() ); |
| criteria.setScope( req.getScope() ); |
| criteria.setEventMask( EventType.ALL_EVENT_TYPES_MASK ); |
| |
| replicaLog.setSearchCriteria( criteria ); |
| |
| dirService.getEventService().addListener( handler, criteria ); |
| |
| // then start pushing initial content |
| LessEqNode csnNode = new LessEqNode( SchemaConstants.ENTRY_CSN_AT, new StringValue( contextCsn ) ); |
| |
| // modify the filter to include the context Csn |
| ExprNode initialContentFilter = new AndNode( modifiedFilter, csnNode ); |
| req.setFilter( initialContentFilter ); |
| |
| SearchResultDone searchDoneResp = doSimpleSearch( session, req ); |
| |
| if ( searchDoneResp.getLdapResult().getResultCode() == ResultCodeEnum.SUCCESS ) |
| { |
| replicaLog.setLastSentCsn( contextCsn ); |
| byte[] cookie = Strings.getBytesUtf8(replicaLog.getId() + REPLICA_ID_DELIM + contextCsn); |
| |
| if ( refreshNPersist ) // refreshAndPersist mode |
| { |
| contextCsn = sendContentFromLog( session, req, replicaLog ); |
| cookie = Strings.getBytesUtf8(replicaLog.getId() + REPLICA_ID_DELIM + contextCsn); |
| |
| IntermediateResponse intermResp = new IntermediateResponseImpl( req.getMessageId() ); |
| intermResp.setResponseName( ISyncInfoValue.OID ); |
| |
| SyncInfoValueDecorator syncInfo = new SyncInfoValueDecorator( |
| ldapServer.getDirectoryService().getLdapCodecService(), SynchronizationInfoEnum.NEW_COOKIE ); |
| syncInfo.setCookie( cookie ); |
| intermResp.setResponseValue( syncInfo.getValue() ); |
| |
| session.getIoSession().write( intermResp ); |
| |
| // switch the handler mode to realtime push |
| handler.setPushInRealTime( refreshNPersist ); |
| } |
| else |
| { |
| // no need to send from the log, that will be done in the next refreshOnly session |
| SyncDoneValueDecorator syncDone = new SyncDoneValueDecorator( |
| ldapServer.getDirectoryService().getLdapCodecService() ); |
| syncDone.setCookie( cookie ); |
| searchDoneResp.addControl( syncDone ); |
| session.getIoSession().write( searchDoneResp ); |
| } |
| } |
| else |
| // if not succeeded return |
| { |
| LOG.warn( "initial content refresh didn't succeed due to {}", searchDoneResp.getLdapResult() |
| .getResultCode() ); |
| replicaLog.truncate(); |
| replicaLog = null; |
| |
| // remove the listener |
| dirService.getEventService().removeListener( handler ); |
| |
| return; |
| } |
| |
| // if all is well then store the consumer infor |
| replicaUtil.addConsumerEntry( replicaLog ); |
| |
| // add to the map only after storing in the DIT, else the Replica update thread barfs |
| replicaLogMap.put( replicaLog.getId(), replicaLog ); |
| } |
| |
| |
| private SearchResultDone doSimpleSearch( LdapSession session, SearchRequest req ) throws Exception |
| { |
| SearchResultDone searchDoneResp = ( SearchResultDone ) req.getResultResponse(); |
| LdapResult ldapResult = searchDoneResp.getLdapResult(); |
| |
| // A normal search |
| // Check that we have a cursor or not. |
| // No cursor : do a search. |
| EntryFilteringCursor cursor = session.getCoreSession().search( req ); |
| |
| // Position the cursor at the beginning |
| cursor.beforeFirst(); |
| |
| /* |
| * Iterate through all search results building and sending back responses |
| * for each search result returned. |
| */ |
| try |
| { |
| // Get the size limits |
| // Don't bother setting size limits for administrators that don't ask for it |
| long serverLimit = getServerSizeLimit( session, req ); |
| |
| long requestLimit = req.getSizeLimit() == 0L ? Long.MAX_VALUE : req.getSizeLimit(); |
| |
| req.addAbandonListener( new SearchAbandonListener( ldapServer, cursor ) ); |
| setTimeLimitsOnCursor( req, session, cursor ); |
| LOG.debug( "using <{},{}> for size limit", requestLimit, serverLimit ); |
| long sizeLimit = min( requestLimit, serverLimit ); |
| |
| readResults( session, req, ldapResult, cursor, sizeLimit ); |
| } |
| finally |
| { |
| if ( cursor != null ) |
| { |
| try |
| { |
| cursor.close(); |
| } |
| catch ( Exception e ) |
| { |
| LOG.error( I18n.err( I18n.ERR_168 ), e ); |
| } |
| } |
| } |
| |
| return searchDoneResp; |
| } |
| |
| |
| private void readResults( LdapSession session, SearchRequest req, LdapResult ldapResult, |
| EntryFilteringCursor cursor, long sizeLimit ) throws Exception |
| { |
| long count = 0; |
| |
| while ( ( count < sizeLimit ) && cursor.next() ) |
| { |
| // Handle closed session |
| if ( session.getIoSession().isClosing() ) |
| { |
| // The client has closed the connection |
| LOG.debug( "Request terminated for message {}, the client has closed the session", req.getMessageId() ); |
| break; |
| } |
| |
| if ( req.isAbandoned() ) |
| { |
| // The cursor has been closed by an abandon request. |
| LOG.debug( "Request terminated by an AbandonRequest for message {}", req.getMessageId() ); |
| break; |
| } |
| |
| ClonedServerEntry entry = cursor.get(); |
| |
| sendSearchResultEntry( session, req, entry, SyncStateTypeEnum.ADD ); |
| |
| count++; |
| } |
| |
| // DO NOT WRITE THE RESPONSE - JUST RETURN IT |
| ldapResult.setResultCode( ResultCodeEnum.SUCCESS ); |
| |
| if ( ( count >= sizeLimit ) && ( cursor.next() ) ) |
| { |
| // We have reached the limit |
| // Move backward on the cursor to restore the previous position, as we moved forward |
| // to check if there is one more entry available |
| cursor.previous(); |
| // Special case if the user has requested more elements than the request size limit |
| ldapResult.setResultCode( ResultCodeEnum.SIZE_LIMIT_EXCEEDED ); |
| } |
| } |
| |
| |
| private void sendSearchResultEntry( LdapSession session, SearchRequest req, Entry entry, |
| SyncStateTypeEnum syncStateType ) throws Exception |
| { |
| |
| EntryAttribute uuid = entry.get( SchemaConstants.ENTRY_UUID_AT ); |
| SyncStateValueDecorator syncStateControl = new SyncStateValueDecorator( |
| ldapServer.getDirectoryService().getLdapCodecService() ); |
| syncStateControl.setSyncStateType( syncStateType ); |
| syncStateControl.setEntryUUID( Strings.uuidToBytes(uuid.getString()) ); |
| |
| if ( syncStateType == SyncStateTypeEnum.DELETE ) |
| { |
| // clear the entry's all attributes except the Dn and entryUUID |
| entry.clear(); |
| entry.add( uuid ); |
| } |
| |
| Response resp = generateResponse( session, req, entry ); |
| resp.addControl( syncStateControl ); |
| |
| session.getIoSession().write( resp ); |
| LOG.debug( "Sending {}", entry.getDn() ); |
| } |
| |
| |
| private void sendSearchResultEntry( LdapSession session, SearchRequest req, Entry entry, |
| SyncModifyDnDecorator modDnControl ) throws Exception |
| { |
| |
| EntryAttribute uuid = entry.get( SchemaConstants.ENTRY_UUID_AT ); |
| SyncStateValueDecorator syncStateControl = new SyncStateValueDecorator( |
| ldapServer.getDirectoryService().getLdapCodecService() ); |
| syncStateControl.setSyncStateType( SyncStateTypeEnum.MODDN ); |
| syncStateControl.setEntryUUID( Strings.uuidToBytes(uuid.getString()) ); |
| |
| Response resp = generateResponse( session, req, entry ); |
| resp.addControl( syncStateControl ); |
| resp.addControl( modDnControl ); |
| |
| session.getIoSession().write( resp ); |
| LOG.debug( "Sending {}", entry.getDn() ); |
| } |
| |
| |
| private Response generateResponse( LdapSession session, SearchRequest req, Entry entry ) throws Exception |
| { |
| EntryAttribute ref = entry.get( SchemaConstants.REF_AT ); |
| boolean hasManageDsaItControl = req.getControls().containsKey( ManageDsaIT.OID ); |
| |
| if ( ( ref != null ) && !hasManageDsaItControl ) |
| { |
| // The entry is a referral. |
| SearchResultReference respRef; |
| respRef = new SearchResultReferenceImpl( req.getMessageId() ); |
| respRef.setReferral( new ReferralImpl() ); |
| |
| for ( Value<?> val : ref ) |
| { |
| String url = val.getString(); |
| |
| if ( !url.startsWith( "ldap" ) ) |
| { |
| respRef.getReferral().addLdapUrl( url ); |
| } |
| |
| LdapURL ldapUrl = new LdapURL(); |
| ldapUrl.setForceScopeRendering( true ); |
| try |
| { |
| ldapUrl.parse( url.toCharArray() ); |
| } |
| catch ( LdapURLEncodingException e ) |
| { |
| LOG.error( I18n.err( I18n.ERR_165, url, entry ) ); |
| } |
| |
| switch ( req.getScope() ) |
| { |
| case SUBTREE: |
| ldapUrl.setScope( SearchScope.SUBTREE.getScope() ); |
| break; |
| |
| case ONELEVEL: // one level here is object level on remote server |
| ldapUrl.setScope( SearchScope.OBJECT.getScope() ); |
| break; |
| |
| default: |
| throw new IllegalStateException( I18n.err( I18n.ERR_686 ) ); |
| } |
| |
| respRef.getReferral().addLdapUrl( ldapUrl.toString() ); |
| } |
| |
| return respRef; |
| } |
| else |
| { |
| // The entry is not a referral, or the ManageDsaIt control is set |
| SearchResultEntry respEntry; |
| respEntry = new SearchResultEntryImpl( req.getMessageId() ); |
| respEntry.setEntry( entry ); |
| respEntry.setObjectName( entry.getDn() ); |
| |
| // Filter the userPassword if the server mandate to do so |
| if ( session.getCoreSession().getDirectoryService().isPasswordHidden() ) |
| { |
| // Remove the userPassord attribute from the entry. |
| respEntry.getEntry().removeAttributes( SchemaConstants.USER_PASSWORD_AT ); |
| } |
| |
| return respEntry; |
| } |
| } |
| |
| |
| /** |
| * Return the server size limit |
| */ |
| private long getServerSizeLimit( LdapSession session, SearchRequest request ) |
| { |
| if ( session.getCoreSession().isAnAdministrator() ) |
| { |
| if ( request.getSizeLimit() == NO_SIZE_LIMIT ) |
| { |
| return Long.MAX_VALUE; |
| } |
| else |
| { |
| return request.getSizeLimit(); |
| } |
| } |
| else |
| { |
| if ( ldapServer.getMaxSizeLimit() == NO_SIZE_LIMIT ) |
| { |
| return Long.MAX_VALUE; |
| } |
| else |
| { |
| return ldapServer.getMaxSizeLimit(); |
| } |
| } |
| } |
| |
| |
| private void setTimeLimitsOnCursor( SearchRequest req, LdapSession session, |
| final EntryFilteringCursor cursor ) |
| { |
| // Don't bother setting time limits for administrators |
| if ( session.getCoreSession().isAnAdministrator() && req.getTimeLimit() == NO_TIME_LIMIT ) |
| { |
| return; |
| } |
| |
| /* |
| * Non administrator based searches are limited by time if the server |
| * has been configured with unlimited time and the request specifies |
| * unlimited search time |
| */ |
| if ( ldapServer.getMaxTimeLimit() == NO_TIME_LIMIT && req.getTimeLimit() == NO_TIME_LIMIT ) |
| { |
| return; |
| } |
| |
| /* |
| * If the non-administrator user specifies unlimited time but the server |
| * is configured to limit the search time then we limit by the max time |
| * allowed by the configuration |
| */ |
| if ( req.getTimeLimit() == 0 ) |
| { |
| cursor.setClosureMonitor( new SearchTimeLimitingMonitor( ldapServer.getMaxTimeLimit(), TimeUnit.SECONDS ) ); |
| return; |
| } |
| |
| /* |
| * If the non-administrative user specifies a time limit equal to or |
| * less than the maximum limit configured in the server then we |
| * constrain search by the amount specified in the request |
| */ |
| if ( ldapServer.getMaxTimeLimit() >= req.getTimeLimit() ) |
| { |
| cursor.setClosureMonitor( new SearchTimeLimitingMonitor( req.getTimeLimit(), TimeUnit.SECONDS ) ); |
| return; |
| } |
| |
| /* |
| * Here the non-administrative user's requested time limit is greater |
| * than what the server's configured maximum limit allows so we limit |
| * the search to the configured limit |
| */ |
| cursor.setClosureMonitor( new SearchTimeLimitingMonitor( ldapServer.getMaxTimeLimit(), TimeUnit.SECONDS ) ); |
| } |
| |
| |
| public ExprNode modifyFilter( LdapSession session, SearchRequest req ) throws Exception |
| { |
| /* |
| * Do not add the OR'd (objectClass=referral) expression if the user |
| * searches for the subSchemaSubEntry as the SchemaIntercepter can't |
| * handle an OR'd filter. |
| */ |
| // if ( isSubSchemaSubEntrySearch( session, req ) ) |
| // { |
| // return; |
| // } |
| |
| /* |
| * Most of the time the search filter is just (objectClass=*) and if |
| * this is the case then there's no reason at all to OR this with an |
| * (objectClass=referral). If we detect this case then we leave it |
| * as is to represent the OR condition: |
| * |
| * (| (objectClass=referral)(objectClass=*)) == (objectClass=*) |
| */ |
| boolean isOcPresenceFilter = false; |
| if ( req.getFilter() instanceof PresenceNode ) |
| { |
| PresenceNode presenceNode = ( PresenceNode ) req.getFilter(); |
| |
| AttributeType at = session.getCoreSession().getDirectoryService().getSchemaManager() |
| .lookupAttributeTypeRegistry( presenceNode.getAttribute() ); |
| if ( at.getOid().equals( SchemaConstants.OBJECT_CLASS_AT_OID ) ) |
| { |
| isOcPresenceFilter = true; |
| } |
| } |
| |
| ExprNode filter = req.getFilter(); |
| if ( !req.hasControl( ManageDsaIT.OID ) && !isOcPresenceFilter ) |
| { |
| filter = new OrNode( req.getFilter(), newIsReferralEqualityNode( session ) ); |
| } |
| |
| return filter; |
| } |
| |
| |
| private EqualityNode<String> newIsReferralEqualityNode( LdapSession session ) throws Exception |
| { |
| if ( objectClassAttributeType == null ) |
| { |
| objectClassAttributeType = session.getCoreSession().getDirectoryService().getSchemaManager() |
| .lookupAttributeTypeRegistry( SchemaConstants.OBJECT_CLASS_AT ); |
| } |
| |
| EqualityNode<String> ocIsReferral = new EqualityNode<String>( SchemaConstants.OBJECT_CLASS_AT, new StringValue( |
| objectClassAttributeType, SchemaConstants.REFERRAL_OC ) ); |
| |
| return ocIsReferral; |
| } |
| |
| |
| private void storeReplicaInfo() |
| { |
| try |
| { |
| for ( Map.Entry<Integer, ReplicaEventLog> e : replicaLogMap.entrySet() ) |
| { |
| ReplicaEventLog replica = e.getValue(); |
| if ( replica.isDirty() ) |
| { |
| LOG.debug( "updating the details of replica {}", replica ); |
| replicaUtil.updateReplicaLastSentCsn( replica ); |
| replica.setDirty( false ); |
| } |
| } |
| } |
| catch ( Exception e ) |
| { |
| LOG.error( "Failed to store the replica information", e ); |
| } |
| } |
| |
| |
| private void loadReplicaInfo() |
| { |
| try |
| { |
| |
| List<ReplicaEventLog> replicas = replicaUtil.getReplicaConsumers(); |
| if ( !replicas.isEmpty() ) |
| { |
| for ( ReplicaEventLog r : replicas ) |
| { |
| LOG.debug( "initializing the replica log from {}", r.getId() ); |
| r.configure( amqConnection, brokerService ); |
| replicaLogMap.put( r.getId(), r ); |
| |
| // update the replicaCount's value to assign a correct value to the new replica(s) |
| if ( replicaCount.get() < r.getId() ) |
| { |
| replicaCount.set( r.getId() ); |
| } |
| } |
| } |
| else |
| { |
| LOG.debug( "no replica logs found to initialize" ); |
| } |
| } |
| catch ( Exception e ) |
| { |
| LOG.error( "Failed to load the replica information", e ); |
| } |
| } |
| |
| |
| private void registerPersistentSearches() throws Exception |
| { |
| for ( Map.Entry<Integer, ReplicaEventLog> e : replicaLogMap.entrySet() ) |
| { |
| ReplicaEventLog log = e.getValue(); |
| |
| if ( log.getSearchCriteria() != null ) |
| { |
| LOG.debug( "registering peristent search for the replica {}", log.getId() ); |
| SyncReplSearchListener handler = new SyncReplSearchListener( null, null, log, false ); |
| log.setPersistentListener( handler ); |
| |
| dirService.getEventService().addListener( handler, log.getSearchCriteria() ); |
| } |
| else |
| { |
| LOG.warn( "invalid peristent search criteria {} for the replica {}", log.getSearchCriteria(), log |
| .getId() ); |
| } |
| } |
| } |
| |
| |
| private Runnable createConsumerInfoUpdateTask() |
| { |
| Runnable task = new Runnable() |
| { |
| public void run() |
| { |
| while ( true ) |
| { |
| storeReplicaInfo(); |
| try |
| { |
| Thread.sleep( 10000 ); |
| } |
| catch ( InterruptedException e ) |
| { |
| LOG.warn( "thread storing the replica information was interrupted", e ); |
| } |
| } |
| } |
| }; |
| |
| return task; |
| } |
| |
| |
| private boolean isValidCookie( String cookieString ) |
| { |
| if ( cookieString == null || cookieString.trim().length() == 0 ) |
| { |
| return false; |
| } |
| |
| int pos = cookieString.indexOf( REPLICA_ID_DELIM ); |
| if ( pos <= 0 ) // position should start from 1 or higher cause a cookie can be like "0;<csn>" or "11;<csn>" |
| { |
| return false; |
| } |
| |
| String replicaId = cookieString.substring( 0, pos ); |
| try |
| { |
| Integer.parseInt( replicaId ); |
| } |
| catch ( NumberFormatException e ) |
| { |
| LOG.debug( "Failed to parse the replica id {}", replicaId ); |
| return false; |
| } |
| |
| if ( pos == cookieString.length() ) |
| { |
| return false; |
| } |
| |
| String csnString = cookieString.substring( pos + 1 ); |
| |
| return Csn.isValid(csnString); |
| } |
| |
| |
| private int getReplicaId( String cookieString ) |
| { |
| String replicaId = cookieString.substring( 0, cookieString.indexOf( REPLICA_ID_DELIM ) ); |
| return Integer.parseInt( replicaId ); |
| } |
| |
| |
| private ReplicaEventLog getReplicaEventLog( String cookieString ) throws Exception |
| { |
| ReplicaEventLog replicaLog = null; |
| |
| if ( isValidCookie( cookieString ) ) |
| { |
| int clientId = getReplicaId( cookieString ); |
| replicaLog = replicaLogMap.get( clientId ); |
| } |
| |
| return replicaLog; |
| } |
| |
| |
| private ReplicaEventLog createRelicaEventLog( String hostName, String filter ) throws Exception |
| { |
| int replicaId = replicaCount.incrementAndGet(); |
| |
| LOG.debug( "creating a new event log for the replica with id {}", replicaId ); |
| |
| ReplicaEventLog replicaLog = new ReplicaEventLog( replicaId ); |
| replicaLog.setHostName( hostName ); |
| replicaLog.setSearchFilter( filter ); |
| |
| replicaLog.configure( amqConnection, brokerService ); |
| |
| return replicaLog; |
| } |
| |
| |
| private void sendESyncRefreshRequired( LdapSession session, SearchRequest req ) throws Exception |
| { |
| SearchResultDone searchDoneResp = ( SearchResultDone ) req.getResultResponse(); |
| searchDoneResp.getLdapResult().setResultCode( ResultCodeEnum.E_SYNC_REFRESH_REQUIRED ); |
| SyncDoneValueDecorator syncDone = new SyncDoneValueDecorator( |
| ldapServer.getDirectoryService().getLdapCodecService() ); |
| searchDoneResp.addControl( syncDone ); |
| |
| session.getIoSession().write( searchDoneResp ); |
| } |
| |
| |
| private boolean isRefreshNPersist( SearchRequest req ) |
| { |
| ISyncRequestValue control = ( ISyncRequestValue ) req.getControls().get( |
| ISyncRequestValue.OID ); |
| return ( control.getMode() == SynchronizationModeEnum.REFRESH_AND_PERSIST ? true : false ); |
| } |
| } |