blob: c5cfa9396942e3e9881ffe1da754c2cb884755fd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.store.cassandra;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.id.EdgeId;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.id.IdGenerator;
import org.apache.hugegraph.backend.id.IdUtil;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.store.BackendEntry;
import org.apache.hugegraph.backend.store.BackendEntryIterator;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.Directions;
import org.apache.hugegraph.type.define.HugeKeys;
import org.apache.hugegraph.util.E;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import com.datastax.driver.core.querybuilder.Clause;
import com.datastax.driver.core.querybuilder.Delete;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.datastax.driver.core.querybuilder.Update;
import com.datastax.driver.core.querybuilder.Using;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
public class CassandraTables {
public static final String LABEL_INDEX = "label_index";
public static final String NAME_INDEX = "name_index";
private static final DataType TYPE_PK = DataType.cint();
private static final DataType TYPE_SL = DataType.cint(); // VL/EL
private static final DataType TYPE_IL = DataType.cint();
private static final DataType TYPE_UD = DataType.map(DataType.text(),
DataType.text());
private static final DataType TYPE_ID = DataType.blob();
private static final DataType TYPE_PROP = DataType.blob();
private static final DataType TYPE_TTL = DataType.bigint();
private static final DataType TYPE_EXPIRED_TIME = DataType.bigint();
private static final long COMMIT_DELETE_BATCH = Query.COMMIT_BATCH;
public static class Meta extends CassandraTable {
public static final String TABLE = HugeType.META.string();
public Meta() {
super(TABLE);
}
@Override
public void init(CassandraSessionPool.Session session) {
ImmutableMap<HugeKeys, DataType> pkeys = ImmutableMap.of(
HugeKeys.NAME, DataType.text()
);
ImmutableMap<HugeKeys, DataType> ckeys = ImmutableMap.of();
ImmutableMap<HugeKeys, DataType> columns = ImmutableMap.of(
HugeKeys.VALUE, DataType.text()
);
this.createTable(session, pkeys, ckeys, columns);
}
public void writeVersion(CassandraSessionPool.Session session,
String version) {
Insert insert = QueryBuilder.insertInto(TABLE);
insert.value(formatKey(HugeKeys.NAME), formatKey(HugeKeys.VERSION));
insert.value(formatKey(HugeKeys.VALUE), version);
session.execute(insert);
}
public String readVersion(CassandraSessionPool.Session session) {
Clause where = formatEQ(HugeKeys.NAME, formatKey(HugeKeys.VERSION));
Select select = QueryBuilder.select(formatKey(HugeKeys.VALUE))
.from(TABLE);
select.where(where);
Row row = session.execute(select).one();
if (row == null) {
return null;
}
return row.getString(formatKey(HugeKeys.VALUE));
}
}
public static class Counters extends CassandraTable {
public static final String TABLE = HugeType.COUNTER.string();
public Counters() {
super(TABLE);
}
@Override
public void init(CassandraSessionPool.Session session) {
ImmutableMap<HugeKeys, DataType> pkeys = ImmutableMap.of(
HugeKeys.SCHEMA_TYPE, DataType.text()
);
ImmutableMap<HugeKeys, DataType> ckeys = ImmutableMap.of();
ImmutableMap<HugeKeys, DataType> columns = ImmutableMap.of(
HugeKeys.ID, DataType.counter()
);
this.createTable(session, pkeys, ckeys, columns);
}
public long getCounter(CassandraSessionPool.Session session,
HugeType type) {
Clause where = formatEQ(HugeKeys.SCHEMA_TYPE, type.name());
Select select = QueryBuilder.select(formatKey(HugeKeys.ID))
.from(TABLE);
select.where(where);
Row row = session.execute(select).one();
if (row == null) {
return 0L;
} else {
return row.getLong(formatKey(HugeKeys.ID));
}
}
public void increaseCounter(CassandraSessionPool.Session session,
HugeType type, long increment) {
Update update = QueryBuilder.update(TABLE);
update.with(QueryBuilder.incr(formatKey(HugeKeys.ID), increment));
update.where(formatEQ(HugeKeys.SCHEMA_TYPE, type.name()));
session.execute(update);
}
}
public static class VertexLabel extends CassandraTable {
public static final String TABLE = HugeType.VERTEX_LABEL.string();
public VertexLabel() {
super(TABLE);
}
@Override
public void init(CassandraSessionPool.Session session) {
ImmutableMap<HugeKeys, DataType> pkeys = ImmutableMap.of(
HugeKeys.ID, TYPE_SL
);
ImmutableMap<HugeKeys, DataType> ckeys = ImmutableMap.of();
ImmutableMap<HugeKeys, DataType> columns = ImmutableMap
.<HugeKeys, DataType>builder()
.put(HugeKeys.NAME, DataType.text())
.put(HugeKeys.ID_STRATEGY, DataType.tinyint())
.put(HugeKeys.PRIMARY_KEYS, DataType.list(TYPE_PK))
.put(HugeKeys.NULLABLE_KEYS, DataType.set(TYPE_PK))
.put(HugeKeys.INDEX_LABELS, DataType.set(TYPE_IL))
.put(HugeKeys.PROPERTIES, DataType.set(TYPE_PK))
.put(HugeKeys.ENABLE_LABEL_INDEX, DataType.cboolean())
.put(HugeKeys.USER_DATA, TYPE_UD)
.put(HugeKeys.STATUS, DataType.tinyint())
.put(HugeKeys.TTL, TYPE_TTL)
.put(HugeKeys.TTL_START_TIME, TYPE_PK)
.build();
this.createTable(session, pkeys, ckeys, columns);
this.createIndex(session, NAME_INDEX, HugeKeys.NAME);
}
}
public static class EdgeLabel extends CassandraTable {
public static final String TABLE = HugeType.EDGE_LABEL.string();
public EdgeLabel() {
super(TABLE);
}
@Override
public void init(CassandraSessionPool.Session session) {
ImmutableMap<HugeKeys, DataType> pkeys = ImmutableMap.of(
HugeKeys.ID, TYPE_SL
);
ImmutableMap<HugeKeys, DataType> ckeys = ImmutableMap.of();
ImmutableMap<HugeKeys, DataType> columns = ImmutableMap
.<HugeKeys, DataType>builder()
.put(HugeKeys.NAME, DataType.text())
.put(HugeKeys.FREQUENCY, DataType.tinyint())
.put(HugeKeys.SOURCE_LABEL, TYPE_SL)
.put(HugeKeys.TARGET_LABEL, TYPE_SL)
.put(HugeKeys.SORT_KEYS, DataType.list(TYPE_PK))
.put(HugeKeys.NULLABLE_KEYS, DataType.set(TYPE_PK))
.put(HugeKeys.INDEX_LABELS, DataType.set(TYPE_IL))
.put(HugeKeys.PROPERTIES, DataType.set(TYPE_PK))
.put(HugeKeys.ENABLE_LABEL_INDEX, DataType.cboolean())
.put(HugeKeys.USER_DATA, TYPE_UD)
.put(HugeKeys.STATUS, DataType.tinyint())
.put(HugeKeys.TTL, TYPE_TTL)
.put(HugeKeys.TTL_START_TIME, TYPE_PK)
.build();
this.createTable(session, pkeys, ckeys, columns);
this.createIndex(session, NAME_INDEX, HugeKeys.NAME);
}
}
public static class PropertyKey extends CassandraTable {
public static final String TABLE = HugeType.PROPERTY_KEY.string();
public PropertyKey() {
super(TABLE);
}
@Override
public void init(CassandraSessionPool.Session session) {
ImmutableMap<HugeKeys, DataType> pkeys = ImmutableMap.of(
HugeKeys.ID, DataType.cint()
);
ImmutableMap<HugeKeys, DataType> ckeys = ImmutableMap.of();
ImmutableMap<HugeKeys, DataType> columns = ImmutableMap
.<HugeKeys, DataType>builder()
.put(HugeKeys.NAME, DataType.text())
.put(HugeKeys.DATA_TYPE, DataType.tinyint())
.put(HugeKeys.CARDINALITY, DataType.tinyint())
.put(HugeKeys.AGGREGATE_TYPE, DataType.tinyint())
.put(HugeKeys.WRITE_TYPE, DataType.tinyint())
.put(HugeKeys.PROPERTIES, DataType.set(TYPE_PK))
.put(HugeKeys.USER_DATA, TYPE_UD)
.put(HugeKeys.STATUS, DataType.tinyint())
.build();
this.createTable(session, pkeys, ckeys, columns);
this.createIndex(session, NAME_INDEX, HugeKeys.NAME);
}
}
public static class IndexLabel extends CassandraTable {
public static final String TABLE = HugeType.INDEX_LABEL.string();
public IndexLabel() {
super(TABLE);
}
@Override
public void init(CassandraSessionPool.Session session) {
ImmutableMap<HugeKeys, DataType> pkeys = ImmutableMap.of(
HugeKeys.ID, TYPE_IL
);
ImmutableMap<HugeKeys, DataType> ckeys = ImmutableMap.of();
ImmutableMap<HugeKeys, DataType> columns = ImmutableMap
.<HugeKeys, DataType>builder()
.put(HugeKeys.NAME, DataType.text())
.put(HugeKeys.BASE_TYPE, DataType.tinyint())
.put(HugeKeys.BASE_VALUE, TYPE_SL)
.put(HugeKeys.INDEX_TYPE, DataType.tinyint())
.put(HugeKeys.FIELDS, DataType.list(TYPE_PK))
.put(HugeKeys.USER_DATA, TYPE_UD)
.put(HugeKeys.STATUS, DataType.tinyint())
.build();
this.createTable(session, pkeys, ckeys, columns);
this.createIndex(session, NAME_INDEX, HugeKeys.NAME);
}
}
public static class Vertex extends CassandraTable {
public static final String TABLE = HugeType.VERTEX.string();
public Vertex(String store) {
super(joinTableName(store, TABLE));
}
@Override
public void init(CassandraSessionPool.Session session) {
ImmutableMap<HugeKeys, DataType> pkeys = ImmutableMap.of(
HugeKeys.ID, TYPE_ID
);
ImmutableMap<HugeKeys, DataType> ckeys = ImmutableMap.of();
ImmutableMap<HugeKeys, DataType> columns = ImmutableMap.of(
HugeKeys.LABEL, TYPE_SL,
HugeKeys.PROPERTIES, DataType.map(TYPE_PK, TYPE_PROP),
HugeKeys.EXPIRED_TIME, TYPE_EXPIRED_TIME
);
this.createTable(session, pkeys, ckeys, columns);
this.createIndex(session, LABEL_INDEX, HugeKeys.LABEL);
}
@Override
public void insert(CassandraSessionPool.Session session,
CassandraBackendEntry.Row entry) {
Insert insert = this.buildInsert(entry);
session.add(setTtl(insert, entry));
}
@Override
public void append(CassandraSessionPool.Session session,
CassandraBackendEntry.Row entry) {
Update append = this.buildAppend(entry);
session.add(setTtl(append, entry));
}
}
public static class Edge extends CassandraTable {
public static final String TABLE_SUFFIX = HugeType.EDGE.string();
private final String store;
private final Directions direction;
protected Edge(String store, Directions direction) {
super(joinTableName(store, table(direction)));
this.store = store;
this.direction = direction;
}
protected String edgesTable(Directions direction) {
return joinTableName(this.store, table(direction));
}
protected Directions direction() {
return this.direction;
}
protected String labelIndexTable() {
return this.table();
}
@Override
public void init(CassandraSessionPool.Session session) {
ImmutableMap<HugeKeys, DataType> pkeys = ImmutableMap.of(
HugeKeys.OWNER_VERTEX, TYPE_ID
);
ImmutableMap<HugeKeys, DataType> ckeys = ImmutableMap.of(
HugeKeys.DIRECTION, DataType.tinyint(),
HugeKeys.LABEL, TYPE_SL,
HugeKeys.SORT_VALUES, DataType.text(),
HugeKeys.OTHER_VERTEX, TYPE_ID
);
ImmutableMap<HugeKeys, DataType> columns = ImmutableMap.of(
HugeKeys.PROPERTIES, DataType.map(TYPE_PK, TYPE_PROP),
HugeKeys.EXPIRED_TIME, TYPE_EXPIRED_TIME
);
this.createTable(session, pkeys, ckeys, columns);
/*
* Only out-edges table needs label index because we query edges
* by label from out-edges table
*/
if (this.direction == Directions.OUT) {
this.createIndex(session, LABEL_INDEX, HugeKeys.LABEL);
}
}
@Override
public void insert(CassandraSessionPool.Session session,
CassandraBackendEntry.Row entry) {
Insert insert = this.buildInsert(entry);
session.add(setTtl(insert, entry));
}
@Override
public void append(CassandraSessionPool.Session session,
CassandraBackendEntry.Row entry) {
Update update = this.buildAppend(entry);
session.add(setTtl(update, entry));
}
@Override
protected List<HugeKeys> pkColumnName() {
return ImmutableList.of(HugeKeys.OWNER_VERTEX);
}
@Override
protected List<HugeKeys> idColumnName() {
return Arrays.asList(EdgeId.KEYS);
}
@Override
protected List<Object> idColumnValue(Id id) {
EdgeId edgeId;
if (id instanceof EdgeId) {
edgeId = (EdgeId) id;
} else {
String[] idParts = EdgeId.split(id);
if (idParts.length == 1) {
// Delete edge by label
return Arrays.asList(idParts);
}
id = IdUtil.readString(id.asString());
edgeId = EdgeId.parse(id.asString());
}
E.checkState(edgeId.direction() == this.direction,
"Can't query %s edges from %s edges table",
edgeId.direction(), this.direction);
return idColumnValue(edgeId);
}
protected final List<Object> idColumnValue(EdgeId edgeId) {
// TODO: move to Serializer
List<Object> list = new ArrayList<>(5);
list.add(IdUtil.writeBinString(edgeId.ownerVertexId()));
list.add(edgeId.directionCode());
list.add(edgeId.edgeLabelId().asLong());
list.add(edgeId.sortValues());
list.add(IdUtil.writeBinString(edgeId.otherVertexId()));
return list;
}
@Override
public void delete(CassandraSessionPool.Session session,
CassandraBackendEntry.Row entry) {
/*
* TODO: Delete edge by label
* Need to implement the framework that can delete with query
* which contains id or condition.
*/
// Let super class do delete if not deleting edge by label
List<Object> idParts = this.idColumnValue(entry.id());
if (idParts.size() > 1 || entry.columns().size() > 0) {
super.delete(session, entry);
return;
}
// The only element is labeled
this.deleteEdgesByLabel(session, entry.id());
}
protected void deleteEdgesByLabel(CassandraSessionPool.Session session,
Id label) {
// Edges in edges_in table will be deleted when direction is OUT
if (this.direction == Directions.IN) {
return;
}
final String OWNER_VERTEX = formatKey(HugeKeys.OWNER_VERTEX);
final String SORT_VALUES = formatKey(HugeKeys.SORT_VALUES);
final String OTHER_VERTEX = formatKey(HugeKeys.OTHER_VERTEX);
// Query edges by label index
Select select = QueryBuilder.select().from(this.labelIndexTable());
select.where(formatEQ(HugeKeys.LABEL, label.asLong()));
ResultSet rs;
try {
rs = session.execute(select);
} catch (DriverException e) {
throw new BackendException("Failed to query edges " +
"with label '%s' for deleting", e, label);
}
// Delete edges
long count = 0L;
for (Row row : rs) {
Object ownerVertex = row.getObject(OWNER_VERTEX);
Object sortValues = row.getObject(SORT_VALUES);
Object otherVertex = row.getObject(OTHER_VERTEX);
// Delete OUT edges from edges_out table
session.add(buildDelete(label, ownerVertex, Directions.OUT,
sortValues, otherVertex));
// Delete IN edges from edges_in table
session.add(buildDelete(label, otherVertex, Directions.IN,
sortValues, ownerVertex));
count += 2L;
if (count >= COMMIT_DELETE_BATCH) {
session.commit();
count = 0;
}
}
if (count > 0L) {
session.commit();
}
}
private Delete buildDelete(Id label, Object ownerVertex,
Directions direction, Object sortValues,
Object otherVertex) {
Delete delete = QueryBuilder.delete().from(edgesTable(direction));
delete.where(formatEQ(HugeKeys.OWNER_VERTEX, ownerVertex));
delete.where(formatEQ(HugeKeys.DIRECTION,
EdgeId.directionToCode(direction)));
delete.where(formatEQ(HugeKeys.LABEL, label.asLong()));
delete.where(formatEQ(HugeKeys.SORT_VALUES, sortValues));
delete.where(formatEQ(HugeKeys.OTHER_VERTEX, otherVertex));
return delete;
}
@Override
protected BackendEntry mergeEntries(BackendEntry e1, BackendEntry e2) {
// Merge edges into vertex
// TODO: merge rows before calling row2Entry()
CassandraBackendEntry current = (CassandraBackendEntry) e1;
CassandraBackendEntry next = (CassandraBackendEntry) e2;
E.checkState(current == null || current.type().isVertex(),
"The current entry must be null or VERTEX");
E.checkState(next != null && next.type().isEdge(),
"The next entry must be EDGE");
long maxSize = BackendEntryIterator.INLINE_BATCH_SIZE;
if (current != null && current.subRows().size() < maxSize) {
Object nextVertexId = next.column(HugeKeys.OWNER_VERTEX);
if (current.id().equals(IdGenerator.of(nextVertexId))) {
current.subRow(next.row());
return current;
}
}
return this.wrapByVertex(next);
}
private CassandraBackendEntry wrapByVertex(CassandraBackendEntry edge) {
assert edge.type().isEdge();
Object ownerVertex = edge.column(HugeKeys.OWNER_VERTEX);
E.checkState(ownerVertex != null, "Invalid backend entry");
Id vertexId = IdGenerator.of(ownerVertex);
CassandraBackendEntry vertex = new CassandraBackendEntry(
HugeType.VERTEX, vertexId);
vertex.column(HugeKeys.ID, ownerVertex);
vertex.column(HugeKeys.PROPERTIES, ImmutableMap.of());
vertex.subRow(edge.row());
return vertex;
}
private static String table(Directions direction) {
assert direction == Directions.OUT || direction == Directions.IN;
return direction.type().string() + TABLE_SUFFIX;
}
public static CassandraTable out(String store) {
return new Edge(store, Directions.OUT);
}
public static CassandraTable in(String store) {
return new Edge(store, Directions.IN);
}
}
public static class SecondaryIndex extends CassandraTable {
public static final String TABLE = HugeType.SECONDARY_INDEX.string();
public SecondaryIndex(String store) {
this(store, TABLE);
}
protected SecondaryIndex(String store, String table) {
super(joinTableName(store, table));
}
@Override
public void init(CassandraSessionPool.Session session) {
ImmutableMap<HugeKeys, DataType> pkeys = ImmutableMap.of(
HugeKeys.FIELD_VALUES, DataType.text()
);
ImmutableMap<HugeKeys, DataType> ckeys = ImmutableMap.of(
HugeKeys.INDEX_LABEL_ID, TYPE_IL,
HugeKeys.ELEMENT_IDS, TYPE_ID
);
ImmutableMap<HugeKeys, DataType> columns = ImmutableMap.of(
HugeKeys.EXPIRED_TIME, TYPE_EXPIRED_TIME
);
this.createTable(session, pkeys, ckeys, columns);
}
@Override
protected List<HugeKeys> idColumnName() {
return ImmutableList.of(HugeKeys.FIELD_VALUES,
HugeKeys.INDEX_LABEL_ID,
HugeKeys.ELEMENT_IDS);
}
@Override
protected List<HugeKeys> modifiableColumnName() {
return ImmutableList.of();
}
@Override
public void delete(CassandraSessionPool.Session session,
CassandraBackendEntry.Row entry) {
String fieldValues = entry.column(HugeKeys.FIELD_VALUES);
if (fieldValues != null) {
super.delete(session, entry);
return;
}
Long indexLabel = entry.column(HugeKeys.INDEX_LABEL_ID);
if (indexLabel == null) {
throw new BackendException("SecondaryIndex deletion needs " +
"INDEX_LABEL_ID, but not provided.");
}
Select select = QueryBuilder.select().from(this.table());
select.where(formatEQ(HugeKeys.INDEX_LABEL_ID, indexLabel));
select.allowFiltering();
ResultSet rs;
try {
rs = session.execute(select);
} catch (DriverException e) {
throw new BackendException("Failed to query secondary " +
"indexes with index label id '%s' for deleting",
indexLabel, e);
}
final String FIELD_VALUES = formatKey(HugeKeys.FIELD_VALUES);
long count = 0L;
for (Row r : rs) {
fieldValues = r.get(FIELD_VALUES, String.class);
Delete delete = QueryBuilder.delete().from(this.table());
delete.where(formatEQ(HugeKeys.INDEX_LABEL_ID, indexLabel));
delete.where(formatEQ(HugeKeys.FIELD_VALUES, fieldValues));
session.add(delete);
if (++count >= COMMIT_DELETE_BATCH) {
session.commit();
count = 0L;
}
}
}
@Override
public void insert(CassandraSessionPool.Session session,
CassandraBackendEntry.Row entry) {
throw new BackendException("SecondaryIndex insertion is not supported.");
}
@Override
public void append(CassandraSessionPool.Session session,
CassandraBackendEntry.Row entry) {
assert entry.columns().size() == 3 || entry.columns().size() == 4;
Insert insert = this.buildInsert(entry);
session.add(setTtl(insert, entry));
}
@Override
public void eliminate(CassandraSessionPool.Session session,
CassandraBackendEntry.Row entry) {
assert entry.columns().size() == 3 || entry.columns().size() == 4;
this.delete(session, entry);
}
}
public static class SearchIndex extends SecondaryIndex {
public static final String TABLE = HugeType.SEARCH_INDEX.string();
public SearchIndex(String store) {
super(store, TABLE);
}
@Override
public void insert(CassandraSessionPool.Session session,
CassandraBackendEntry.Row entry) {
throw new BackendException("SearchIndex insertion is not supported.");
}
}
/**
* TODO: set field value as key and set element id as value
*/
public static class UniqueIndex extends SecondaryIndex {
public static final String TABLE = HugeType.UNIQUE_INDEX.string();
public UniqueIndex(String store) {
super(store, TABLE);
}
@Override
public void insert(CassandraSessionPool.Session session,
CassandraBackendEntry.Row entry) {
throw new BackendException("UniqueIndex insertion is not supported.");
}
}
public abstract static class RangeIndex extends CassandraTable {
protected RangeIndex(String store, String table) {
super(joinTableName(store, table));
}
@Override
public void init(CassandraSessionPool.Session session) {
ImmutableMap<HugeKeys, DataType> pkeys = ImmutableMap.of(
HugeKeys.INDEX_LABEL_ID, TYPE_IL
);
ImmutableMap<HugeKeys, DataType> ckeys = ImmutableMap.of(
HugeKeys.FIELD_VALUES, this.fieldValuesType(),
HugeKeys.ELEMENT_IDS, TYPE_ID
);
ImmutableMap<HugeKeys, DataType> columns = ImmutableMap.of(
HugeKeys.EXPIRED_TIME, TYPE_EXPIRED_TIME
);
this.createTable(session, pkeys, ckeys, columns);
}
protected DataType fieldValuesType() {
return DataType.decimal();
}
@Override
protected List<HugeKeys> idColumnName() {
return ImmutableList.of(HugeKeys.INDEX_LABEL_ID,
HugeKeys.FIELD_VALUES,
HugeKeys.ELEMENT_IDS);
}
@Override
protected List<HugeKeys> modifiableColumnName() {
return ImmutableList.of();
}
@Override
public void delete(CassandraSessionPool.Session session,
CassandraBackendEntry.Row entry) {
Object fieldValues = entry.column(HugeKeys.FIELD_VALUES);
if (fieldValues != null) {
super.delete(session, entry);
return;
}
Long indexLabel = entry.column(HugeKeys.INDEX_LABEL_ID);
if (indexLabel == null) {
throw new BackendException("Range index deletion needs INDEX_LABEL_ID, " +
"but not provided.");
}
Delete delete = QueryBuilder.delete().from(this.table());
delete.where(formatEQ(HugeKeys.INDEX_LABEL_ID, indexLabel));
session.add(delete);
}
@Override
public void insert(CassandraSessionPool.Session session,
CassandraBackendEntry.Row entry) {
throw new BackendException("RangeIndex insertion is not supported.");
}
@Override
public void append(CassandraSessionPool.Session session,
CassandraBackendEntry.Row entry) {
assert entry.columns().size() == 3 || entry.columns().size() == 4;
Insert insert = this.buildInsert(entry);
session.add(setTtl(insert, entry));
}
@Override
public void eliminate(CassandraSessionPool.Session session,
CassandraBackendEntry.Row entry) {
assert entry.columns().size() == 3 || entry.columns().size() == 4;
this.delete(session, entry);
}
}
public static class RangeIntIndex extends RangeIndex {
public static final String TABLE = HugeType.RANGE_INT_INDEX.string();
public RangeIntIndex(String store) {
super(store, TABLE);
}
@Override
protected DataType fieldValuesType() {
return DataType.cint();
}
}
public static class RangeFloatIndex extends RangeIndex {
public static final String TABLE = HugeType.RANGE_FLOAT_INDEX.string();
public RangeFloatIndex(String store) {
super(store, TABLE);
}
@Override
protected DataType fieldValuesType() {
return DataType.cfloat();
}
}
public static class RangeLongIndex extends RangeIndex {
public static final String TABLE = HugeType.RANGE_LONG_INDEX.string();
public RangeLongIndex(String store) {
super(store, TABLE);
}
@Override
protected DataType fieldValuesType() {
// TODO: DataType.varint()
return DataType.bigint();
}
}
public static class RangeDoubleIndex extends RangeIndex {
public static final String TABLE = HugeType.RANGE_DOUBLE_INDEX.string();
public RangeDoubleIndex(String store) {
super(store, TABLE);
}
@Override
protected DataType fieldValuesType() {
return DataType.cdouble();
}
}
public static class ShardIndex extends RangeIndex {
public static final String TABLE = HugeType.SHARD_INDEX.string();
public ShardIndex(String store) {
super(store, TABLE);
}
@Override
protected DataType fieldValuesType() {
return DataType.text();
}
@Override
public void insert(CassandraSessionPool.Session session,
CassandraBackendEntry.Row entry) {
throw new BackendException("ShardIndex insertion is not supported.");
}
}
public static class Olap extends CassandraTable {
public static final String TABLE = HugeType.OLAP.string();
private Id pkId;
public Olap(String store, Id id) {
super(joinTableName(store, joinTableName(TABLE, id.asString())));
this.pkId = id;
}
@Override
public void init(CassandraSessionPool.Session session) {
ImmutableMap<HugeKeys, DataType> pkeys = ImmutableMap.of(
HugeKeys.ID, TYPE_ID
);
ImmutableMap<HugeKeys, DataType> ckeys = ImmutableMap.of();
ImmutableMap<HugeKeys, DataType> columns = ImmutableMap.of(
HugeKeys.PROPERTY_VALUE, TYPE_PROP
);
this.createTable(session, pkeys, ckeys, columns);
}
@Override
protected Iterator<BackendEntry> results2Entries(Query q, ResultSet r) {
return new CassandraEntryIterator(r, q, (e1, row) -> {
CassandraBackendEntry e2 = row2Entry(q.resultType(), row);
e2.subId(this.pkId);
return this.mergeEntries(e1, e2);
});
}
@Override
public boolean isOlap() {
return true;
}
}
public static class OlapSecondaryIndex extends SecondaryIndex {
public static final String TABLE = HugeType.OLAP.string();
public OlapSecondaryIndex(String store) {
this(store, TABLE);
}
protected OlapSecondaryIndex(String store, String table) {
super(joinTableName(store, table));
}
}
public static class OlapRangeIntIndex extends RangeIntIndex {
public static final String TABLE = HugeType.OLAP.string();
public OlapRangeIntIndex(String store) {
this(store, TABLE);
}
protected OlapRangeIntIndex(String store, String table) {
super(joinTableName(store, table));
}
}
public static class OlapRangeLongIndex extends RangeLongIndex {
public static final String TABLE = HugeType.OLAP.string();
public OlapRangeLongIndex(String store) {
this(store, TABLE);
}
protected OlapRangeLongIndex(String store, String table) {
super(joinTableName(store, table));
}
}
public static class OlapRangeFloatIndex extends RangeFloatIndex {
public static final String TABLE = HugeType.OLAP.string();
public OlapRangeFloatIndex(String store) {
this(store, TABLE);
}
protected OlapRangeFloatIndex(String store, String table) {
super(joinTableName(store, table));
}
}
public static class OlapRangeDoubleIndex extends RangeDoubleIndex {
public static final String TABLE = HugeType.OLAP.string();
public OlapRangeDoubleIndex(String store) {
this(store, TABLE);
}
protected OlapRangeDoubleIndex(String store, String table) {
super(joinTableName(store, table));
}
}
private static Statement setTtl(BuiltStatement statement,
CassandraBackendEntry.Row entry) {
long ttl = entry.ttl();
if (ttl != 0L) {
int calcTtl = (int) Math.ceil(ttl / 1000D);
Using usingTtl = QueryBuilder.ttl(calcTtl);
if (statement instanceof Insert) {
((Insert) statement).using(usingTtl);
} else {
assert statement instanceof Update;
((Update) statement).using(usingTtl);
}
}
return statement;
}
}