// blob: 0b93ef7829474832accfbd40c21ae1a57842226a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.directory.mitosis.store.derby;
import org.apache.commons.dbcp.BasicDataSource;
import org.apache.directory.mitosis.common.CSN;
import org.apache.directory.mitosis.common.CSNVector;
import org.apache.directory.mitosis.common.DefaultCSN;
import org.apache.directory.mitosis.common.ReplicaId;
import org.apache.directory.mitosis.common.UUID;
import org.apache.directory.mitosis.configuration.ReplicationConfiguration;
import org.apache.directory.mitosis.operation.Operation;
import org.apache.directory.mitosis.operation.OperationCodec;
import org.apache.directory.mitosis.store.ReplicationLogIterator;
import org.apache.directory.mitosis.store.ReplicationStore;
import org.apache.directory.mitosis.store.ReplicationStoreException;
import org.apache.directory.server.core.DirectoryService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.naming.Name;
import javax.naming.ldap.LdapName;
import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
/**
 * A {@link ReplicationStore} implementation that keeps its data in an embedded
 * Derby database accessed through a pooled {@link BasicDataSource}.
 * <p>
 * Three tables are used, each named by appending a suffix to the configurable
 * table prefix: <tt>METADATA</tt> (key/value pairs such as the replica ID),
 * <tt>UUID</tt> (UUID to DN mapping) and <tt>LOG</tt> (encoded operations keyed
 * by the parts of their CSN).
 */
