blob: b47300081f947c8b247a2c44f8154e352546161d [file] [log] [blame]
/*
* Copyright 2011 Paul Merlin.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
*
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zest.entitystore.mongodb;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.WriteConcern;
import com.mongodb.util.JSON;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.zest.api.configuration.Configuration;
import org.apache.zest.api.entity.EntityDescriptor;
import org.apache.zest.api.entity.EntityReference;
import org.apache.zest.api.injection.scope.This;
import org.apache.zest.api.service.ServiceActivation;
import org.apache.zest.io.Input;
import org.apache.zest.io.Output;
import org.apache.zest.io.Receiver;
import org.apache.zest.io.Sender;
import org.apache.zest.spi.entitystore.EntityNotFoundException;
import org.apache.zest.spi.entitystore.EntityStoreException;
import org.apache.zest.spi.entitystore.helpers.MapEntityStore;
/**
* MongoDB implementation of MapEntityStore.
*/
public class MongoMapEntityStoreMixin
implements ServiceActivation, MapEntityStore, MongoAccessors
{
private static final String DEFAULT_DATABASE_NAME = "qi4j:entitystore";
private static final String DEFAULT_COLLECTION_NAME = "qi4j:entitystore:entities";
public static final String IDENTITY_COLUMN = "_id";
public static final String STATE_COLUMN = "state";
@This
private Configuration<MongoEntityStoreConfiguration> configuration;
private List<ServerAddress> serverAddresses;
private String databaseName;
private String collectionName;
private WriteConcern writeConcern;
private String username;
private char[] password;
private MongoClient mongo;
private DB db;
@Override
public void activateService()
throws Exception
{
loadConfiguration();
// Create Mongo driver and open the database
if( username.isEmpty() )
{
mongo = new MongoClient( serverAddresses );
}
else
{
MongoCredential credential = MongoCredential.createMongoCRCredential( username, databaseName, password );
mongo = new MongoClient( serverAddresses, Arrays.asList( credential ) );
}
db = mongo.getDB( databaseName );
// Create index if needed
db.requestStart();
DBCollection entities = db.getCollection( collectionName );
if( entities.getIndexInfo().isEmpty() )
{
entities.createIndex( new BasicDBObject( IDENTITY_COLUMN, 1 ) );
}
db.requestDone();
}
private void loadConfiguration()
throws UnknownHostException
{
configuration.refresh();
MongoEntityStoreConfiguration config = configuration.get();
// Combine hostname, port and nodes configuration properties
// If no configuration, use 127.0.0.1:27017
serverAddresses = new ArrayList<>();
int port = config.port().get() == null ? 27017 : config.port().get();
if( config.nodes().get().isEmpty() )
{
String hostname = config.hostname().get() == null ? "127.0.0.1" : config.hostname().get();
serverAddresses.add( new ServerAddress( hostname, port ) );
}
else
{
if( config.hostname().get() != null && !config.hostname().get().isEmpty() )
{
serverAddresses.add( new ServerAddress( config.hostname().get(), port ) );
}
serverAddresses.addAll( config.nodes().get() );
}
// If database name not configured, set it to qi4j:entitystore
databaseName = config.database().get();
if( databaseName == null )
{
databaseName = DEFAULT_DATABASE_NAME;
}
// If collection name not configured, set it to qi4j:entitystore:entities
collectionName = config.collection().get();
if( collectionName == null )
{
collectionName = DEFAULT_COLLECTION_NAME;
}
// If write concern not configured, set it to normal
switch( config.writeConcern().get() )
{
case FSYNC_SAFE:
writeConcern = WriteConcern.FSYNC_SAFE;
break;
case JOURNAL_SAFE:
writeConcern = WriteConcern.JOURNAL_SAFE;
break;
case MAJORITY:
writeConcern = WriteConcern.MAJORITY;
break;
case REPLICAS_SAFE:
writeConcern = WriteConcern.REPLICAS_SAFE;
break;
case SAFE:
writeConcern = WriteConcern.SAFE;
break;
case NORMAL:
default:
writeConcern = WriteConcern.NORMAL;
}
// Username and password are defaulted to empty strings
username = config.username().get();
password = config.password().get().toCharArray();
}
@Override
public void passivateService()
throws Exception
{
mongo.close();
mongo = null;
databaseName = null;
collectionName = null;
writeConcern = null;
username = null;
Arrays.fill( password, ' ' );
password = null;
db = null;
}
@Override
public MongoClient mongoInstanceUsed()
{
return mongo;
}
@Override
public DB dbInstanceUsed()
{
return db;
}
@Override
public String collectionUsed()
{
return collectionName;
}
@Override
public Reader get( EntityReference entityReference )
throws EntityStoreException
{
db.requestStart();
DBObject entity = db.getCollection( collectionName ).findOne( byIdentity( entityReference ) );
if( entity == null )
{
throw new EntityNotFoundException( entityReference );
}
DBObject bsonState = (DBObject) entity.get( STATE_COLUMN );
db.requestDone();
String jsonState = JSON.serialize( bsonState );
return new StringReader( jsonState );
}
@Override
public void applyChanges( MapChanges changes )
throws IOException
{
db.requestStart();
final DBCollection entities = db.getCollection( collectionName );
changes.visitMap( new MapChanger()
{
@Override
public Writer newEntity( final EntityReference ref, EntityDescriptor entityDescriptor )
throws IOException
{
return new StringWriter( 1000 )
{
@Override
public void close()
throws IOException
{
super.close();
String jsonState = toString();
DBObject bsonState = (DBObject) JSON.parse( jsonState );
BasicDBObject entity = new BasicDBObject();
entity.put( IDENTITY_COLUMN, ref.identity() );
entity.put( STATE_COLUMN, bsonState );
entities.insert( entity, writeConcern );
}
};
}
@Override
public Writer updateEntity( final EntityReference ref, EntityDescriptor entityDescriptor )
throws IOException
{
return new StringWriter( 1000 )
{
@Override
public void close()
throws IOException
{
super.close();
DBObject bsonState = (DBObject) JSON.parse( toString() );
BasicDBObject entity = new BasicDBObject();
entity.put( IDENTITY_COLUMN, ref.identity() );
entity.put( STATE_COLUMN, bsonState );
entities.update( byIdentity( ref ), entity, false, false, writeConcern );
}
};
}
@Override
public void removeEntity( EntityReference ref, EntityDescriptor entityDescriptor )
throws EntityNotFoundException
{
DBObject entity = entities.findOne( byIdentity( ref ) );
if( entity == null )
{
throw new EntityNotFoundException( ref );
}
entities.remove( entity, writeConcern );
}
} );
db.requestDone();
}
@Override
public Input<Reader, IOException> entityStates()
{
return new Input<Reader, IOException>()
{
@Override
public <ReceiverThrowableType extends Throwable> void transferTo(
Output<? super Reader, ReceiverThrowableType> output )
throws IOException, ReceiverThrowableType
{
output.receiveFrom( new Sender<Reader, IOException>()
{
@Override
public <ReceiverThrowableType extends Throwable> void sendTo(
Receiver<? super Reader, ReceiverThrowableType> receiver )
throws ReceiverThrowableType, IOException
{
db.requestStart();
DBCursor cursor = db.getCollection( collectionName ).find();
while( cursor.hasNext() )
{
DBObject eachEntity = cursor.next();
DBObject bsonState = (DBObject) eachEntity.get( STATE_COLUMN );
String jsonState = JSON.serialize( bsonState );
receiver.receive( new StringReader( jsonState ) );
}
db.requestDone();
}
} );
}
};
}
private DBObject byIdentity( EntityReference entityReference )
{
return new BasicDBObject( IDENTITY_COLUMN, entityReference.identity() );
}
}