blob: 44482105f71f77188777e97bdccd65986eb4e7c4 [file] [log] [blame]
/**
*
* Copyright 2009-2010 Rickard Öberg AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.qi4j.library.eventsourcing.domain.source.jdbm;
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Properties;
import jdbm.RecordManager;
import jdbm.RecordManagerFactory;
import jdbm.RecordManagerOptions;
import jdbm.Serializer;
import jdbm.btree.BTree;
import jdbm.helper.ByteArrayComparator;
import jdbm.helper.DefaultSerializer;
import jdbm.helper.Tuple;
import jdbm.helper.TupleBrowser;
import jdbm.recman.CacheRecordManager;
import org.qi4j.api.activation.Activators;
import org.qi4j.api.injection.scope.Service;
import org.qi4j.api.mixin.Mixins;
import org.qi4j.api.service.ServiceComposite;
import org.qi4j.api.service.qualifier.Tagged;
import org.qi4j.api.value.ValueSerialization;
import org.qi4j.functional.Function;
import org.qi4j.io.Input;
import org.qi4j.io.Output;
import org.qi4j.io.Receiver;
import org.qi4j.io.Sender;
import org.qi4j.io.Transforms;
import org.qi4j.library.eventsourcing.domain.api.UnitOfWorkDomainEventsValue;
import org.qi4j.library.eventsourcing.domain.source.AbstractEventStoreMixin;
import org.qi4j.library.eventsourcing.domain.source.EventManagement;
import org.qi4j.library.eventsourcing.domain.source.EventSource;
import org.qi4j.library.eventsourcing.domain.source.EventStore;
import org.qi4j.library.eventsourcing.domain.source.EventStoreActivation;
import org.qi4j.library.eventsourcing.domain.source.EventStream;
import org.qi4j.library.fileconfig.FileConfiguration;
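/**
 * Event store service that persists domain events in a JDBM BTree.
 * <p>
 * Each stored {@link UnitOfWorkDomainEventsValue} is serialized to a JSON string and saved as UTF-8 bytes
 * under a sequential number. The JDBM files are created with the base name "events" in a directory named
 * after the service identity, below the data directory reported by {@link FileConfiguration}.
 */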
@Mixins( JdbmEventStoreService.JdbmEventStoreMixin.class )
@Activators( EventStoreActivation.Activator.class )
public interface JdbmEventStoreService
extends EventSource, EventStore, EventStream, EventManagement, EventStoreActivation, ServiceComposite
{
class JdbmEventStoreMixin
extends AbstractEventStoreMixin
implements EventManagement, EventSource
{
// Supplies the data directory under which this store creates its files.
@Service
private FileConfiguration fileConfig;
// JSON-tagged serialization used to turn events into strings and back.
@Service
@Tagged( ValueSerialization.Formats.JSON )
private ValueSerialization valueSerialization;
// JDBM record manager; opened (and wrapped in a cache) by initialize(), closed on passivation.
private RecordManager recordManager;
// BTree holding the stored events: key is the sequence number, value the UTF-8 encoded JSON.
private BTree index;
// Set to DefaultSerializer.INSTANCE by initialize() and passed as key serializer when the BTree is created.
private Serializer serializer;
// Base path handed to JDBM when opening the record manager.
private File dataFile;
// Sequence number of the last stored entry; initialized from the BTree size and returned by count().
private long currentCount;
/**
 * Activates the store: runs the inherited activation, makes sure the directory that will hold the
 * JDBM files exists, and opens the record manager with transactions enabled and auto-commit disabled.
 *
 * @throws Exception if the inherited activation fails, the data directory cannot be created,
 *                   or the JDBM store cannot be opened
 */
@Override
public void activateEventStore()
    throws Exception
{
    super.activateEventStore();

    dataFile = new File( fileConfig.dataDirectory(), identity.identity() + "/events" );
    File directory = dataFile.getAbsoluteFile().getParentFile();
    // mkdirs() returns false both on failure and when the directory already exists,
    // so it is only an error if the directory is still missing afterwards.
    if( !directory.mkdirs() && !directory.isDirectory() )
    {
        throw new IOException( "Could not create event store directory " + directory );
    }

    Properties properties = new Properties();
    properties.put( RecordManagerOptions.AUTO_COMMIT, "false" );
    properties.put( RecordManagerOptions.DISABLE_TRANSACTIONS, "false" );
    initialize( dataFile.getAbsolutePath(), properties );
}
/**
 * Passivates the store: runs the inherited passivation and then closes the JDBM record manager.
 * The record manager is closed even when the inherited passivation throws, so the underlying
 * files are not left open.
 *
 * @throws Exception if the inherited passivation or closing the record manager fails
 */
@Override
public void passivateEventStore()
    throws Exception
{
    try
    {
        super.passivateEventStore();
    }
    finally
    {
        // recordManager is still null when activation failed before the store was opened
        if( recordManager != null )
        {
            recordManager.close();
        }
    }
}
/**
 * Creates the output used to restore events from their JSON string form.
 * <p>
 * Every received string is deserialized into a {@link UnitOfWorkDomainEventsValue} and handed to
 * {@link #storeEvents0()}. A progress log configured with 1000 commits the record manager whenever
 * it fires, and the whole chain is wrapped in the store lock.
 *
 * @return an output accepting serialized events
 */
@Override
public Output<String, IOException> restore()
{
    // Intermediate commits keep the pending transaction small to avoid OutOfMemory issues
    Transforms.ProgressLog<String> periodicCommit = new Transforms.ProgressLog<String>( 1000 )
    {
        @Override
        protected void logProgress()
        {
            try
            {
                recordManager.commit();
            }
            catch( IOException commitFailure )
            {
                throw new IllegalStateException( "Could not commit data", commitFailure );
            }
        }
    };

    // Converts each incoming JSON string back into an events value
    Function<String, UnitOfWorkDomainEventsValue> fromJson = new Function<String, UnitOfWorkDomainEventsValue>()
    {
        @Override
        public UnitOfWorkDomainEventsValue map( String json )
        {
            return valueSerialization.<UnitOfWorkDomainEventsValue>deserialize( eventsType, json );
        }
    };

    Output<String, IOException> pipeline = Transforms.map( periodicCommit, Transforms.map( fromJson, storeEvents0() ) );
    return Transforms.lock( this.lock, pipeline );
}
// EventStore implementation
/**
 * Returns an input that streams stored events from the JDBM index.
 * <p>
 * Browsing starts at key {@code offset + 1} (keys are the sequence numbers assigned when events are
 * stored) and stops after at most {@code limit} entries or when the index is exhausted, whichever
 * comes first. A non-positive {@code limit} yields no events. The store lock is held while iterating.
 *
 * @param offset number of leading entries to skip; 0 starts from the first stored entry
 * @param limit  maximum number of entries to send to the receiver
 * @return an input producing the selected events
 */
@Override
public Input<UnitOfWorkDomainEventsValue, IOException> events( final long offset, final long limit )
{
    return new Input<UnitOfWorkDomainEventsValue, IOException>()
    {
        @Override
        public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super UnitOfWorkDomainEventsValue, ReceiverThrowableType> output )
            throws IOException, ReceiverThrowableType
        {
            output.receiveFrom( new Sender<UnitOfWorkDomainEventsValue, IOException>()
            {
                @Override
                public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super UnitOfWorkDomainEventsValue, ReceiverThrowableType> receiver )
                    throws ReceiverThrowableType, IOException
                {
                    // Lock datastore first
                    lock();
                    try
                    {
                        final TupleBrowser browser = index.browse( offset + 1 );
                        Tuple tuple = new Tuple();
                        // Honor the requested limit instead of always streaming to the end of the index
                        long remaining = limit;
                        while( remaining > 0 && browser.getNext( tuple ) )
                        {
                            // Get next transaction
                            UnitOfWorkDomainEventsValue domainEvents = readTransactionEvents( tuple );
                            receiver.receive( domainEvents );
                            remaining--;
                        }
                    }
                    catch( Exception e )
                    {
                        // NOTE(review): failures (including receiver failures) are only logged, not propagated;
                        // kept as-is because callers may rely on this best-effort behavior.
                        logger.warn( "Could not iterate events", e );
                    }
                    finally
                    {
                        lock.unlock();
                    }
                }
            } );
        }
    };
}
/**
 * Reports the value of the internal sequence counter, which is initialized from the index size
 * and incremented for each stored entry.
 *
 * @return the current counter value
 */
@Override
public long count()
{
    return this.currentCount;
}
/**
 * Creates the output that writes events into the JDBM index.
 * <p>
 * Each received value is serialized to JSON and inserted as UTF-8 bytes under the next sequence
 * number. When the sender has finished, the record manager is committed. If anything fails, the
 * transaction is rolled back, the in-memory state is resynchronized with what is actually
 * persisted, and the original failure is rethrown.
 *
 * @return an output storing the received events
 */
@Override
protected Output<UnitOfWorkDomainEventsValue, IOException> storeEvents0()
{
    return new Output<UnitOfWorkDomainEventsValue, IOException>()
    {
        @Override
        public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends UnitOfWorkDomainEventsValue, SenderThrowableType> sender )
            throws IOException, SenderThrowableType
        {
            try
            {
                sender.sendTo( new Receiver<UnitOfWorkDomainEventsValue, IOException>()
                {
                    @Override
                    public void receive( UnitOfWorkDomainEventsValue item )
                        throws IOException
                    {
                        String jsonString = valueSerialization.serialize( item );
                        long key = currentCount + 1;
                        index.insert( key, jsonString.getBytes( "UTF-8" ), false );
                        // Advance the counter only once the insert has succeeded
                        currentCount = key;
                    }
                } );
                recordManager.commit();
            }
            catch( IOException e )
            {
                rollbackAndResync();
                throw e;
            }
            catch( Throwable e )
            {
                rollbackAndResync();
                throw (SenderThrowableType) e;
            }
        }
    };
}

/**
 * Rolls back the pending JDBM transaction and brings the in-memory state back in line with it.
 * <p>
 * The BTree handle and the counter were modified by the discarded inserts, so the index is
 * reloaded from the record manager and the counter is reset to its size. Using the persisted size
 * (rather than a snapshot taken before the batch) stays correct when intermediate commits
 * happened during the batch, as is done while restoring.
 *
 * @throws IOException if the rollback or reloading the index fails
 */
private void rollbackAndResync()
    throws IOException
{
    recordManager.rollback();
    index = BTree.load( recordManager, recordManager.getNamedObject( "index" ) );
    currentCount = index.size();
}
/**
 * Opens the JDBM record manager, wraps it in a cache, and loads or creates the event index.
 * <p>
 * If a named object "index" already exists, its BTree is loaded and the counter is set to its size.
 * Otherwise a new BTree is created, registered under the name "index", and the counter starts at 0.
 * The record manager is committed before returning.
 *
 * @param name       base path of the JDBM files
 * @param properties options passed to the record manager factory
 * @throws IOException if the store cannot be opened, read, or committed
 */
private void initialize( String name, Properties properties )
    throws IOException
{
    recordManager = RecordManagerFactory.createRecordManager( name, properties );
    serializer = DefaultSerializer.INSTANCE;
    recordManager = new CacheRecordManager( recordManager, 1000, false );
    long recid = recordManager.getNamedObject( "index" );
    if( recid != 0 )
    {
        // An existing index keeps whatever comparator it was created with
        index = BTree.load( recordManager, recid );
        currentCount = index.size();
    }
    else
    {
        // Keys are boxed long sequence numbers, so they must be ordered with a Long comparator;
        // a byte[] comparator fails with ClassCastException as soon as two keys are compared.
        index = BTree.createInstance( recordManager, new jdbm.helper.LongComparator(), serializer, DefaultSerializer.INSTANCE, 16 );
        recordManager.setNamedObject( "index", index.getRecid() );
        currentCount = 0;
    }
    recordManager.commit();
}
/**
 * Converts one index entry back into an events value: the tuple value is taken as a byte array,
 * decoded as UTF-8 into a JSON string, and deserialized.
 *
 * @param tuple index entry whose value holds the encoded events
 * @return the deserialized events
 * @throws UnsupportedEncodingException if the UTF-8 charset is not available
 */
private UnitOfWorkDomainEventsValue readTransactionEvents( Tuple tuple )
    throws UnsupportedEncodingException
{
    byte[] rawBytes = (byte[]) tuple.getValue();
    return valueSerialization.<UnitOfWorkDomainEventsValue>deserialize( eventsType, new String( rawBytes, "UTF-8" ) );
}
}
}