blob: ddb7a247677445ce562c2529d59cf8ffa2ecd707 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.directory.mitosis.service.protocol.handler;
import org.apache.directory.mitosis.common.CSN;
import org.apache.directory.mitosis.common.CSNVector;
import org.apache.directory.mitosis.common.DefaultCSN;
import org.apache.directory.mitosis.common.Replica;
import org.apache.directory.mitosis.common.ReplicaId;
import org.apache.directory.mitosis.configuration.ReplicationConfiguration;
import org.apache.directory.mitosis.operation.AddEntryOperation;
import org.apache.directory.mitosis.operation.Operation;
import org.apache.directory.mitosis.service.ReplicationContext;
import org.apache.directory.mitosis.service.ReplicationContext.State;
import org.apache.directory.mitosis.service.protocol.Constants;
import org.apache.directory.mitosis.service.protocol.message.BaseMessage;
import org.apache.directory.mitosis.service.protocol.message.BeginLogEntriesAckMessage;
import org.apache.directory.mitosis.service.protocol.message.BeginLogEntriesMessage;
import org.apache.directory.mitosis.service.protocol.message.EndLogEntriesAckMessage;
import org.apache.directory.mitosis.service.protocol.message.EndLogEntriesMessage;
import org.apache.directory.mitosis.service.protocol.message.LogEntryAckMessage;
import org.apache.directory.mitosis.service.protocol.message.LogEntryMessage;
import org.apache.directory.mitosis.service.protocol.message.LoginAckMessage;
import org.apache.directory.mitosis.service.protocol.message.LoginMessage;
import org.apache.directory.mitosis.store.ReplicationLogIterator;
import org.apache.directory.mitosis.store.ReplicationStore;
import org.apache.directory.server.core.entry.ServerEntry;
import org.apache.directory.server.core.entry.ServerSearchResult;
import org.apache.directory.server.core.interceptor.context.SearchOperationContext;
import org.apache.directory.shared.ldap.constants.SchemaConstants;
import org.apache.directory.shared.ldap.entry.EntryAttribute;
import org.apache.directory.shared.ldap.entry.Value;
import org.apache.directory.shared.ldap.filter.PresenceNode;
import org.apache.directory.shared.ldap.message.AliasDerefMode;
import org.apache.directory.shared.ldap.name.LdapDN;
import org.apache.directory.shared.ldap.schema.OidNormalizer;
import org.apache.directory.shared.ldap.util.StringTools;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.WriteFuture;
import org.apache.mina.util.SessionLog;
import javax.naming.NamingEnumeration;
import javax.naming.NamingException;
import javax.naming.directory.SearchControls;
import java.net.InetSocketAddress;
import java.util.Map;
/**
* {@link ReplicationContextHandler} that implements client-side replication
* logic which sends any changes out-of-date to server. The following is
* the detailed protocol flow and the description of the replication logic
* execution.
* <ul>
* <li><tt>ClientConnectionManager</tt> connects the client to the server.</li>
* <li>The client sends {@link LoginMessage} to the server.</li>
* <li>The server responds with {@link LoginAckMessage} to the client
* <ul>
* <li>Unless the response code is {@link Constants#OK}, disconnect.
* Next connection attempt is performed by
* <tt>ClientConnectionManager</tt> later.</li>
* <li>Otherwise, the state of the {@link ReplicationContext} changes to
* {@link State#READY}, and proceed.</li>
* </ul></li>
* <li>The client tries to transfer the data that server needs from
* in {@link ReplicationStore} periodically using
* {@link #contextIdle(ReplicationContext, IdleStatus)} event,
* which is implemented using <tt>sessionIdle</tt> event in MINA.
* <ul>
* <li>The client sends a {@link BeginLogEntriesMessage} to the server.</li>
* <li>The server responds with {@link BeginLogEntriesAckMessage}.
* <ul>
* <li>If the response code is {@link Constants#OK},
* <ul>
* <li>{@link BeginLogEntriesAckMessage} contains a
* Update Vector (UV) of the server. The client compares
* the received UV and the client's Purge Vector (PV).
* <ul>
* <li>If the PV is greater than the UV, this means the client
* can't send all operation logs that server needs to get
* synchronized. This usually means that the server has
* been offline for too long time and got out-of-sync
* finally due to the log-purging process of the client
* side (see {@link ReplicationConfiguration#getLogMaxAge()}).
* The clients sends all entries in the DIT to the server,
* and the server overwrites its current DIT with the
* received entries.</li>
* <li>Otherwise, the client sends only the changed part since
* the last synchronization by querying its
* {@link ReplicationStore} by calling
* {@link ReplicationStore#getLogs(CSNVector, boolean)}.</li>
* <li>The data transfer is very simple. It's asynchronous
* request-response exchange. The client sends {@link LogEntryMessage},
* and then the server responds with {@link LogEntryAckMessage}.</li>
* </ul></li>
* <li>If the response code is not {@link Constants#OK}, retry later.</li>
* </ul></li>
* </ul></li>
* </ul>
*
* @author The Apache Directory Project (dev@directory.apache.org)
* @version $Rev: 116 $, $Date: 2006-09-18 13:47:53Z $
*/
public class ReplicationClientContextHandler implements ReplicationContextHandler
{
public void contextBegin( ReplicationContext ctx ) throws Exception
{
// Send a login message.
LoginMessage m = new LoginMessage( ctx.getNextSequence(), ctx.getService().getConfiguration().getReplicaId() );
writeTimeLimitedMessage( ctx, m );
// Set write timeout
ctx.getSession().setWriteTimeout( ctx.getConfiguration().getResponseTimeout() );
// Check update vector of the remote peer periodically.
ctx.getSession().setIdleTime( IdleStatus.BOTH_IDLE, ctx.getConfiguration().getReplicationInterval() );
}
public void contextEnd( ReplicationContext ctx ) throws Exception
{
}
public void messageReceived( ReplicationContext ctx, Object message ) throws Exception
{
ctx.cancelExpiration( ( ( BaseMessage ) message ).getSequence() );
if ( ctx.getState() == State.READY )
{
if ( message instanceof LogEntryAckMessage )
{
onLogEntryAck( ctx, ( LogEntryAckMessage ) message );
}
else if ( message instanceof BeginLogEntriesAckMessage )
{
onBeginLogEntriesAck( ctx, ( BeginLogEntriesAckMessage ) message );
}
else if ( message instanceof EndLogEntriesAckMessage )
{
// Do nothing
}
else
{
onUnexpectedMessage( ctx, message );
}
}
else
{
if ( message instanceof LoginAckMessage )
{
onLoginAck( ctx, ( LoginAckMessage ) message );
}
else
{
onUnexpectedMessage( ctx, message );
}
}
}
public void messageSent( ReplicationContext ctx, Object message ) throws Exception
{
}
/**
* A helper to write a message and schedule that message for expiration.
*
* @param ctx the replication context
* @param message the message to replicate
* @return the write future to block on this replication message transmission
*/
public WriteFuture writeTimeLimitedMessage( ReplicationContext ctx, Object message )
{
ctx.scheduleExpiration( message );
return ctx.getSession().write( message );
}
public void exceptionCaught( ReplicationContext ctx, Throwable cause ) throws Exception
{
if ( SessionLog.isWarnEnabled( ctx.getSession() ) )
{
SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
+ "] Unexpected exception.", cause );
}
ctx.getSession().close();
}
public void contextIdle( ReplicationContext ctx, IdleStatus status ) throws Exception
{
beginReplication( ctx );
}
private void onLoginAck( ReplicationContext ctx, LoginAckMessage message )
{
if ( message.getResponseCode() != Constants.OK )
{
SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
+ "] Login attempt failed: " + message.getResponseCode() );
ctx.getSession().close();
return;
}
for ( Replica replica : ctx.getConfiguration().getPeerReplicas() )
{
if ( replica.getId().equals( message.getReplicaId() ) )
{
if ( replica.getAddress().getAddress().equals(
( ( InetSocketAddress ) ctx.getSession().getRemoteAddress() ).getAddress() ) )
{
ctx.setPeer( replica );
ctx.setState( State.READY );
return;
}
else
{
SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
+ "] Peer address mismatches: " + ctx.getSession().getRemoteAddress() + " (expected: "
+ replica.getAddress() );
ctx.getSession().close();
return;
}
}
}
SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
+ "] Unknown peer replica ID: " + message.getReplicaId() );
ctx.getSession().close();
}
public boolean beginReplication( ReplicationContext ctx )
{
// If this cilent is logged in, all responses for sent messages
// (LogEntryMessages) is received, and no write request is pending,
// it means previous replication process ended or this is the
// first replication attempt.
if ( ctx.getState() == State.READY && ctx.getScheduledExpirations() <= 0
&& ctx.getSession().getScheduledWriteRequests() <= 0 )
{
// Initiate replication process asking update vector.
if ( SessionLog.isDebugEnabled( ctx.getSession() ) )
{
SessionLog.debug( ctx.getSession(), "(" + ctx.getConfiguration().getReplicaId().getId() + "->"
+ ( ctx.getPeer() != null ? ctx.getPeer().getId().getId() : "null" ) + ") Beginning replication. " );
}
ctx.getSession().write( new BeginLogEntriesMessage( ctx.getNextSequence() ) );
return true;
}
else
{
if ( SessionLog.isDebugEnabled( ctx.getSession() ) )
{
SessionLog.debug( ctx.getSession(), "(" + ctx.getConfiguration().getReplicaId().getId() + "->"
+ ( ctx.getPeer() != null ? ctx.getPeer().getId().getId() : "null" )
+ ") Couldn't begin replication. State:" + ctx.getState() + ", scheduledExpirations:"
+ ctx.getScheduledExpirations() + ", scheduledWriteRequests:"
+ ctx.getSession().getScheduledWriteRequests() );
}
return false;
}
}
private void onLogEntryAck( ReplicationContext ctx, LogEntryAckMessage message ) throws Exception
{
if ( message.getResponseCode() != Constants.OK )
{
SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
+ "] Remote peer failed to execute a log entry." );
ctx.getSession().close();
}
}
private void onBeginLogEntriesAck( ReplicationContext ctx, BeginLogEntriesAckMessage message )
throws NamingException
{
// Start transaction only when the server says OK.
if ( message.getResponseCode() != Constants.OK )
{
return;
}
ReplicationStore store = ctx.getConfiguration().getStore();
CSNVector yourUV = message.getUpdateVector();
CSNVector myPV;
try
{
myPV = store.getPurgeVector();
}
catch ( Exception e )
{
SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
+ "] Failed to get update vector.", e );
ctx.getSession().close();
return;
}
// Do full-DIT transfer if the peer is new and I'm not new.
try
{
if ( myPV.size() > 0 && yourUV.size() == 0 )
{
SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
+ "] Starting a whole DIT transfer." );
sendAllEntries( ctx );
}
else
{
SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
+ "] Starting a partial replication log transfer." );
sendReplicationLogs( ctx, myPV, yourUV );
}
}
finally
{
// Send EngLogEntries message to release the remote peer resources.
ctx.getSession().write( new EndLogEntriesMessage( ctx.getNextSequence() ) );
}
}
private void sendAllEntries( ReplicationContext ctx ) throws NamingException
{
ServerEntry rootDSE = ctx.getDirectoryService().getPartitionNexus().getRootDSE( null );
EntryAttribute namingContextsAttr = rootDSE.get( SchemaConstants.NAMING_CONTEXTS_AT );
if ( namingContextsAttr == null || namingContextsAttr.size() == 0 )
{
SessionLog.warn( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
+ "] No namingContexts attributes in rootDSE." );
return;
}
// Iterate all context partitions to send all entries of them.
for ( Value<?> namingContext : namingContextsAttr )
{
// Convert attribute value to JNDI name.
LdapDN contextName;
contextName = new LdapDN( ( String ) namingContext.get() );
SessionLog.info( ctx.getSession(), "[Replica-" + ctx.getConfiguration().getReplicaId()
+ "] Sending entries under '" + contextName + '\'' );
Map<String, OidNormalizer> mapping = ctx.getDirectoryService().getRegistries().getAttributeTypeRegistry()
.getNormalizerMapping();
contextName.normalize( mapping );
sendAllEntries( ctx, contextName );
}
}
private void sendAllEntries( ReplicationContext ctx, LdapDN contextName ) throws NamingException
{
// Retrieve all subtree including the base entry
SearchControls ctrl = new SearchControls();
ctrl.setSearchScope( SearchControls.SUBTREE_SCOPE );
NamingEnumeration<ServerSearchResult> e = ctx.getDirectoryService().getPartitionNexus().search(
new SearchOperationContext( ctx.getDirectoryService().getRegistries(), contextName,
AliasDerefMode.DEREF_ALWAYS, new PresenceNode( SchemaConstants.OBJECT_CLASS_AT_OID ), ctrl ) );
try
{
while ( e.hasMore() )
{
ServerSearchResult sr = e.next();
ServerEntry attrs = sr.getServerEntry();
// Skip entries without entryCSN attribute.
EntryAttribute entryCSNAttr = attrs.get( org.apache.directory.mitosis.common.Constants.ENTRY_CSN );
if ( entryCSNAttr == null )
{
continue;
}
// Get entryCSN of the entry. Skip if entryCSN value is invalid.
CSN csn;
try
{
Object val = entryCSNAttr.get();
if ( val instanceof byte[] )
{
csn = new DefaultCSN( StringTools.utf8ToString( ( byte[] ) val ) );
}
else
{
csn = new DefaultCSN( ( String ) val );
}
}
catch ( IllegalArgumentException ex )
{
SessionLog.warn( ctx.getSession(), "An entry with improper entryCSN: " + sr.getDn() );
continue;
}
// Convert the entry into AddEntryOperation log.
LdapDN dn = sr.getDn();
dn.normalize( ctx.getDirectoryService().getRegistries().getAttributeTypeRegistry()
.getNormalizerMapping() );
Operation op = new AddEntryOperation( csn, attrs );
// Send a LogEntry message for the entry.
writeTimeLimitedMessage( ctx, new LogEntryMessage( ctx.getNextSequence(), op ) );
}
}
finally
{
e.close();
}
}
@SuppressWarnings("unchecked")
private void sendReplicationLogs( ReplicationContext ctx, CSNVector myPV, CSNVector yourUV )
{
for ( ReplicaId replicaId : myPV.getReplicaIds() )
{
CSN myCSN = myPV.getCSN( replicaId );
CSN yourCSN = yourUV.getCSN( replicaId );
if ( yourCSN != null && ( myCSN == null || yourCSN.compareTo( myCSN ) < 0 ) )
{
SessionLog.warn( ctx.getSession(), "Remote update vector (" + yourUV
+ ") is out-of-date. Full replication is required." );
ctx.getSession().close();
return;
}
}
ReplicationLogIterator logIt = ctx.getConfiguration().getStore().getLogs( yourUV, false );
try
{
while ( logIt.next() )
{
Operation op = logIt.getOperation();
writeTimeLimitedMessage( ctx, new LogEntryMessage( ctx.getNextSequence(), op ) );
}
}
finally
{
logIt.close();
}
}
private void onUnexpectedMessage( ReplicationContext ctx, Object message )
{
SessionLog.warn( ctx.getSession(), "Unexpected message: " + message );
ctx.getSession().close();
}
}