blob: b4b1bf4351133c3a14b6db594d9555a67387a46c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.store.cassandra;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.serializer.MergeIterator;
import org.apache.hugegraph.backend.store.AbstractBackendStore;
import org.apache.hugegraph.backend.store.BackendAction;
import org.apache.hugegraph.backend.store.BackendEntry;
import org.apache.hugegraph.backend.store.BackendFeatures;
import org.apache.hugegraph.backend.store.BackendMutation;
import org.apache.hugegraph.backend.store.BackendStoreProvider;
import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.exception.ConnectionException;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.datastax.driver.core.schemabuilder.SchemaBuilder;
public abstract class CassandraStore extends AbstractBackendStore<CassandraSessionPool.Session> {
private static final Logger LOG = Log.logger(CassandraStore.class);
private static final BackendFeatures FEATURES = new CassandraFeatures();
private final String store;
private final String keyspace;
private final BackendStoreProvider provider;
// TODO: move to parent class
private final Map<String, CassandraTable> tables;
private CassandraSessionPool sessions;
private HugeConfig conf;
private boolean isGraphStore;
public CassandraStore(final BackendStoreProvider provider,
final String keyspace, final String store) {
E.checkNotNull(keyspace, "keyspace");
E.checkNotNull(store, "store");
this.provider = provider;
this.keyspace = keyspace;
this.store = store;
this.tables = new ConcurrentHashMap<>();
this.sessions = null;
this.conf = null;
this.registerMetaHandlers();
LOG.debug("Store loaded: {}", store);
}
private void registerMetaHandlers() {
this.registerMetaHandler("metrics", (session, meta, args) -> {
CassandraMetrics metrics = this.createMetrics(this.conf,
this.sessions,
this.keyspace);
return metrics.metrics();
});
this.registerMetaHandler("compact", (session, meta, args) -> {
CassandraMetrics metrics = this.createMetrics(this.conf,
this.sessions,
this.keyspace);
return metrics.compact();
});
}
protected CassandraMetrics createMetrics(HugeConfig conf,
CassandraSessionPool sessions,
String keyspace) {
return new CassandraMetrics(conf, sessions, keyspace);
}
protected void registerTableManager(HugeType type, CassandraTable table) {
this.registerTableManager(type.string(), table);
}
protected void registerTableManager(String name, CassandraTable table) {
this.tables.put(name, table);
}
protected void unregisterTableManager(String name) {
this.tables.remove(name);
}
@Override
public String store() {
return this.store;
}
@Override
public String database() {
return this.keyspace;
}
@Override
public BackendStoreProvider provider() {
return this.provider;
}
@Override
public synchronized void open(HugeConfig config) {
LOG.debug("Store open: {}", this.store);
E.checkNotNull(config, "config");
if (this.sessions == null) {
this.sessions = new CassandraSessionPool(config, this.keyspace,
this.store);
}
assert this.sessions != null;
if (!this.sessions.closed()) {
// TODO: maybe we should throw an exception here instead of ignore
LOG.debug("Store {} has been opened before", this.store);
this.sessions.useSession();
return;
}
this.conf = config;
String graphStore = this.conf.get(CoreOptions.STORE_GRAPH);
this.isGraphStore = this.store.equals(graphStore);
// Init cluster
this.sessions.open();
// Init a session for current thread
try {
LOG.debug("Store connect with keyspace: {}", this.keyspace);
try {
this.sessions.session().open();
} catch (InvalidQueryException e) {
// TODO: the error message may be changed in different versions
if (!e.getMessage().contains(String.format(
"Keyspace '%s' does not exist", this.keyspace))) {
throw e;
}
if (this.isSchemaStore()) {
LOG.info("Failed to connect keyspace: {}, " +
"try to init keyspace later", this.keyspace);
}
}
} catch (Throwable e) {
try {
this.sessions.close();
} catch (Throwable e2) {
LOG.warn("Failed to close cluster after an error", e2);
}
throw new ConnectionException("Failed to connect to Cassandra", e);
}
LOG.debug("Store opened: {}", this.store);
}
@Override
public void close() {
LOG.debug("Store close: {}", this.store);
this.sessions.close();
}
@Override
public boolean opened() {
this.checkClusterConnected();
return this.sessions.session().opened();
}
@Override
public void mutate(BackendMutation mutation) {
if (LOG.isDebugEnabled()) {
LOG.debug("Store {} mutation: {}", this.store, mutation);
}
this.checkOpened();
CassandraSessionPool.Session session = this.sessions.session();
for (Iterator<BackendAction> it = mutation.mutation(); it.hasNext();) {
this.mutate(session, it.next());
}
}
private void mutate(CassandraSessionPool.Session session,
BackendAction item) {
CassandraBackendEntry entry = castBackendEntry(item.entry());
// Check if the entry has no change
if (!entry.selfChanged() && entry.subRows().isEmpty()) {
LOG.warn("The entry will be ignored due to no change: {}", entry);
}
CassandraTable table;
if (!entry.olap()) {
// Oltp table
table = this.table(entry.type());
} else {
if (entry.type().isIndex()) {
// Olap index
table = this.table(this.olapTableName(entry.type()));
} else {
// Olap vertex
table = this.table(this.olapTableName(entry.subId()));
}
}
switch (item.action()) {
case INSERT:
// Insert entry
if (entry.selfChanged()) {
table.insert(session, entry.row());
}
// Insert sub rows (edges)
for (CassandraBackendEntry.Row row : entry.subRows()) {
this.table(row.type()).insert(session, row);
}
break;
case DELETE:
// Delete entry
if (entry.selfChanged()) {
table.delete(session, entry.row());
}
// Delete sub rows (edges)
for (CassandraBackendEntry.Row row : entry.subRows()) {
this.table(row.type()).delete(session, row);
}
break;
case APPEND:
// Append entry
if (entry.selfChanged()) {
table.append(session, entry.row());
}
// Append sub rows (edges)
for (CassandraBackendEntry.Row row : entry.subRows()) {
this.table(row.type()).append(session, row);
}
break;
case ELIMINATE:
// Eliminate entry
if (entry.selfChanged()) {
table.eliminate(session, entry.row());
}
// Eliminate sub rows (edges)
for (CassandraBackendEntry.Row row : entry.subRows()) {
this.table(row.type()).eliminate(session, row);
}
break;
case UPDATE_IF_PRESENT:
if (entry.selfChanged()) {
// TODO: forward to master-writer node
table.updateIfPresent(session, entry.row());
}
assert entry.subRows().isEmpty() : entry.subRows();
break;
case UPDATE_IF_ABSENT:
if (entry.selfChanged()) {
// TODO: forward to master-writer node
table.updateIfAbsent(session, entry.row());
}
assert entry.subRows().isEmpty() : entry.subRows();
break;
default:
throw new AssertionError(String.format(
"Unsupported mutate action: %s", item.action()));
}
}
@Override
public Iterator<BackendEntry> query(Query query) {
this.checkOpened();
HugeType type = CassandraTable.tableType(query);
String tableName = query.olap() ? this.olapTableName(type) :
type.string();
CassandraTable table = this.table(tableName);
Iterator<BackendEntry> entries = table.query(this.session(null), query);
// Merge olap results as needed
Set<Id> olapPks = query.olapPks();
if (this.isGraphStore && !olapPks.isEmpty()) {
List<Iterator<BackendEntry>> iterators = new ArrayList<>();
for (Id pk : olapPks) {
Query q = query.copy();
table = this.table(this.olapTableName(pk));
iterators.add(table.query(this.session(null), q));
}
entries = new MergeIterator<>(entries, iterators,
BackendEntry::mergeable);
}
return entries;
}
@Override
public Number queryNumber(Query query) {
this.checkOpened();
CassandraTable table = this.table(CassandraTable.tableType(query));
return table.queryNumber(this.sessions.session(), query);
}
@Override
public BackendFeatures features() {
return FEATURES;
}
@Override
public void init() {
this.checkClusterConnected();
// Create keyspace if needed
if (!this.existsKeyspace()) {
this.initKeyspace();
}
if (this.sessions.session().opened()) {
// Session has ever been opened.
LOG.warn("Session has ever been opened(exist keyspace '{}' before)",
this.keyspace);
} else {
// Open session explicitly to get the exception when it fails
this.sessions.session().open();
}
// Create tables
this.checkOpened();
this.initTables();
LOG.debug("Store initialized: {}", this.store);
}
@Override
public void clear(boolean clearSpace) {
this.checkClusterConnected();
if (this.existsKeyspace()) {
if (!clearSpace) {
this.checkOpened();
this.clearTables();
} else {
this.clearKeyspace();
}
}
LOG.debug("Store cleared: {}", this.store);
}
@Override
public boolean initialized() {
this.checkClusterConnected();
if (!this.existsKeyspace()) {
return false;
}
for (CassandraTable table : this.tables()) {
if (!this.existsTable(table.table())) {
return false;
}
}
return true;
}
@Override
public void truncate() {
this.checkOpened();
this.truncateTables();
LOG.debug("Store truncated: {}", this.store);
}
@Override
public void beginTx() {
this.checkOpened();
CassandraSessionPool.Session session = this.sessions.session();
if (session.txState() != TxState.CLEAN) {
LOG.warn("Store {} expect state CLEAN than {} when begin()",
this.store, session.txState());
}
session.txState(TxState.BEGIN);
}
@Override
public void commitTx() {
this.checkOpened();
CassandraSessionPool.Session session = this.sessions.session();
if (session.txState() != TxState.BEGIN) {
LOG.warn("Store {} expect state BEGIN than {} when commit()",
this.store, session.txState());
}
if (!session.hasChanges()) {
session.txState(TxState.CLEAN);
LOG.debug("Store {} has nothing to commit", this.store);
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Store {} commit {} statements: {}", this.store,
session.statements().size(), session.statements());
}
// TODO how to implement tx perfectly?
// Do update
session.txState(TxState.COMMITTING);
try {
session.commit();
session.txState(TxState.CLEAN);
} catch (DriverException e) {
session.txState(TxState.COMMITT_FAIL);
LOG.error("Failed to commit statements due to:", e);
assert session.statements().size() > 0;
throw new BackendException(
"Failed to commit %s statements: '%s'...", e,
session.statements().size(),
session.statements().iterator().next());
}
}
@Override
public void rollbackTx() {
this.checkOpened();
CassandraSessionPool.Session session = this.sessions.session();
// TODO how to implement perfectly?
if (session.txState() != TxState.COMMITT_FAIL &&
session.txState() != TxState.CLEAN) {
LOG.warn("Store {} expect state COMMIT_FAIL/COMMITTING/CLEAN " +
"than {} when rollback()", this.store, session.txState());
}
session.txState(TxState.ROLLBACKING);
try {
session.rollback();
} finally {
// Assume batch commit would auto rollback
session.txState(TxState.CLEAN);
}
}
protected Cluster cluster() {
return this.sessions.cluster();
}
protected boolean existsKeyspace() {
return this.cluster().getMetadata().getKeyspace(this.keyspace) != null;
}
protected boolean existsTable(String table) {
KeyspaceMetadata keyspace = this.cluster().getMetadata().getKeyspace(this.keyspace);
return keyspace != null && keyspace.getTable(table) != null;
}
protected void initKeyspace() {
Statement stmt = SchemaBuilder.createKeyspace(this.keyspace)
.ifNotExists().with()
.replication(parseReplica(this.conf));
// Create keyspace with non-keyspace-session
LOG.debug("Create keyspace: {}", stmt);
Session session = this.cluster().connect();
try {
session.execute(stmt);
} finally {
if (!session.isClosed()) {
session.close();
}
}
}
private static Map<String, Object> parseReplica(HugeConfig conf) {
Map<String, Object> replication = new HashMap<>();
// Replication strategy: SimpleStrategy or NetworkTopologyStrategy
String strategy = conf.get(CassandraOptions.CASSANDRA_STRATEGY);
replication.put("class", strategy);
switch (strategy) {
case "SimpleStrategy":
List<String> replicas =
conf.get(CassandraOptions.CASSANDRA_REPLICATION);
E.checkArgument(replicas.size() == 1,
"Individual factor value should be provided " +
"with SimpleStrategy for Cassandra");
int factor = convertFactor(replicas.get(0));
replication.put("replication_factor", factor);
break;
case "NetworkTopologyStrategy":
// The replicas format is like 'dc1:2,dc2:1'
Map<String, String> replicaMap =
conf.getMap(CassandraOptions.CASSANDRA_REPLICATION);
for (Map.Entry<String, String> e : replicaMap.entrySet()) {
E.checkArgument(!e.getKey().isEmpty(),
"The datacenter can't be empty");
replication.put(e.getKey(), convertFactor(e.getValue()));
}
break;
default:
throw new AssertionError(String.format(
"Illegal replication strategy '%s', valid strategy " +
"is 'SimpleStrategy' or 'NetworkTopologyStrategy'",
strategy));
}
return replication;
}
private static int convertFactor(String factor) {
try {
return Integer.valueOf(factor);
} catch (NumberFormatException e) {
throw new BackendException(
"Expect int factor value for SimpleStrategy, " +
"but got '%s'", factor);
}
}
protected void clearKeyspace() {
// Drop keyspace with non-keyspace-session
Statement stmt = SchemaBuilder.dropKeyspace(this.keyspace).ifExists();
LOG.debug("Drop keyspace: {}", stmt);
Session session = this.cluster().connect();
try {
session.execute(stmt);
} finally {
if (!session.isClosed()) {
session.close();
}
}
}
protected void initTables() {
CassandraSessionPool.Session session = this.sessions.session();
for (CassandraTable table : this.tables()) {
table.init(session);
}
}
protected void clearTables() {
CassandraSessionPool.Session session = this.sessions.session();
for (CassandraTable table : this.tables()) {
table.clear(session);
}
}
protected void truncateTables() {
CassandraSessionPool.Session session = this.sessions.session();
for (CassandraTable table : this.tables()) {
if (table.isOlap()) {
table.dropTable(session);
} else {
table.truncate(session);
}
}
}
protected Collection<CassandraTable> tables() {
return this.tables.values();
}
@Override
protected final CassandraTable table(HugeType type) {
return this.table(type.string());
}
protected final CassandraTable table(String name) {
assert name != null;
CassandraTable table = this.tables.get(name);
if (table == null) {
throw new BackendException("Unsupported table: %s", name);
}
return table;
}
@Override
protected CassandraSessionPool.Session session(HugeType type) {
this.checkOpened();
return this.sessions.session();
}
protected final void checkClusterConnected() {
E.checkState(this.sessions != null && this.sessions.clusterConnected(),
"Cassandra cluster has not been connected");
}
protected static final CassandraBackendEntry castBackendEntry(BackendEntry entry) {
assert entry instanceof CassandraBackendEntry : entry.getClass();
if (!(entry instanceof CassandraBackendEntry)) {
throw new BackendException("Cassandra store only supports CassandraBackendEntry");
}
return (CassandraBackendEntry) entry;
}
/***************************** Store defines *****************************/
public static class CassandraSchemaStore extends CassandraStore {
private final CassandraTables.Counters counters;
public CassandraSchemaStore(BackendStoreProvider provider,
String keyspace, String store) {
super(provider, keyspace, store);
this.counters = new CassandraTables.Counters();
registerTableManager(HugeType.VERTEX_LABEL,
new CassandraTables.VertexLabel());
registerTableManager(HugeType.EDGE_LABEL,
new CassandraTables.EdgeLabel());
registerTableManager(HugeType.PROPERTY_KEY,
new CassandraTables.PropertyKey());
registerTableManager(HugeType.INDEX_LABEL,
new CassandraTables.IndexLabel());
}
@Override
protected Collection<CassandraTable> tables() {
List<CassandraTable> tables = new ArrayList<>(super.tables());
tables.add(this.counters);
return tables;
}
@Override
public void increaseCounter(HugeType type, long increment) {
this.checkOpened();
CassandraSessionPool.Session session = super.sessions.session();
this.counters.increaseCounter(session, type, increment);
}
@Override
public long getCounter(HugeType type) {
this.checkOpened();
CassandraSessionPool.Session session = super.sessions.session();
return this.counters.getCounter(session, type);
}
@Override
public boolean isSchemaStore() {
return true;
}
}
public static class CassandraGraphStore extends CassandraStore {
public CassandraGraphStore(BackendStoreProvider provider,
String keyspace, String store) {
super(provider, keyspace, store);
registerTableManager(HugeType.VERTEX,
new CassandraTables.Vertex(store));
registerTableManager(HugeType.EDGE_OUT,
CassandraTables.Edge.out(store));
registerTableManager(HugeType.EDGE_IN,
CassandraTables.Edge.in(store));
registerTableManager(HugeType.SECONDARY_INDEX,
new CassandraTables.SecondaryIndex(store));
registerTableManager(HugeType.RANGE_INT_INDEX,
new CassandraTables.RangeIntIndex(store));
registerTableManager(HugeType.RANGE_FLOAT_INDEX,
new CassandraTables.RangeFloatIndex(store));
registerTableManager(HugeType.RANGE_LONG_INDEX,
new CassandraTables.RangeLongIndex(store));
registerTableManager(HugeType.RANGE_DOUBLE_INDEX,
new CassandraTables.RangeDoubleIndex(store));
registerTableManager(HugeType.SEARCH_INDEX,
new CassandraTables.SearchIndex(store));
registerTableManager(HugeType.SHARD_INDEX,
new CassandraTables.ShardIndex(store));
registerTableManager(HugeType.UNIQUE_INDEX,
new CassandraTables.UniqueIndex(store));
registerTableManager(this.olapTableName(HugeType.SECONDARY_INDEX),
new CassandraTables.OlapSecondaryIndex(store));
registerTableManager(this.olapTableName(HugeType.RANGE_INT_INDEX),
new CassandraTables.OlapRangeIntIndex(store));
registerTableManager(this.olapTableName(HugeType.RANGE_LONG_INDEX),
new CassandraTables.OlapRangeLongIndex(store));
registerTableManager(this.olapTableName(HugeType.RANGE_FLOAT_INDEX),
new CassandraTables.OlapRangeFloatIndex(store));
registerTableManager(this.olapTableName(HugeType.RANGE_DOUBLE_INDEX),
new CassandraTables.OlapRangeDoubleIndex(store));
}
@Override
public Id nextId(HugeType type) {
throw new UnsupportedOperationException(
"CassandraGraphStore.nextId()");
}
@Override
public void increaseCounter(HugeType type, long num) {
throw new UnsupportedOperationException(
"CassandraGraphStore.increaseCounter()");
}
@Override
public long getCounter(HugeType type) {
throw new UnsupportedOperationException(
"CassandraGraphStore.getCounter()");
}
@Override
public boolean isSchemaStore() {
return false;
}
/**
* TODO: can we remove this method since createOlapTable would register?
*/
@Override
public void checkAndRegisterOlapTable(Id id) {
CassandraTable table = new CassandraTables.Olap(this.store(), id);
if (!this.existsTable(table.table())) {
throw new HugeException("Not exist table '%s'", table.table());
}
registerTableManager(this.olapTableName(id), table);
}
@Override
public void createOlapTable(Id id) {
CassandraTable table = new CassandraTables.Olap(this.store(), id);
table.init(this.session(null));
registerTableManager(this.olapTableName(id), table);
}
@Override
public void clearOlapTable(Id id) {
String name = this.olapTableName(id);
CassandraTable table = this.table(name);
if (!this.existsTable(table.table())) {
throw new HugeException("Not exist table '%s'", name);
}
table.truncate(this.session(null));
}
@Override
public void removeOlapTable(Id id) {
String name = this.olapTableName(id);
CassandraTable table = this.table(name);
if (!this.existsTable(table.table())) {
throw new HugeException("Not exist table '%s'", name);
}
table.dropTable(this.session(null));
this.unregisterTableManager(name);
}
}
public static class CassandraSystemStore extends CassandraGraphStore {
private final CassandraTables.Meta meta;
public CassandraSystemStore(BackendStoreProvider provider, String keyspace, String store) {
super(provider, keyspace, store);
this.meta = new CassandraTables.Meta();
}
@Override
public void init() {
super.init();
this.checkOpened();
String driverVersion = this.provider().driverVersion();
this.meta.writeVersion(this.session(null), driverVersion);
LOG.info("Write down the backend version: {}", driverVersion);
}
@Override
public String storedVersion() {
CassandraSessionPool.Session session = this.session(null);
return this.meta.readVersion(session);
}
@Override
protected Collection<CassandraTable> tables() {
List<CassandraTable> tables = new ArrayList<>(super.tables());
tables.add(this.meta);
return tables;
}
}
}