public class DerbyReplicationStore implements ReplicationStore
{
/** Logger for this class. */
private static final Logger LOG = LoggerFactory.getLogger( DerbyReplicationStore.class );
/** Table-name prefix used when none (or a blank one) has been configured. */
private static final String DEFAULT_TABLE_PREFIX = "REPLICATION_";
/** Key of the metadata row that holds the replica ID of this store. */
private static final String KEY_REPLICA_ID = "replicaId";
/** Class name of the embedded Derby JDBC driver. */
private static final String DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
/** Scheme prefix of every Derby JDBC URI built by this class. */
private static final String DB_URI_PREFIX = "jdbc:derby:";
/** JDBC URI of the replication database; computed in open(). */
private String dbURI;
/** Connection pool created in open() and released in close(). */
private BasicDataSource dataSource;
/** Replica ID taken from the replication configuration in open(). */
private ReplicaId replicaId;
/** Prefix prepended to every table name; never null or blank. */
private String tablePrefix = DEFAULT_TABLE_PREFIX;
/** Name of the metadata table (prefix + "METADATA"); computed in open(). */
private String metadataTableName;
/** Name of the UUID table (prefix + "UUID"); computed in open(). */
private String uuidTableName;
/** Name of the log table (prefix + "LOG"); computed in open(). */
private String logTableName;
/**
 * Replica IDs that occur in the log table. Loaded by loadMetadata() and
 * replaced by a new, extended copy in putLog() while holding
 * {@link #knownReplicaIdsLock}.
 */
private Set<ReplicaId> knownReplicaIds;
/** Monitor held by putLog() while it replaces {@link #knownReplicaIds}. */
private final Object knownReplicaIdsLock = new Object();
/** Codec used to encode operations before they are written to the log table. */
private final OperationCodec operationCodec = new OperationCodec();
/**
 * Returns the prefix that is prepended to the names of all tables used by
 * this store.
 *
 * @return the current table prefix
 */
public String getTablePrefix()
{
    return this.tablePrefix;
}
/**
 * Sets the prefix that is prepended to the names of all tables used by this
 * store. The value is trimmed; <tt>null</tt> or a blank value selects the
 * default prefix. Table names are derived from the prefix in open(), so a
 * change made afterwards does not affect an already opened store.
 *
 * @param tablePrefix the new prefix, may be <tt>null</tt>
 */
public void setTablePrefix( String tablePrefix )
{
    String normalized = ( tablePrefix == null ) ? DEFAULT_TABLE_PREFIX : tablePrefix.trim();

    if ( normalized.length() == 0 )
    {
        normalized = DEFAULT_TABLE_PREFIX;
    }

    this.tablePrefix = normalized;
}
/**
 * Opens the store: creates the Derby database below the working directory of
 * the directory service if it does not exist yet, sets up the connection
 * pool, creates missing tables and loads (or registers) the replica ID.
 * <p>
 * If schema initialization or metadata loading fails, the connection pool
 * that was just created is closed again before the failure is propagated, so
 * that a failed open does not leave pooled connections behind.
 *
 * @param serviceCfg the directory service whose working directory hosts the database
 * @param cfg the replication configuration that supplies the replica ID
 * @throws ReplicationStoreException if the database cannot be created or initialized
 */
public void open( DirectoryService serviceCfg, ReplicationConfiguration cfg )
{
    replicaId = cfg.getReplicaId();

    // Calculate DB URI
    dbURI = DB_URI_PREFIX + serviceCfg.getWorkingDirectory().getPath() + File.separator + "replication";

    // Create database if not exists.
    try
    {
        Class.forName( DRIVER_NAME );
        Connection con = DriverManager.getConnection( dbURI + ";create=true" );
        con.close();
    }
    catch ( Exception e )
    {
        throw new ReplicationStoreException( "Failed to initialize Derby database.", e );
    }

    // Initialize DataSource
    BasicDataSource newDataSource = new BasicDataSource();
    newDataSource.setDriverClassName( DRIVER_NAME );
    newDataSource.setUrl( dbURI );
    newDataSource.setUsername( "sa" );
    newDataSource.setPassword( "" );
    this.dataSource = newDataSource;

    // Pre-calculate table names
    metadataTableName = tablePrefix + "METADATA";
    uuidTableName = tablePrefix + "UUID";
    logTableName = tablePrefix + "LOG";

    try
    {
        initSchema();
        loadMetadata();
    }
    catch ( RuntimeException e )
    {
        // Release the pool created above; the original failure is what the caller needs to see.
        try
        {
            newDataSource.close();
        }
        catch ( SQLException closeFailure )
        {
            LOG.warn( "Failed to close the dataSource.", closeFailure );
        }

        throw e;
    }
}
/**
 * Creates the metadata, UUID and log tables if they do not exist yet.
 * <p>
 * For each table a probe query is executed first; if the probe fails with an
 * {@link SQLException} the table is taken to be missing and is created.
 *
 * @throws ReplicationStoreException if a connection cannot be obtained or a table cannot be created
 */
private void initSchema()
{
    // Each entry is { probe query, DDL executed when the probe fails }, in creation order.
    final String[][] tables =
        {
            {
                "SELECT M_KEY FROM " + metadataTableName + " WHERE M_KEY IS NULL",
                "CREATE TABLE " + metadataTableName + " (" + " M_KEY VARCHAR(30) NOT NULL PRIMARY KEY,"
                    + " M_VALUE VARCHAR(100) NOT NULL )" },
            {
                "SELECT UUID FROM " + uuidTableName + " WHERE UUID IS NULL",
                "CREATE TABLE " + uuidTableName + " (" + " UUID CHAR(32) NOT NULL PRIMARY KEY,"
                    + " DN CLOB NOT NULL" + ")" },
            {
                "SELECT CSN_REPLICA_ID FROM " + logTableName + " WHERE CSN_REPLICA_ID IS NULL",
                "CREATE TABLE " + logTableName + " (" + " CSN_REPLICA_ID VARCHAR(16) NOT NULL,"
                    + " CSN_TIMESTAMP BIGINT NOT NULL," + " CSN_OP_SEQ INTEGER NOT NULL,"
                    + " OPERATION BLOB NOT NULL," + "CONSTRAINT " + logTableName + "_PK PRIMARY KEY ("
                    + " CSN_REPLICA_ID," + " CSN_TIMESTAMP," + " CSN_OP_SEQ)" + ")" } };

    Connection con = null;
    Statement stmt = null;
    ResultSet rs = null;

    try
    {
        con = dataSource.getConnection();
        con.setAutoCommit( true );
        stmt = con.createStatement();

        for ( String[] table : tables )
        {
            try
            {
                rs = stmt.executeQuery( table[0] );
                rs.close();
                rs = null;
            }
            catch ( SQLException e )
            {
                // The probe failed, so the table is assumed to be missing.
                stmt.executeUpdate( table[1] );
            }
        }
    }
    catch ( SQLException e )
    {
        throw new ReplicationStoreException( "Failed to initialize DB schema.", e );
    }
    finally
    {
        SQLUtil.cleanup( con, stmt, rs );
    }
}
/**
 * Verifies or registers the replica ID of this store and loads the set of
 * replica IDs that occur in the log table.
 * <p>
 * If the metadata table already holds a replica ID it must match the
 * configured one (compared case-insensitively); otherwise the configured
 * replica ID is stored.
 *
 * @throws ReplicationStoreException if the stored replica ID differs from the
 *         configured one, or if any database access fails
 */
private void loadMetadata()
{
    Connection con = null;
    PreparedStatement ps = null;
    ResultSet rs = null;

    try
    {
        con = dataSource.getConnection();
        con.setAutoCommit( true );
        con.setTransactionIsolation( Connection.TRANSACTION_REPEATABLE_READ );
        con.setReadOnly( true );

        // Look up the replica ID that has been registered so far, if any.
        ps = con.prepareStatement( "SELECT M_VALUE FROM " + metadataTableName + " WHERE M_KEY=?" );
        ps.setString( 1, KEY_REPLICA_ID );
        rs = ps.executeQuery();

        if ( !rs.next() )
        {
            // Nothing registered yet: release the query and store the configured replica ID.
            rs.close();
            rs = null;
            ps.close();
            ps = null;

            con.setReadOnly( false );
            ps = con.prepareStatement( "INSERT INTO " + metadataTableName + " (M_KEY, M_VALUE) VALUES (?,?)" );
            ps.setString( 1, KEY_REPLICA_ID );
            ps.setString( 2, replicaId.getId() );
            ps.executeUpdate();
        }
        else
        {
            // Already registered: it has to agree with the configuration.
            String actualReplicaId = rs.getString( 1 );
            boolean matches = replicaId.getId().equalsIgnoreCase( actualReplicaId );

            if ( !matches )
            {
                throw new ReplicationStoreException( "Replica ID mismatches: " + actualReplicaId + " (expected: "
                    + replicaId + ")" );
            }

            rs.close();
            rs = null;
        }

        ps.close();
        ps = null;

        // Collect every replica ID that has at least one entry in the log table.
        ps = con.prepareStatement( "SELECT DISTINCT CSN_REPLICA_ID FROM " + logTableName );
        rs = ps.executeQuery();
        knownReplicaIds = new HashSet<ReplicaId>();

        while ( rs.next() )
        {
            knownReplicaIds.add( new ReplicaId( rs.getString( 1 ) ) );
        }
    }
    catch ( ReplicationStoreException e )
    {
        // Already the right type; do not wrap it a second time.
        throw e;
    }
    catch ( Exception e )
    {
        throw new ReplicationStoreException( e );
    }
    finally
    {
        SQLUtil.cleanup( con, ps, rs );
    }
}
/**
 * Closes the connection pool and shuts the Derby database down.
 * <p>
 * The method is safe to call on a store that was never opened and may be
 * called more than once. Failures while closing the pool are logged, not
 * propagated.
 */
public void close()
{
    if ( dataSource != null )
    {
        try
        {
            dataSource.close();
        }
        catch ( SQLException e )
        {
            LOG.warn( "Failed to close the dataSource.", e );
        }

        dataSource = null;
    }

    replicaId = null;

    if ( dbURI == null )
    {
        // open() never got far enough to compute the URI, so there is nothing to shut down.
        return;
    }

    try
    {
        DriverManager.getConnection( dbURI + ";shutdown=true" );
    }
    catch ( Exception ignored )
    {
        // Deliberately ignored: a shutdown request always ends with an exception.
    }
}
/**
 * Returns the replica ID this store was opened with.
 *
 * @return the replica ID, or <tt>null</tt> if the store is not open
 */
public ReplicaId getReplicaId()
{
    return this.replicaId;
}
/**
 * Returns the replica IDs that are known to have entries in the log.
 *
 * @return a new set that the caller may modify freely
 */
public Set<ReplicaId> getKnownReplicaIds()
{
    Set<ReplicaId> copy = new HashSet<ReplicaId>( knownReplicaIds );
    return copy;
}
/**
 * Looks up the DN that is registered for the given UUID.
 *
 * @param uuid the UUID to look up
 * @return the registered DN, or <tt>null</tt> if the UUID is unknown
 * @throws ReplicationStoreException if the lookup fails or the stored DN cannot be parsed
 */
public Name getDN( UUID uuid )
{
    Connection con = null;
    PreparedStatement ps = null;
    ResultSet rs = null;

    try
    {
        con = dataSource.getConnection();
        con.setTransactionIsolation( Connection.TRANSACTION_READ_COMMITTED );
        con.setReadOnly( true );

        ps = con.prepareStatement( "SELECT DN FROM " + uuidTableName + " WHERE UUID=?" );
        ps.setString( 1, uuid.toOctetString() );
        rs = ps.executeQuery();

        if ( !rs.next() )
        {
            return null;
        }

        String dn = rs.getString( 1 );
        return new LdapName( dn );
    }
    catch ( Exception e )
    {
        throw new ReplicationStoreException( e );
    }
    finally
    {
        SQLUtil.cleanup( con, ps, rs );
    }
}
/**
 * Registers a UUID together with its DN unless the UUID is already present.
 * The existence check and the insert run in a single transaction.
 *
 * @param uuid the UUID to register
 * @param dn the DN that belongs to the UUID
 * @return <tt>true</tt> if a row was inserted, <tt>false</tt> if the UUID
 *         already existed
 * @throws ReplicationStoreException if any database access fails
 */
public boolean putUUID( UUID uuid, Name dn )
{
    String uuidString = uuid.toOctetString();
    Connection con = null;
    PreparedStatement ps = null;
    ResultSet rs = null;

    try
    {
        con = dataSource.getConnection();
        con.setAutoCommit( false );
        con.setTransactionIsolation( Connection.TRANSACTION_REPEATABLE_READ );
        con.setReadOnly( true );

        // Check if the specified uuid already exists
        ps = con.prepareStatement( "SELECT UUID FROM " + uuidTableName + " WHERE UUID=?" );
        ps.setString( 1, uuidString );
        rs = ps.executeQuery();

        if ( rs.next() )
        {
            return false;
        }

        rs.close();
        rs = null;

        // Close the SELECT statement before the variable is reused; the
        // finally block only ever sees the statement assigned last.
        ps.close();
        ps = null;

        // insert
        con.setReadOnly( false );
        ps = con.prepareStatement( "INSERT INTO " + uuidTableName + " (UUID, DN) VALUES(?,?)" );
        ps.setString( 1, uuidString );
        ps.setString( 2, dn.toString() );

        int updateCnt = ps.executeUpdate();
        con.commit();
        return updateCnt == 1;
    }
    catch ( Exception e )
    {
        // con is still null when obtaining the connection itself failed.
        if ( con != null )
        {
            try
            {
                con.rollback();
            }
            catch ( SQLException e1 )
            {
                LOG.error( "Failed to rollback transaction.", e1 );
            }
        }

        throw new ReplicationStoreException( e );
    }
    finally
    {
        SQLUtil.cleanup( con, ps, rs );
    }
}
/**
 * Removes the registration of the given UUID.
 *
 * @param uuid the UUID to remove
 * @return <tt>true</tt> if exactly one row was deleted
 * @throws ReplicationStoreException if the delete fails
 */
public boolean removeUUID( UUID uuid )
{
    final String uuidString = uuid.toOctetString();
    Connection con = null;
    PreparedStatement ps = null;

    try
    {
        con = dataSource.getConnection();
        con.setAutoCommit( true );
        con.setTransactionIsolation( Connection.TRANSACTION_READ_UNCOMMITTED );
        con.setReadOnly( false );

        // Delete the row of the specified uuid, if there is one.
        ps = con.prepareStatement( "DELETE FROM " + uuidTableName + " WHERE UUID=?" );
        ps.setString( 1, uuidString );

        int deleted = ps.executeUpdate();
        return deleted == 1;
    }
    catch ( Exception e )
    {
        throw new ReplicationStoreException( e );
    }
    finally
    {
        SQLUtil.cleanup( con, ps, null );
    }
}
/**
 * Appends an operation to the replication log, keyed by the replica ID,
 * timestamp and operation sequence of its CSN. If the replica ID of the CSN
 * has not been seen before, it is added to the set of known replica IDs.
 *
 * @param op the operation to store
 * @throws ReplicationStoreException if the row cannot be inserted
 */
public void putLog( Operation op )
{
    CSN csn = op.getCSN();
    byte[] encodedOp = operationCodec.encode( op );
    Connection con = null;
    PreparedStatement ps = null;

    try
    {
        con = dataSource.getConnection();
        con.setAutoCommit( true );
        con.setTransactionIsolation( Connection.TRANSACTION_READ_UNCOMMITTED );
        con.setReadOnly( false );

        // Store the encoded operation under its CSN.
        ps = con.prepareStatement( "INSERT INTO " + logTableName
            + " (CSN_REPLICA_ID, CSN_TIMESTAMP, CSN_OP_SEQ, OPERATION) VALUES(?,?,?,?)" );
        ps.setString( 1, csn.getReplicaId().getId() );
        ps.setLong( 2, csn.getTimestamp() );
        ps.setInt( 3, csn.getOperationSequence() );
        ps.setBytes( 4, encodedOp );

        int inserted = ps.executeUpdate();

        if ( inserted != 1 )
        {
            throw new ReplicationStoreException( "Failed to insert a row." );
        }
    }
    catch ( ReplicationStoreException e )
    {
        // Already the right type; do not wrap it a second time.
        throw e;
    }
    catch ( Exception e )
    {
        throw new ReplicationStoreException( e );
    }
    finally
    {
        SQLUtil.cleanup( con, ps, null );
    }

    if ( knownReplicaIds.contains( csn.getReplicaId() ) )
    {
        return;
    }

    // First entry of this replica: publish an extended copy of the set
    // instead of modifying the current one in place.
    synchronized ( knownReplicaIdsLock )
    {
        Set<ReplicaId> extended = new HashSet<ReplicaId>( knownReplicaIds );
        extended.add( csn.getReplicaId() );
        knownReplicaIds = extended;
    }
}
/**
 * Returns the log entries that are newer than the given update vector,
 * ordered by timestamp and operation sequence.
 * <p>
 * The vector is first normalized so that it contains an entry for every known
 * replica ID. An entry qualifies if, for its replica ID, its timestamp is
 * greater than the one in the vector, or the timestamps are equal and its
 * operation sequence is greater than (or, if <tt>inclusive</tt>, equal to)
 * the one in the vector.
 * <p>
 * On success the connection, statement and result set are handed to the
 * returned iterator (presumably released when the iterator is closed; verify
 * against DerbyReplicationLogIterator). On failure they are released here.
 *
 * @param updateVector the vector that marks what the caller already has
 * @param inclusive whether entries equal to a CSN in the vector are returned too
 * @return an iterator over the matching log entries
 * @throws ReplicationStoreException if the query fails
 */
public ReplicationLogIterator getLogs( CSNVector updateVector, boolean inclusive )
{
    updateVector = getNormalizedUpdateVector( updateVector );

    final String seqOperator = inclusive ? ">=" : ">";
    StringBuilder buf = new StringBuilder( "SELECT CSN_REPLICA_ID, CSN_TIMESTAMP, CSN_OP_SEQ, OPERATION FROM "
        + logTableName + " " );

    // One parenthesized condition per replica in the vector, joined with OR.
    int vectorSize = updateVector.size();

    if ( vectorSize > 0 )
    {
        buf.append( "WHERE " );

        for ( int i = 0; i < vectorSize; i++ )
        {
            if ( i > 0 )
            {
                buf.append( "OR " );
            }

            buf.append( "( CSN_REPLICA_ID = ? AND (CSN_TIMESTAMP = ? AND CSN_OP_SEQ " + seqOperator
                + " ? OR CSN_TIMESTAMP > ?) ) " );
        }
    }

    buf.append( "ORDER BY CSN_TIMESTAMP ASC, CSN_OP_SEQ ASC" );
    String query = buf.toString();

    Connection con = null;
    PreparedStatement ps = null;
    ResultSet rs = null;

    try
    {
        con = dataSource.getConnection();
        con.setAutoCommit( true );
        con.setTransactionIsolation( Connection.TRANSACTION_READ_UNCOMMITTED );
        con.setReadOnly( true );

        ps = con.prepareStatement( query );

        // Bind four parameters per replica, in the order the placeholders were generated.
        Iterator<ReplicaId> ids = updateVector.getReplicaIds().iterator();
        int paramIdx = 1;

        while ( ids.hasNext() )
        {
            ReplicaId id = ids.next();
            CSN csn = updateVector.getCSN( id );
            ps.setString( paramIdx++, id.getId() );
            ps.setLong( paramIdx++, csn.getTimestamp() );
            ps.setInt( paramIdx++, csn.getOperationSequence() );
            ps.setLong( paramIdx++, csn.getTimestamp() );
        }

        rs = ps.executeQuery();
        return new DerbyReplicationLogIterator( operationCodec, con, ps, rs );
    }
    catch ( Exception e )
    {
        // No iterator was created, so nobody else will release these resources.
        SQLUtil.cleanup( con, ps, rs );
        throw new ReplicationStoreException( e );
    }
}
/**
 * Builds a copy of the given update vector that contains an entry for every
 * known replica ID. Replicas that are missing from the given vector get a CSN
 * with timestamp 0 and operation sequence 0; entries of the given vector
 * replace those defaults.
 *
 * @param updateVector the vector to normalize
 * @return a new vector; the argument is not modified by this method
 */
private CSNVector getNormalizedUpdateVector( CSNVector updateVector )
{
    // Read the current set under the same monitor putLog() holds when it
    // replaces the set. Locking the set object itself protects nothing,
    // because that object is swapped for a new one on every update.
    Set<ReplicaId> snapshot;

    synchronized ( knownReplicaIdsLock )
    {
        snapshot = knownReplicaIds;
    }

    CSNVector newUV = new CSNVector();

    // putLog() never modifies a published set in place, so the snapshot can
    // be iterated without holding the lock.
    for ( ReplicaId knownReplicaId : snapshot )
    {
        newUV.setCSN( new DefaultCSN( 0, knownReplicaId, 0 ) );
    }

    newUV.setAllCSN( updateVector );
    return newUV;
}
/**
 * Returns the log entries of the replica of <tt>fromCSN</tt> that are newer
 * than <tt>fromCSN</tt>, ordered by timestamp and operation sequence.
 * <p>
 * An entry qualifies if its timestamp is greater than that of
 * <tt>fromCSN</tt>, or the timestamps are equal and its operation sequence is
 * greater than (or, if <tt>inclusive</tt>, equal to) that of <tt>fromCSN</tt>.
 * <p>
 * On success the connection, statement and result set are handed to the
 * returned iterator (presumably released when the iterator is closed; verify
 * against DerbyReplicationLogIterator). On failure they are released here.
 *
 * @param fromCSN the CSN that marks the lower bound
 * @param inclusive whether an entry equal to <tt>fromCSN</tt> is returned too
 * @return an iterator over the matching log entries
 * @throws ReplicationStoreException if the query fails
 */
public ReplicationLogIterator getLogs( CSN fromCSN, boolean inclusive )
{
    Connection con = null;
    PreparedStatement ps = null;
    ResultSet rs = null;

    try
    {
        con = dataSource.getConnection();
        con.setAutoCommit( true );
        con.setTransactionIsolation( Connection.TRANSACTION_READ_UNCOMMITTED );
        con.setReadOnly( true );

        ps = con.prepareStatement( "SELECT CSN_REPLICA_ID, CSN_TIMESTAMP, CSN_OP_SEQ, OPERATION FROM "
            + logTableName + " " + "WHERE CSN_REPLICA_ID = ? AND (CSN_TIMESTAMP = ? AND CSN_OP_SEQ >"
            + ( inclusive ? "=" : "" ) + " ? OR CSN_TIMESTAMP > ?) "
            + "ORDER BY CSN_TIMESTAMP ASC, CSN_OP_SEQ ASC" );
        ps.setString( 1, fromCSN.getReplicaId().getId() );
        ps.setLong( 2, fromCSN.getTimestamp() );
        ps.setInt( 3, fromCSN.getOperationSequence() );
        ps.setLong( 4, fromCSN.getTimestamp() );

        rs = ps.executeQuery();
        return new DerbyReplicationLogIterator( operationCodec, con, ps, rs );
    }
    catch ( Exception e )
    {
        // No iterator was created, so nobody else will release these resources.
        SQLUtil.cleanup( con, ps, rs );
        throw new ReplicationStoreException( e );
    }
}
/**
 * Deletes the log entries of the replica of <tt>toCSN</tt> that are older
 * than <tt>toCSN</tt>.
 * <p>
 * An entry is deleted if its timestamp is less than that of <tt>toCSN</tt>,
 * or the timestamps are equal and its operation sequence is less than (or, if
 * <tt>inclusive</tt>, equal to) that of <tt>toCSN</tt>.
 *
 * @param toCSN the CSN that marks the upper bound
 * @param inclusive whether an entry equal to <tt>toCSN</tt> is deleted too
 * @return the number of deleted rows
 * @throws ReplicationStoreException if the delete fails
 */
public int removeLogs( CSN toCSN, boolean inclusive )
{
    final String seqOperator = inclusive ? "<=" : "<";
    final String sql = "DELETE FROM " + logTableName + " WHERE "
        + "CSN_REPLICA_ID = ? AND (CSN_TIMESTAMP = ? AND CSN_OP_SEQ " + seqOperator
        + " ? OR CSN_TIMESTAMP < ?)";

    Connection con = null;
    PreparedStatement ps = null;

    try
    {
        con = dataSource.getConnection();
        con.setAutoCommit( true );
        con.setTransactionIsolation( Connection.TRANSACTION_READ_UNCOMMITTED );
        con.setReadOnly( false );

        ps = con.prepareStatement( sql );
        ps.setString( 1, toCSN.getReplicaId().getId() );
        ps.setLong( 2, toCSN.getTimestamp() );
        ps.setInt( 3, toCSN.getOperationSequence() );
        ps.setLong( 4, toCSN.getTimestamp() );

        int removed = ps.executeUpdate();
        return removed;
    }
    catch ( Exception e )
    {
        throw new ReplicationStoreException( e );
    }
    finally
    {
        SQLUtil.cleanup( con, ps, null );
    }
}
/**
 * Counts all entries of the replication log.
 *
 * @return the number of rows in the log table
 * @throws ReplicationStoreException if the query fails
 */
public int getLogSize()
{
    Connection con = null;
    Statement stmt = null;
    ResultSet rs = null;

    try
    {
        con = dataSource.getConnection();
        con.setTransactionIsolation( Connection.TRANSACTION_READ_COMMITTED );
        con.setReadOnly( true );

        stmt = con.createStatement();
        rs = stmt.executeQuery( "SELECT COUNT(*) FROM " + logTableName );

        // Move to the single row that carries the count.
        rs.next();
        int size = rs.getInt( 1 );
        return size;
    }
    catch ( Exception e )
    {
        throw new ReplicationStoreException( e );
    }
    finally
    {
        SQLUtil.cleanup( con, stmt, rs );
    }
}
/**
 * Counts the entries of the replication log that belong to one replica.
 *
 * @param replicaId the replica whose entries are counted
 * @return the number of rows in the log table with that replica ID
 * @throws ReplicationStoreException if the query fails
 */
public int getLogSize( ReplicaId replicaId )
{
    Connection con = null;
    PreparedStatement ps = null;
    ResultSet rs = null;

    try
    {
        con = dataSource.getConnection();
        con.setTransactionIsolation( Connection.TRANSACTION_READ_COMMITTED );
        con.setReadOnly( true );

        ps = con.prepareStatement( "SELECT COUNT(*) FROM " + logTableName + " WHERE CSN_REPLICA_ID=?" );
        ps.setString( 1, replicaId.getId() );
        rs = ps.executeQuery();

        // Move to the single row that carries the count.
        rs.next();
        int size = rs.getInt( 1 );
        return size;
    }
    catch ( Exception e )
    {
        throw new ReplicationStoreException( e );
    }
    finally
    {
        SQLUtil.cleanup( con, ps, rs );
    }
}
/**
 * Returns, for every known replica that has log entries, the greatest CSN
 * found in the log.
 *
 * @return a new vector holding the newest logged CSN per replica
 */
public CSNVector getUpdateVector()
{
    final boolean oldestFirst = false;
    return getVector( oldestFirst );
}
/**
 * Returns, for every known replica that has log entries, the smallest CSN
 * found in the log.
 *
 * @return a new vector holding the oldest logged CSN per replica
 */
public CSNVector getPurgeVector()
{
    final boolean oldestFirst = true;
    return getVector( oldestFirst );
}
/**
 * Builds a vector that holds, for every known replica, the first CSN of its
 * log entries when they are sorted by timestamp and operation sequence.
 * Replicas without any log entry are left out.
 *
 * @param min <tt>true</tt> to sort ascending (smallest CSN per replica),
 *        <tt>false</tt> to sort descending (greatest CSN per replica)
 * @return a new vector
 * @throws ReplicationStoreException if a query fails
 */
private CSNVector getVector( boolean min )
{
    String direction;

    if ( min )
    {
        direction = "ASC";
    }
    else
    {
        direction = "DESC";
    }

    Connection con = null;
    PreparedStatement ps = null;
    ResultSet rs = null;
    CSNVector result = new CSNVector();

    try
    {
        con = dataSource.getConnection();
        con.setTransactionIsolation( Connection.TRANSACTION_READ_COMMITTED );
        con.setReadOnly( true );

        // One statement, executed once per replica with a different parameter.
        ps = con.prepareStatement( "SELECT CSN_TIMESTAMP, CSN_OP_SEQ FROM " + logTableName
            + " WHERE CSN_REPLICA_ID=? ORDER BY CSN_TIMESTAMP " + direction + ", CSN_OP_SEQ " + direction );

        for ( ReplicaId id : knownReplicaIds )
        {
            ps.setString( 1, id.getId() );
            rs = ps.executeQuery();

            // Only the first row in sort order matters.
            if ( rs.next() )
            {
                long timestamp = rs.getLong( 1 );
                int opSequence = rs.getInt( 2 );
                result.setCSN( new DefaultCSN( timestamp, id, opSequence ) );
            }

            rs.close();
            rs = null;
            ps.clearParameters();
        }

        return result;
    }
    catch ( Exception e )
    {
        throw new ReplicationStoreException( e );
    }
    finally
    {
        SQLUtil.cleanup( con, ps, rs );
    }
}
}