blob: ac6c31e757a7288574d6eca3d5f92e60aec601af [file] [log] [blame]
/* Copyright 2008 Rickard Öberg.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
*
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zest.entitystore.jdbm;
import java.io.File;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
import java.util.Properties;
import java.util.concurrent.locks.ReadWriteLock;
import jdbm.RecordManager;
import jdbm.RecordManagerFactory;
import jdbm.RecordManagerOptions;
import jdbm.Serializer;
import jdbm.btree.BTree;
import jdbm.helper.ByteArrayComparator;
import jdbm.helper.DefaultSerializer;
import jdbm.helper.Tuple;
import jdbm.helper.TupleBrowser;
import jdbm.recman.CacheRecordManager;
import org.apache.zest.api.common.Optional;
import org.apache.zest.api.configuration.Configuration;
import org.apache.zest.api.entity.EntityDescriptor;
import org.apache.zest.api.entity.EntityReference;
import org.apache.zest.api.injection.scope.Service;
import org.apache.zest.api.injection.scope.This;
import org.apache.zest.api.injection.scope.Uses;
import org.apache.zest.api.service.ServiceDescriptor;
import org.apache.zest.io.Files;
import org.apache.zest.io.Input;
import org.apache.zest.io.Output;
import org.apache.zest.io.Receiver;
import org.apache.zest.io.Sender;
import org.apache.zest.library.fileconfig.FileConfiguration;
import org.apache.zest.library.locking.ReadLock;
import org.apache.zest.library.locking.WriteLock;
import org.apache.zest.spi.entitystore.BackupRestore;
import org.apache.zest.spi.entitystore.EntityNotFoundException;
import org.apache.zest.spi.entitystore.EntityStoreException;
import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
/**
* JDBM implementation of MapEntityStore.
*/
public class JdbmEntityStoreMixin
implements JdbmEntityStoreActivation, MapEntityStore, BackupRestore
{
@Optional
@Service
FileConfiguration fileConfiguration;
@This
private Configuration<JdbmConfiguration> config;
@Uses
private ServiceDescriptor descriptor;
private RecordManager recordManager;
private BTree index;
private Serializer serializer;
@This
ReadWriteLock lock;
@Override
public void setUpJdbm()
throws Exception
{
initialize();
}
@Override
public void tearDownJdbm()
throws Exception
{
recordManager.close();
}
@ReadLock
@Override
public Reader get( EntityReference entityReference )
throws EntityStoreException
{
try
{
Long stateIndex = getStateIndex( entityReference.identity() );
if( stateIndex == null )
{
throw new EntityNotFoundException( entityReference );
}
byte[] serializedState = (byte[]) recordManager.fetch( stateIndex, serializer );
if( serializedState == null )
{
throw new EntityNotFoundException( entityReference );
}
return new StringReader( new String( serializedState, "UTF-8" ) );
}
catch( IOException e )
{
throw new EntityStoreException( e );
}
}
@WriteLock
@Override
public void applyChanges( MapChanges changes )
throws IOException
{
try
{
changes.visitMap( new MapChanger()
{
@Override
public Writer newEntity( final EntityReference ref, EntityDescriptor descriptor )
throws IOException
{
return new StringWriter( 1000 )
{
@Override
public void close()
throws IOException
{
super.close();
byte[] stateArray = toString().getBytes( "UTF-8" );
long stateIndex = recordManager.insert( stateArray, serializer );
String indexKey = ref.toString();
index.insert( indexKey.getBytes( "UTF-8" ), stateIndex, false );
}
};
}
@Override
public Writer updateEntity( final EntityReference ref, EntityDescriptor descriptor )
throws IOException
{
return new StringWriter( 1000 )
{
@Override
public void close()
throws IOException
{
super.close();
Long stateIndex = getStateIndex( ref.toString() );
byte[] stateArray = toString().getBytes( "UTF-8" );
recordManager.update( stateIndex, stateArray, serializer );
}
};
}
@Override
public void removeEntity( EntityReference ref, EntityDescriptor descriptor )
throws EntityNotFoundException
{
try
{
Long stateIndex = getStateIndex( ref.toString() );
recordManager.delete( stateIndex );
index.remove( ref.toString().getBytes( "UTF-8" ) );
}
catch( IOException e )
{
throw new EntityStoreException( e );
}
}
} );
recordManager.commit();
}
catch( Exception e )
{
e.printStackTrace();
recordManager.rollback();
if( e instanceof IOException )
{
throw (IOException) e;
}
else if( e instanceof EntityStoreException )
{
throw (EntityStoreException) e;
}
else
{
throw new IOException( e );
}
}
}
@Override
public Input<Reader, IOException> entityStates()
{
return new Input<Reader, IOException>()
{
@Override
public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super Reader, ReceiverThrowableType> output )
throws IOException, ReceiverThrowableType
{
lock.writeLock().lock();
try
{
output.receiveFrom( new Sender<Reader, IOException>()
{
@Override
public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super Reader, ReceiverThrowableType> receiver )
throws ReceiverThrowableType, IOException
{
final TupleBrowser browser = index.browse();
final Tuple tuple = new Tuple();
while( browser.getNext( tuple ) )
{
String id = new String( (byte[]) tuple.getKey(), "UTF-8" );
Long stateIndex = getStateIndex( id );
if( stateIndex == null )
{
continue;
} // Skip this one
byte[] serializedState = (byte[]) recordManager.fetch( stateIndex, serializer );
receiver.receive( new StringReader( new String( serializedState, "UTF-8" ) ) );
}
}
} );
}
finally
{
lock.writeLock().unlock();
}
}
};
}
@Override
public Input<String, IOException> backup()
{
return new Input<String, IOException>()
{
@Override
public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output )
throws IOException, ReceiverThrowableType
{
lock.readLock().lock();
try
{
output.receiveFrom( new Sender<String, IOException>()
{
@Override
public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super String, ReceiverThrowableType> receiver )
throws ReceiverThrowableType, IOException
{
final TupleBrowser browser = index.browse();
final Tuple tuple = new Tuple();
while( browser.getNext( tuple ) )
{
String id = new String( (byte[]) tuple.getKey(), "UTF-8" );
Long stateIndex = getStateIndex( id );
if( stateIndex == null )
{
continue;
} // Skip this one
byte[] serializedState = (byte[]) recordManager.fetch( stateIndex, serializer );
receiver.receive( new String( serializedState, "UTF-8" ) );
}
}
} );
}
finally
{
lock.readLock().unlock();
}
}
};
}
@Override
public Output<String, IOException> restore()
{
return new Output<String, IOException>()
{
@Override
public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends String, SenderThrowableType> sender )
throws IOException, SenderThrowableType
{
File dbFile = new File( getDatabaseName() + ".db" );
File lgFile = new File( getDatabaseName() + ".lg" );
// Create temporary store
File tempDatabase = Files.createTemporayFileOf( dbFile );
final RecordManager recordManager = RecordManagerFactory.createRecordManager( tempDatabase.getAbsolutePath(), new Properties() );
ByteArrayComparator comparator = new ByteArrayComparator();
final BTree index = BTree.createInstance( recordManager, comparator, serializer, DefaultSerializer.INSTANCE, 16 );
recordManager.setNamedObject( "index", index.getRecid() );
recordManager.commit();
try
{
sender.sendTo( new Receiver<String, IOException>()
{
int counter = 0;
@Override
public void receive( String item )
throws IOException
{
// Commit one batch
if( ( counter++ % 1000 ) == 0 )
{
recordManager.commit();
}
String id = item.substring( "{\"identity\":\"".length() );
id = id.substring( 0, id.indexOf( '"' ) );
// Insert
byte[] stateArray = item.getBytes( "UTF-8" );
long stateIndex = recordManager.insert( stateArray, serializer );
index.insert( id.getBytes( "UTF-8" ), stateIndex, false );
}
} );
}
catch( IOException e )
{
recordManager.close();
tempDatabase.delete();
throw e;
}
catch( Throwable senderThrowableType )
{
recordManager.close();
tempDatabase.delete();
throw (SenderThrowableType) senderThrowableType;
}
// Import went ok - continue
recordManager.commit();
// close file handles otherwise Microsoft Windows will fail to rename database files.
recordManager.close();
lock.writeLock().lock();
try
{
// Replace old database with new
JdbmEntityStoreMixin.this.recordManager.close();
boolean deletedOldDatabase = true;
deletedOldDatabase &= dbFile.delete();
deletedOldDatabase &= lgFile.delete();
if( !deletedOldDatabase )
{
throw new IOException( "Could not remove old database" );
}
boolean renamedTempDatabase = true;
renamedTempDatabase &= new File( tempDatabase.getAbsolutePath() + ".db" ).renameTo( dbFile );
renamedTempDatabase &= new File( tempDatabase.getAbsolutePath() + ".lg" ).renameTo( lgFile );
if( !renamedTempDatabase )
{
throw new IOException( "Could not replace database with temp database" );
}
// Start up again
initialize();
}
finally
{
lock.writeLock().unlock();
}
}
};
}
private String getDatabaseName()
{
String pathname = config.get().file().get();
if( pathname == null )
{
if( fileConfiguration != null )
{
File dataDir = fileConfiguration.dataDirectory();
File jdbmDir = new File( dataDir, descriptor.identity() + "/jdbm.data" );
pathname = jdbmDir.getAbsolutePath();
}
else
{
pathname = System.getProperty( "user.dir" ) + "/qi4j/jdbm.data";
}
}
File dataFile = new File( pathname );
File directory = dataFile.getAbsoluteFile().getParentFile();
directory.mkdirs();
String name = dataFile.getAbsolutePath();
return name;
}
private Properties getProperties()
{
JdbmConfiguration config = this.config.get();
Properties properties = new Properties();
properties.put( RecordManagerOptions.AUTO_COMMIT, config.autoCommit().get().toString() );
properties.put( RecordManagerOptions.DISABLE_TRANSACTIONS, config.disableTransactions().get().toString() );
return properties;
}
private Long getStateIndex( String identity )
throws IOException
{
return (Long) index.find( identity.getBytes( "UTF-8" ) );
}
private void initialize()
throws IOException
{
String name = getDatabaseName();
Properties properties = getProperties();
recordManager = RecordManagerFactory.createRecordManager( name, properties );
serializer = DefaultSerializer.INSTANCE;
recordManager = new CacheRecordManager( recordManager, 1000, false );
long recid = recordManager.getNamedObject( "index" );
if( recid != 0 )
{
index = BTree.load( recordManager, recid );
}
else
{
ByteArrayComparator comparator = new ByteArrayComparator();
index = BTree.createInstance( recordManager, comparator, serializer, DefaultSerializer.INSTANCE, 16 );
recordManager.setNamedObject( "index", index.getRecid() );
}
recordManager.commit();
}
}