| package org.qi4j.entitystore.cassandra; |
| |
| import org.apache.cassandra.thrift.*; |
| import org.apache.commons.lang.NotImplementedException; |
| import org.apache.thrift.protocol.TBinaryProtocol; |
| import org.apache.thrift.transport.TSocket; |
| import org.apache.thrift.transport.TTransport; |
| import org.qi4j.api.entity.EntityReference; |
| import org.qi4j.api.injection.scope.Service; |
| import org.qi4j.api.io.Input; |
| import org.qi4j.api.service.Activatable; |
| import org.qi4j.entitystore.map.MapEntityStore; |
| import org.qi4j.spi.entity.EntityType; |
| import org.qi4j.spi.entitystore.EntityAlreadyExistsException; |
| import org.qi4j.spi.entitystore.EntityNotFoundException; |
| import org.qi4j.spi.entitystore.EntityStoreException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.*; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.zip.GZIPInputStream; |
| import java.util.zip.GZIPOutputStream; |
| |
| /** |
| * // TODO: Document this |
| * |
| * @author pvdyck |
| * @since 4.0 |
| */ |
| public class CassandraMapEntityStoreMixin implements MapEntityStore, Activatable { |
| private final Logger logger = LoggerFactory.getLogger(CassandraMapEntityStoreMixin.class); |
| |
| private static final String keySpace = "Qi4j"; |
| static final String entryColumnFamily = "Qi4jEntries"; |
| |
| private ColumnPath entryColumnPath; |
| |
| |
| static final int BYTE_ARRAY_BUFFER_INITIAL_SIZE = 512; |
| |
| private TTransport tr = new TSocket("localhost", 9160); |
| private Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(tr)); |
| |
| @Service |
| private |
| CassandraConfiguration conf; |
| |
| public void activate() throws Exception { |
| logger.info("starting cassandra store"); |
| tr = new TSocket(conf.getHost(), 9160); |
| if(conf.getLogin()!=null){ |
| |
| } |
| client = new Cassandra.Client(new TBinaryProtocol(tr)); |
| tr.open(); |
| entryColumnPath = new ColumnPath(entryColumnFamily).setColumn("entry".getBytes("UTF-8")); |
| logger.info("started cassandra store"); |
| } |
| |
| |
| public void passivate() throws Exception { |
| logger.info("shutting down cassandra"); |
| synchronized (client) { |
| tr.close(); |
| } |
| } |
| |
| |
| public void applyChanges(final MapChanges changes) throws IOException { |
| if (conf.readOnly()) { |
| throw new EntityStoreException("Read-only Entity Store"); |
| } |
| |
| try { |
| final MapUpdater changer = new MapUpdater(); |
| changes.visitMap(changer); |
| synchronized (client) { |
| client.batch_mutate(keySpace, changer.mutationMap, ConsistencyLevel.ONE); |
| } |
| } catch (Throwable e) { |
| throw new EntityStoreException("Exception during cassandra batch " |
| + " - ", e); |
| } |
| } |
| |
| boolean contains(EntityReference ref) throws EntityStoreException { |
| try { |
| return get(ref) != null; |
| } catch (final EntityNotFoundException e1) { |
| return false; |
| } catch (final Exception e1) { |
| throw new EntityStoreException(e1); |
| } |
| } |
| |
| public Reader get(final EntityReference ref) { |
| String hashKey = ref.toString(); |
| |
| synchronized (client) { |
| try { |
| ColumnOrSuperColumn column = client.get(keySpace, hashKey, |
| entryColumnPath, ConsistencyLevel.ONE); |
| |
| return createReader(new ByteArrayInputStream(column.getColumn().getValue())); |
| } catch (NotFoundException nfe) { |
| throw new EntityNotFoundException(ref); |
| } catch (Exception e) { |
| throw new EntityStoreException(e); |
| } |
| } |
| } |
| |
| public Input<Reader, IOException> entityStates() { |
| throw new NotImplementedException(); |
| } |
| |
| |
| Writer createWriter(OutputStream out) throws IOException { |
| if (conf.gzipCompress()) |
| return new OutputStreamWriter(new GZIPOutputStream(out)); |
| return new OutputStreamWriter(out); |
| } |
| |
| private Reader createReader(InputStream in) throws IOException { |
| if (conf.gzipCompress()) |
| return new InputStreamReader(new GZIPInputStream(in)); |
| return new InputStreamReader(in); |
| } |
| |
| void checkAbsentBeforeCreate(EntityReference ref) { |
| if (!conf.checkAbsentBeforeCreate()) |
| return; |
| if (contains(ref)) |
| throw new EntityAlreadyExistsException(ref); |
| } |
| |
| void checkPresentBeforeDelete(EntityReference ref) { |
| if (!conf.checkPresentBeforeDelete()) |
| return; |
| if (!contains(ref)) |
| throw new EntityNotFoundException(ref); |
| } |
| |
| void checkPresentBeforeUpdate(EntityReference ref) { |
| if (!conf.checkPresentBeforeUpdate()) |
| return; |
| if (!contains(ref)) |
| throw new EntityNotFoundException(ref); |
| } |
| |
| public ColumnPath getEntryColumnPath() { |
| return entryColumnPath; |
| } |
| |
| class MapUpdater implements MapEntityStore.MapChanger { |
| |
| private final Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>(); |
| |
| public Writer newEntity(final EntityReference ref, EntityType entityType) { |
| checkAbsentBeforeCreate(ref); |
| return getWriter(ref); |
| } |
| |
| public Writer updateEntity(final EntityReference ref, EntityType entityType) |
| throws IOException { |
| checkPresentBeforeUpdate(ref); |
| return getWriter(ref); |
| } |
| |
| public void removeEntity(EntityReference ref, EntityType entityType) |
| throws EntityNotFoundException { |
| checkPresentBeforeDelete(ref); |
| createMutationHolder(ref.identity()).add(new Mutation().setDeletion(new Deletion(System.currentTimeMillis()))); |
| } |
| |
| private Writer getWriter(final EntityReference ref) { |
| try { |
| return createWriter(new ByteArrayOutputStream(CassandraMapEntityStoreMixin.BYTE_ARRAY_BUFFER_INITIAL_SIZE) { |
| @Override |
| public void close() throws IOException { |
| super.close(); |
| ColumnOrSuperColumn cosc = new ColumnOrSuperColumn(); |
| cosc.setColumn(new Column(getEntryColumnPath().getColumn(), toByteArray(), System.currentTimeMillis())); |
| createMutationHolder(ref.identity()).add(new Mutation().setColumn_or_supercolumn(cosc)); |
| } |
| }); |
| } catch (final Exception e) { |
| throw new EntityStoreException(e); |
| } |
| } |
| |
| private List<Mutation> createMutationHolder(String key) { |
| Map<String, List<Mutation>> keyMutations = mutationMap.get(key); |
| |
| if (keyMutations == null) { |
| keyMutations = new HashMap<String, List<Mutation>>(); |
| mutationMap.put(key, keyMutations); |
| } |
| |
| List<Mutation> columnFamilyMutations = keyMutations.get(CassandraMapEntityStoreMixin.entryColumnFamily); |
| if (columnFamilyMutations == null) { |
| columnFamilyMutations = new ArrayList<Mutation>(); |
| keyMutations.put(CassandraMapEntityStoreMixin.entryColumnFamily, columnFamilyMutations); |
| } |
| return columnFamilyMutations; |
| } |
| } |
| } |