| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| package org.apache.directory.server.ldap.replication; |
| |
| |
| import org.apache.activemq.ActiveMQConnection; |
| import org.apache.activemq.ActiveMQMessageProducer; |
| import org.apache.activemq.ActiveMQSession; |
| import org.apache.activemq.broker.BrokerService; |
| import org.apache.activemq.broker.region.Queue; |
| import org.apache.activemq.command.ActiveMQObjectMessage; |
| import org.apache.activemq.command.ActiveMQQueue; |
| import org.apache.directory.server.core.event.EventType; |
| import org.apache.directory.server.core.event.NotificationCriteria; |
| import org.apache.directory.shared.ldap.model.entry.Entry; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
| /** |
| * A message log used for storing the changes done on DIT on a syncrepl consumer's search base |
| * A separate log is maintained for each syncrepl consumer |
| * |
| * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a> |
| */ |
| public class ReplicaEventLog |
| { |
| |
| /** IP address of the syncrepl consumer */ |
| private String hostName; |
| |
| /** the unmodified search filter as it was when received from the client */ |
| private String searchFilter; |
| |
| /** the csn that was sent to the client during the last sync session*/ |
| private String lastSentCsn; |
| |
| /** the persistent listener */ |
| private SyncReplSearchListener persistentListener; |
| |
| /** notification criteria used by the persistent sea*/ |
| private NotificationCriteria searchCriteria; |
| |
| /** the replica id */ |
| private int replicaId; |
| |
| /** flag indicating refreshAndPersist mode */ |
| private boolean refreshNPersist; |
| |
| // fields that won't be serialized |
| |
| /** the ActiveMQ session */ |
| private ActiveMQSession amqSession; |
| |
| /** the Queue used for storing messages */ |
| private ActiveMQQueue queue; |
| |
| /** message producer for Queue */ |
| private ActiveMQMessageProducer producer; |
| |
| /** the messaging system's connection */ |
| private ActiveMQConnection amqConnection; |
| |
| /** ActiveMQ's BrokerService */ |
| private BrokerService brokerService; |
| |
| private volatile boolean dirty; |
| |
| private static final Logger LOG = LoggerFactory.getLogger( ReplicaEventLog.class ); |
| |
| |
| public ReplicaEventLog() |
| { |
| |
| } |
| |
| |
| public ReplicaEventLog( int replicaId ) |
| { |
| this.replicaId = replicaId; |
| this.searchCriteria = new NotificationCriteria(); |
| this.searchCriteria.setEventMask( EventType.ALL_EVENT_TYPES_MASK ); |
| } |
| |
| |
| /** |
| * instantiates a message queue and corresponding producer for storing DIT changes |
| * |
| * @param amqConnection ActiveMQ connection |
| * @param brokerService ActiveMQ's broker service |
| * @throws Exception |
| */ |
| public void configure( final ActiveMQConnection amqConnection, final BrokerService brokerService ) throws Exception |
| { |
| if ( amqSession == null || !amqSession.isRunning() ) |
| { |
| this.amqConnection = amqConnection; |
| amqSession = ( ActiveMQSession ) amqConnection.createSession( false, ActiveMQSession.AUTO_ACKNOWLEDGE ); |
| queue = ( ActiveMQQueue ) amqSession.createQueue( getQueueName() ); |
| producer = ( ActiveMQMessageProducer ) amqSession.createProducer( queue ); |
| this.brokerService = brokerService; |
| } |
| } |
| |
| |
| /** |
| * stores the given EventType and Entry in the queue |
| * |
| * @param event the EventType |
| * @param entry the modified Entry |
| */ |
| public void log( EventType event, Entry entry ) |
| { |
| LOG.debug( "logging entry with Dn {} with the event {}", entry.getDn(), event ); |
| log( new ReplicaEventMessage( event, entry ) ); |
| } |
| |
| |
| public void log( ReplicaEventMessage message ) |
| { |
| try |
| { |
| ActiveMQObjectMessage ObjectMessage = ( ActiveMQObjectMessage ) amqSession.createObjectMessage(); |
| ObjectMessage.setObject( message ); |
| producer.send( ObjectMessage ); |
| } |
| catch ( Exception e ) |
| { |
| LOG.warn( "Failed to insert the entry into syncrepl log", e ); |
| } |
| |
| } |
| |
| |
| /** |
| * deletes the queue (to remove the log) and recreates a new queue instance |
| * with the same queue name. Also creates the corresponding message producer |
| * |
| * @throws Exception |
| */ |
| public void truncate() throws Exception |
| { |
| producer.close(); |
| |
| String queueName = queue.getQueueName(); |
| LOG.debug( "deleting the queue {}", queueName ); |
| amqConnection.destroyDestination( queue ); |
| queue = null; |
| } |
| |
| |
| public void recreate() throws Exception |
| { |
| LOG.debug( "recreating the queue for the replica id {}", replicaId ); |
| queue = ( ActiveMQQueue ) amqSession.createQueue( getQueueName() ); |
| producer = ( ActiveMQMessageProducer ) amqSession.createProducer( queue ); |
| } |
| |
| |
| public void destroy() throws Exception |
| { |
| // first truncate |
| truncate(); |
| |
| // then close the producer and session, DO NOT close connection |
| producer.close(); |
| amqSession.close(); |
| } |
| |
| |
| @Override |
| public boolean equals( Object obj ) |
| { |
| if ( !( obj instanceof ReplicaEventLog ) ) |
| { |
| return false; |
| } |
| |
| ReplicaEventLog other = ( ReplicaEventLog ) obj; |
| |
| if ( replicaId != other.getId() ) |
| { |
| return false; |
| } |
| |
| return true; |
| } |
| |
| |
| @Override |
| public int hashCode() |
| { |
| final int prime = 31; |
| int result = 1; |
| result = prime * result + searchFilter.hashCode(); |
| result = prime * result + hostName.hashCode(); |
| return result; |
| } |
| |
| |
| public int compareTo( ReplicaEventLog o ) |
| { |
| if ( this.equals( o ) ) |
| { |
| return 0; |
| } |
| |
| return 1; |
| } |
| |
| |
| public SyncReplSearchListener getPersistentListener() |
| { |
| return persistentListener; |
| } |
| |
| |
| public void setPersistentListener( SyncReplSearchListener persistentListener ) |
| { |
| this.persistentListener = persistentListener; |
| } |
| |
| |
| public NotificationCriteria getSearchCriteria() |
| { |
| return searchCriteria; |
| } |
| |
| |
| public void setSearchCriteria( NotificationCriteria searchCriteria ) |
| { |
| this.searchCriteria = searchCriteria; |
| } |
| |
| |
| public boolean isRefreshNPersist() |
| { |
| return refreshNPersist; |
| } |
| |
| |
| public void setRefreshNPersist( boolean refreshNPersist ) |
| { |
| this.refreshNPersist = refreshNPersist; |
| } |
| |
| |
| public int getId() |
| { |
| return replicaId; |
| } |
| |
| |
| public String getLastSentCsn() |
| { |
| return lastSentCsn; |
| } |
| |
| |
| public void setLastSentCsn( String lastSentCsn ) |
| { |
| // set only if there is a change in cookie value |
| // this will avoid setting the dirty flag which eventually is used for |
| // storing the details of this log |
| if ( !lastSentCsn.equals( this.lastSentCsn ) ) |
| { |
| this.lastSentCsn = lastSentCsn; |
| dirty = true; |
| } |
| } |
| |
| |
| public String getHostName() |
| { |
| return hostName; |
| } |
| |
| |
| public void setHostName( String hostName ) |
| { |
| this.hostName = hostName; |
| } |
| |
| |
| public String getSearchFilter() |
| { |
| return searchFilter; |
| } |
| |
| |
| public void setSearchFilter( String searchFilter ) |
| { |
| this.searchFilter = searchFilter; |
| } |
| |
| |
| public boolean isDirty() |
| { |
| return dirty; |
| } |
| |
| |
| public void setDirty( boolean dirty ) |
| { |
| this.dirty = dirty; |
| } |
| |
| |
| public String getQueueName() |
| { |
| return "replicaId=" + replicaId; |
| } |
| |
| |
| public ReplicaEventLogCursor getCursor() throws Exception |
| { |
| Queue regionQueue = ( Queue ) brokerService.getRegionBroker().getDestinationMap().get( queue ); |
| return new ReplicaEventLogCursor( amqSession, queue, regionQueue ); |
| } |
| |
| |
| @Override |
| public String toString() |
| { |
| return "ClientMessageQueueLog [ipAddress=" + hostName + ", filter=" + searchFilter + ", replicaId=" + replicaId |
| + ", lastSentCookie=" + lastSentCsn + "]"; |
| } |
| |
| } |