blob: e51a8ea4c0bf4cd5f7b96f24bd4b85931e0d625f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.store.mysql;
import java.math.BigDecimal;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import org.apache.logging.log4j.util.Strings;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.slf4j.Logger;
import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.page.PageState;
import org.apache.hugegraph.backend.query.Aggregate;
import org.apache.hugegraph.backend.query.Condition;
import org.apache.hugegraph.backend.query.ConditionQuery;
import org.apache.hugegraph.backend.query.IdQuery;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.store.BackendEntry;
import org.apache.hugegraph.backend.store.BackendTable;
import org.apache.hugegraph.backend.store.Shard;
import org.apache.hugegraph.backend.store.TableDefine;
import org.apache.hugegraph.exception.NotFoundException;
import org.apache.hugegraph.iterator.ExtendableIterator;
import org.apache.hugegraph.iterator.WrappedIterator;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.HugeKeys;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import com.google.common.collect.ImmutableList;
public abstract class MysqlTable
extends BackendTable<MysqlSessions.Session, MysqlBackendEntry.Row> {
private static final Logger LOG = Log.logger(MysqlTable.class);
private static final String DECIMAL = "DECIMAL";
// The template cache for insert and delete statements
private String insertTemplate;
private String insertTemplateTtl;
private String deleteTemplate;
private String updateIfPresentTemplate;
private String updateIfAbsentTemplate;
private final MysqlShardSplitter shardSplitter;
public MysqlTable(String table) {
super(table);
this.insertTemplate = null;
this.insertTemplateTtl = null;
this.deleteTemplate = null;
this.updateIfPresentTemplate = null;
this.updateIfAbsentTemplate = null;
this.shardSplitter = new MysqlShardSplitter(this.table());
}
@Override
protected void registerMetaHandlers() {
this.registerMetaHandler("splits", (session, meta, args) -> {
E.checkArgument(args.length == 1,
"The args count of %s must be 1", meta);
long splitSize = (long) args[0];
return this.shardSplitter.getSplits(session, splitSize);
});
}
public abstract TableDefine tableDefine();
@Override
public void init(MysqlSessions.Session session) {
this.createTable(session, this.tableDefine());
}
@Override
public void clear(MysqlSessions.Session session) {
this.dropTable(session);
}
public void truncate(MysqlSessions.Session session) {
this.truncateTable(session);
}
protected void createTable(MysqlSessions.Session session, TableDefine tableDefine) {
StringBuilder sql = new StringBuilder();
sql.append("CREATE TABLE IF NOT EXISTS ");
sql.append(this.table()).append(" (");
// Add columns
for (Map.Entry<HugeKeys, String> entry :
tableDefine.columns().entrySet()) {
sql.append(formatKey(entry.getKey()));
sql.append(" ");
sql.append(entry.getValue());
sql.append(", ");
}
// Specified primary keys
sql.append(" PRIMARY KEY (");
int i = 0;
int size = tableDefine.keys().size();
for (HugeKeys key : tableDefine.keys()) {
sql.append(formatKey(key));
if (++i != size) {
sql.append(", ");
}
}
sql.append("))");
sql.append(this.engine(session));
sql.append(";");
LOG.debug("Create table: {}", sql);
try {
session.execute(sql.toString());
} catch (SQLException e) {
throw new BackendException("Failed to create table with '%s'",
e, sql);
}
}
protected String engine(MysqlSessions.Session session) {
String engine = session.config().get(MysqlOptions.JDBC_STORAGE_ENGINE);
return " ENGINE=" + engine;
}
protected void dropTable(MysqlSessions.Session session) {
LOG.debug("Drop table: {}", this.table());
String sql = this.buildDropTemplate();
try {
session.execute(sql);
} catch (SQLException e) {
throw new BackendException("Failed to drop table with '%s'",
e, sql);
}
}
protected void truncateTable(MysqlSessions.Session session) {
LOG.debug("Truncate table: {}", this.table());
String sql = this.buildTruncateTemplate();
try {
session.execute(sql);
} catch (SQLException e) {
throw new BackendException("Failed to truncate table with '%s'",
e, sql);
}
}
protected List<HugeKeys> idColumnName() {
return this.tableDefine().keys();
}
protected List<Long> idColumnValue(MysqlBackendEntry.Row entry) {
return ImmutableList.of(entry.id().asLong());
}
protected List<Object> idColumnValue(Id id) {
return ImmutableList.of(id.asObject());
}
protected void insertOrUpdate(MysqlSessions.Session session, String template,
List<?> params) {
PreparedStatement insertStmt;
try {
// Create or get insert prepare statement
insertStmt = session.prepareStatement(template);
int i = 1;
for (Object param : params) {
insertStmt.setObject(i++, param);
}
} catch (SQLException e) {
throw new BackendException("Failed to prepare statement '%s' " +
"with params: %s", template, params);
}
session.add(insertStmt);
}
protected final String buildUpdateTemplate(MysqlBackendEntry.Row entry) {
if (entry.ttl() != 0L) {
if (this.insertTemplateTtl != null) {
return this.insertTemplateTtl;
}
this.insertTemplateTtl = this.buildUpdateForcedTemplate(entry);
return this.insertTemplateTtl;
} else {
if (this.insertTemplate != null) {
return this.insertTemplate;
}
this.insertTemplate = this.buildUpdateForcedTemplate(entry);
return this.insertTemplate;
}
}
protected String buildUpdateForcedTemplate(MysqlBackendEntry.Row entry) {
StringBuilder insert = new StringBuilder();
insert.append("REPLACE INTO ").append(this.table());
return this.buildInsertKeys(insert, entry);
}
protected String buildUpdateIfAbsentTemplate(MysqlBackendEntry.Row entry) {
StringBuilder insert = new StringBuilder();
insert.append("INSERT IGNORE INTO ").append(this.table());
return this.buildInsertKeys(insert, entry);
}
protected String buildInsertKeys(StringBuilder insert,
MysqlBackendEntry.Row entry) {
insert.append(" (");
int i = 0;
int size = entry.columns().size();
for (HugeKeys key : entry.columns().keySet()) {
insert.append(formatKey(key));
if (++i != size) {
insert.append(", ");
}
}
insert.append(") VALUES (");
// Fill with '?' as a placeholder
for (i = 0; i < size; i++) {
insert.append("?");
if (i != size - 1) {
insert.append(", ");
}
}
insert.append(")");
return insert.toString();
}
protected List<?> buildUpdateForcedParams(MysqlBackendEntry.Row entry) {
return this.buildColumnsParams(entry);
}
protected List<?> buildUpdateIfAbsentParams(MysqlBackendEntry.Row entry) {
return this.buildColumnsParams(entry);
}
protected List<Object> buildColumnsParams(MysqlBackendEntry.Row entry) {
return this.buildColumnsParams(entry, null);
}
protected List<Object> buildColumnsParams(MysqlBackendEntry.Row entry,
List<HugeKeys> skipKeys) {
List<Object> objects = new ArrayList<>();
for (Map.Entry<HugeKeys, Object> e : entry.columns().entrySet()) {
HugeKeys key = e.getKey();
Object value = e.getValue();
if (skipKeys != null && skipKeys.contains(key)) {
continue;
}
String type = this.tableDefine().columns().get(key);
if (type.startsWith(DECIMAL)) {
value = new BigDecimal(value.toString());
}
objects.add(value);
}
return objects;
}
protected String buildUpdateIfPresentTemplate(MysqlBackendEntry.Row entry) {
StringBuilder update = new StringBuilder();
update.append("UPDATE ").append(this.table());
update.append(" SET ");
List<HugeKeys> idNames = this.idColumnName();
int i = 0;
for (HugeKeys key : entry.columns().keySet()) {
if (idNames.contains(key)) {
continue;
}
if (i++ > 0) {
update.append(", ");
}
update.append(formatKey(key));
update.append("=?");
}
WhereBuilder where = this.newWhereBuilder();
where.and(formatKeys(idNames), "=");
update.append(where.build());
return update.toString();
}
protected List<?> buildUpdateIfPresentParams(MysqlBackendEntry.Row entry) {
List<HugeKeys> idNames = this.idColumnName();
List<Object> params = this.buildColumnsParams(entry, idNames);
List<Long> idValues = this.idColumnValue(entry);
params.addAll(idValues);
return params;
}
protected String buildDeleteTemplate(List<HugeKeys> idNames) {
StringBuilder delete = new StringBuilder();
delete.append("DELETE FROM ").append(this.table());
this.appendPartition(delete);
WhereBuilder where = this.newWhereBuilder();
where.and(formatKeys(idNames), "=");
delete.append(where.build());
return delete.toString();
}
protected String buildDropTemplate() {
return String.format("DROP TABLE IF EXISTS %s;", this.table());
}
protected String buildTruncateTemplate() {
return String.format("TRUNCATE TABLE %s;", this.table());
}
protected void appendPartition(StringBuilder sb) {
// pass
}
/**
* Insert an entire row
*/
@Override
public void insert(MysqlSessions.Session session, MysqlBackendEntry.Row entry) {
String template = this.buildUpdateTemplate(entry);
List<?> params = this.buildUpdateForcedParams(entry);
this.insertOrUpdate(session, template, params);
}
@Override
public void delete(MysqlSessions.Session session, MysqlBackendEntry.Row entry) {
List<HugeKeys> idNames = this.idColumnName();
String template = this.deleteTemplate;
if (template == null) {
template = this.buildDeleteTemplate(idNames);
this.deleteTemplate = template;
}
PreparedStatement deleteStmt;
try {
deleteStmt = session.prepareStatement(template);
if (entry.columns().isEmpty()) {
// Delete just by id
List<Long> idValues = this.idColumnValue(entry);
assert idNames.size() == idValues.size();
for (int i = 0, n = idNames.size(); i < n; i++) {
deleteStmt.setObject(i + 1, idValues.get(i));
}
} else {
// Delete just by column keys(must be id columns)
for (int i = 0, n = idNames.size(); i < n; i++) {
HugeKeys key = idNames.get(i);
Object value = entry.column(key);
deleteStmt.setObject(i + 1, value);
}
}
} catch (SQLException e) {
throw new BackendException("Failed to prepare statement '%s'" +
"with entry columns %s",
template, entry.columns().values());
}
session.add(deleteStmt);
}
@Override
public void append(MysqlSessions.Session session, MysqlBackendEntry.Row entry) {
this.insert(session, entry);
}
@Override
public void eliminate(MysqlSessions.Session session, MysqlBackendEntry.Row entry) {
this.delete(session, entry);
}
@Override
public void updateIfPresent(MysqlSessions.Session session, MysqlBackendEntry.Row entry) {
String template = this.updateIfPresentTemplate;
if (template == null) {
template = this.buildUpdateIfPresentTemplate(entry);
this.updateIfPresentTemplate = template;
}
List<?> params = this.buildUpdateIfPresentParams(entry);
this.insertOrUpdate(session, template, params);
}
@Override
public void updateIfAbsent(MysqlSessions.Session session, MysqlBackendEntry.Row entry) {
String template = this.updateIfAbsentTemplate;
if (template == null) {
template = this.buildUpdateIfAbsentTemplate(entry);
this.updateIfAbsentTemplate = template;
}
List<?> params = this.buildUpdateIfAbsentParams(entry);
this.insertOrUpdate(session, template, params);
}
@Override
public boolean queryExist(MysqlSessions.Session session, MysqlBackendEntry.Row entry) {
Query query = new IdQuery.OneIdQuery(HugeType.UNKNOWN, entry.id());
Iterator<BackendEntry> iter = this.query(session, query);
try {
return iter.hasNext();
} finally {
WrappedIterator.close(iter);
}
}
@Override
public Number queryNumber(MysqlSessions.Session session, Query query) {
Aggregate aggregate = query.aggregateNotNull();
Iterator<Number> results = this.query(session, query, (q, rs) -> {
try {
if (!rs.resultSet().next()) {
return IteratorUtils.of(aggregate.defaultValue());
}
return IteratorUtils.of(rs.resultSet().getLong(1));
} catch (SQLException e) {
throw new BackendException(e);
} finally {
rs.close();
}
});
return aggregate.reduce(results);
}
@Override
public Iterator<BackendEntry> query(MysqlSessions.Session session, Query query) {
return this.query(session, query, this::results2Entries);
}
protected <R> Iterator<R> query(MysqlSessions.Session session, Query query,
BiFunction<Query, ResultSetWrapper,
Iterator<R>> parser) {
ExtendableIterator<R> rs = new ExtendableIterator<>();
if (query.limit() == 0L && !query.noLimit()) {
LOG.debug("Return empty result(limit=0) for query {}", query);
return rs;
}
List<StringBuilder> selections = this.query2Select(this.table(), query);
try {
for (StringBuilder selection : selections) {
ResultSetWrapper results = session.select(selection.toString());
rs.extend(parser.apply(query, results));
}
} catch (SQLException e) {
// Closing the iterator
try {
rs.close();
} catch (Exception e2) {
LOG.error("Got error {} when closing iterator for query {}", e2, query);
}
throw new BackendException("Failed to query [%s]", e, query);
}
LOG.debug("Return {} for query {}", rs, query);
return rs;
}
protected List<StringBuilder> query2Select(String table, Query query) {
// Build query
StringBuilder select = new StringBuilder(64);
select.append("SELECT ");
// Set aggregate
Aggregate aggregate = query.aggregate();
if (aggregate != null) {
select.append(aggregate.toString());
} else {
select.append("*");
}
// Set table
select.append(" FROM ").append(table);
// Is query by id?
List<StringBuilder> ids = this.queryId2Select(query, select);
List<StringBuilder> selections;
if (query.conditionsSize() == 0) {
// Query only by id
LOG.debug("Query only by id(s): {}", ids);
selections = ids;
} else {
ConditionQuery condQuery = (ConditionQuery) query;
if (condQuery.containsScanRelation()) {
assert ids.size() == 1;
return ImmutableList.of(queryByRange(condQuery, ids.get(0)));
}
selections = new ArrayList<>(ids.size());
for (StringBuilder selection : ids) {
// Query by condition
selections.addAll(this.queryCondition2Select(query, selection));
}
LOG.debug("Query by conditions: {}", selections);
}
// Set page, order-by and limit
for (StringBuilder selection : selections) {
boolean hasOrder = !query.orders().isEmpty();
if (hasOrder) {
this.wrapOrderBy(selection, query);
}
if (query.paging()) {
this.wrapPage(selection, query, false);
wrapLimit(selection, query);
} else {
if (aggregate == null && !hasOrder) {
select.append(this.orderByKeys());
}
if (!query.noLimit() || query.offset() > 0L) {
this.wrapOffset(selection, query);
}
}
}
return selections;
}
protected StringBuilder queryByRange(ConditionQuery query,
StringBuilder select) {
E.checkArgument(query.relations().size() == 1,
"Invalid scan with multi conditions: %s", query);
Condition.Relation scan = query.relations().iterator().next();
Shard shard = (Shard) scan.value();
String page = query.page();
if (MysqlShardSplitter.START.equals(shard.start()) &&
MysqlShardSplitter.END.equals(shard.end()) &&
(page == null || page.isEmpty())) {
this.wrapLimit(select, query);
return select;
}
HugeKeys partitionKey = this.idColumnName().get(0);
if (page != null && !page.isEmpty()) {
// >= page
this.wrapPage(select, query, true);
// < end
WhereBuilder where = this.newWhereBuilder(false);
if (!MysqlShardSplitter.END.equals(shard.end())) {
where.and();
where.lt(formatKey(partitionKey), shard.end());
}
select.append(where.build());
} else {
// >= start
WhereBuilder where = this.newWhereBuilder();
boolean hasStart = false;
if (!MysqlShardSplitter.START.equals(shard.start())) {
where.gte(formatKey(partitionKey), shard.start());
hasStart = true;
}
// < end
if (!MysqlShardSplitter.END.equals(shard.end())) {
if (hasStart) {
where.and();
}
where.lt(formatKey(partitionKey), shard.end());
}
select.append(where.build());
}
this.wrapLimit(select, query);
return select;
}
protected List<StringBuilder> queryId2Select(Query query,
StringBuilder select) {
// Query by id(s)
if (query.idsSize() == 0) {
return ImmutableList.of(select);
}
List<HugeKeys> nameParts = this.idColumnName();
List<List<Object>> ids = new ArrayList<>(query.idsSize());
for (Id id : query.ids()) {
List<Object> idParts = this.idColumnValue(id);
if (nameParts.size() != idParts.size()) {
throw new NotFoundException(
"Unsupported ID format: '%s' (should contain %s)",
id, nameParts);
}
ids.add(idParts);
}
// Query only by partition-key
if (nameParts.size() == 1) {
List<Object> values = new ArrayList<>(ids.size());
for (List<Object> objects : ids) {
assert objects.size() == 1;
values.add(objects.get(0));
}
WhereBuilder where = this.newWhereBuilder();
where.in(formatKey(nameParts.get(0)), values);
select.append(where.build());
return ImmutableList.of(select);
}
/*
* Query by partition-key + clustering-key
* NOTE: Error if multi-column IN clause include partition key:
* error: multi-column relations can only be applied to clustering
* columns when using: select.where(QueryBuilder.in(names, idList));
* So we use multi-query instead of IN
*/
List<StringBuilder> selections = new ArrayList<>(ids.size());
for (List<Object> objects : ids) {
assert nameParts.size() == objects.size();
StringBuilder idSelection = new StringBuilder(select);
/*
* NOTE: concat with AND relation, like:
* "pk = id and ck1 = v1 and ck2 = v2"
*/
WhereBuilder where = this.newWhereBuilder();
where.and(formatKeys(nameParts), objects);
idSelection.append(where.build());
selections.add(idSelection);
}
return selections;
}
protected List<StringBuilder> queryCondition2Select(Query query,
StringBuilder select) {
// Query by conditions
Collection<Condition> conditions = query.conditions();
List<StringBuilder> clauses = new ArrayList<>(conditions.size());
for (Condition condition : conditions) {
clauses.add(this.condition2Sql(condition));
}
WhereBuilder where = this.newWhereBuilder();
where.and(clauses);
select.append(where.build());
return ImmutableList.of(select);
}
protected StringBuilder condition2Sql(Condition condition) {
switch (condition.type()) {
case AND:
Condition.And and = (Condition.And) condition;
StringBuilder left = this.condition2Sql(and.left());
StringBuilder right = this.condition2Sql(and.right());
int size = left.length() + right.length() + " AND ".length();
StringBuilder sql = new StringBuilder(size);
sql.append(left).append(" AND ").append(right);
return sql;
case OR:
throw new BackendException("Not support OR currently");
case RELATION:
Condition.Relation r = (Condition.Relation) condition;
return this.relation2Sql(r);
default:
final String msg = "Unsupported condition: " + condition;
throw new AssertionError(msg);
}
}
protected StringBuilder relation2Sql(Condition.Relation relation) {
String key = relation.serialKey().toString();
Object value = relation.serialValue();
WhereBuilder sql = this.newWhereBuilder(false);
sql.relation(key, relation.relation(), value);
return sql.build();
}
protected WhereBuilder newWhereBuilder() {
return this.newWhereBuilder(true);
}
protected WhereBuilder newWhereBuilder(boolean startWithWhere) {
return new WhereBuilder(startWithWhere);
}
protected void wrapOrderBy(StringBuilder select, Query query) {
int size = query.orders().size();
assert size > 0;
int i = 0;
// Set order-by
select.append(" ORDER BY ");
for (Map.Entry<HugeKeys, Query.Order> order :
query.orders().entrySet()) {
String key = formatKey(order.getKey());
Query.Order value = order.getValue();
select.append(key).append(" ");
if (value == Query.Order.ASC) {
select.append("ASC");
} else {
assert value == Query.Order.DESC;
select.append("DESC");
}
if (++i != size) {
select.append(", ");
}
}
}
protected void wrapPage(StringBuilder select, Query query, boolean scan) {
String page = query.page();
// It's the first time if page is empty
if (!page.isEmpty()) {
byte[] position = PageState.fromString(page).position();
Map<HugeKeys, Object> columns = MysqlEntryIterator.PagePosition.fromBytes(position)
.columns();
List<HugeKeys> idColumnNames = this.idColumnName();
List<Object> values = new ArrayList<>(idColumnNames.size());
for (HugeKeys key : idColumnNames) {
values.add(columns.get(key));
}
// Need add `where` to `select` when query is IdQuery
boolean expectWhere = scan || query.conditionsSize() == 0;
WhereBuilder where = this.newWhereBuilder(expectWhere);
if (!expectWhere) {
where.and();
}
where.gte(formatKeys(idColumnNames), values);
select.append(where.build());
}
}
private void wrapLimit(StringBuilder select, Query query) {
select.append(this.orderByKeys());
if (!query.noLimit()) {
// Fetch `limit + 1` rows for judging whether reached the last page
select.append(" limit ");
select.append(query.limit() + 1);
}
select.append(";");
}
protected String orderByKeys() {
return Strings.EMPTY;
}
protected void wrapOffset(StringBuilder select, Query query) {
assert query.limit() >= 0;
assert query.offset() >= 0;
// Set limit and offset
select.append(" limit ");
select.append(query.limit());
select.append(" offset ");
select.append(query.offset());
select.append(";");
query.goOffset(query.offset());
}
protected Iterator<BackendEntry> results2Entries(Query query,
ResultSetWrapper results) {
return new MysqlEntryIterator(results, query, this::mergeEntries);
}
protected BackendEntry mergeEntries(BackendEntry e1, BackendEntry e2) {
// Return the next entry (not merged)
return e2;
}
public static String formatKey(HugeKeys key) {
return key.name();
}
public static HugeKeys parseKey(String name) {
return HugeKeys.valueOf(name.toUpperCase());
}
public static List<String> formatKeys(List<HugeKeys> keys) {
List<String> names = new ArrayList<>(keys.size());
for (HugeKeys key : keys) {
names.add(formatKey(key));
}
return names;
}
private static class MysqlShardSplitter extends ShardSplitter<MysqlSessions.Session> {
private static final String BASE64 =
"0123456789=?ABCDEFGHIJKLMNOPQRSTUVWXYZ" +
"abcdefghijklmnopqrstuvwxyz";
private static final int COUNT = 64;
public MysqlShardSplitter(String table) {
super(table);
}
@Override
public List<Shard> getSplits(MysqlSessions.Session session, long splitSize) {
E.checkArgument(splitSize >= MIN_SHARD_SIZE,
"The split-size must be >= %s bytes, but got %s",
MIN_SHARD_SIZE, splitSize);
List<Shard> splits = new ArrayList<>(COUNT);
splits.add(new Shard(START, BASE64.substring(0, 1), 0));
for (int i = 0; i < COUNT - 1; i++) {
splits.add(new Shard(BASE64.substring(i, i + 1),
BASE64.substring(i + 1, i + 2), 0));
}
splits.add(new Shard(BASE64.substring(COUNT - 1, COUNT), END, 0));
return splits;
}
@Override
protected long estimateDataSize(MysqlSessions.Session session) {
return 0L;
}
@Override
protected long estimateNumKeys(MysqlSessions.Session session) {
return 0L;
}
}
}