/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.store.memory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.query.Aggregate;
import org.apache.hugegraph.backend.query.Aggregate.AggregateFunc;
import org.apache.hugegraph.backend.query.Condition;
import org.apache.hugegraph.backend.query.ConditionQuery;
import org.apache.hugegraph.backend.query.IdPrefixQuery;
import org.apache.hugegraph.backend.query.IdRangeQuery;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.query.QueryResults;
import org.apache.hugegraph.backend.serializer.TextBackendEntry;
import org.apache.hugegraph.backend.store.BackendEntry;
import org.apache.hugegraph.backend.store.BackendSession;
import org.apache.hugegraph.backend.store.BackendTable;
import org.apache.hugegraph.backend.store.Shard;
import org.apache.hugegraph.exception.NotSupportException;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.InsertionOrderUtil;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
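
/**
 * In-memory implementation of BackendTable, backed by a Map from entry id
 * to entry (a ConcurrentHashMap by default). All queries are answered by
 * scanning and filtering that map in memory; paging is not supported.
 *
 * A minimal usage sketch (assumptions: a null session is acceptable since
 * the methods below do not dereference it, and `entry` is an initialized
 * TextBackendEntry):
 *
 *   InMemoryDBTable table = new InMemoryDBTable(HugeType.VERTEX);
 *   table.insert(null, entry);
 *   boolean exists = table.queryExist(null, entry);
 */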
public class InMemoryDBTable extends BackendTable<BackendSession,
TextBackendEntry> {
private static final Logger LOG = Log.logger(InMemoryDBTable.class);
protected final Map<Id, BackendEntry> store;
private final InMemoryShardSplitter shardSplitter;
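// The table is backed either by a fresh ConcurrentHashMap or by a map
// supplied by the caller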
public InMemoryDBTable(HugeType type) {
super(type.name());
this.store = new ConcurrentHashMap<>();
this.shardSplitter = new InMemoryShardSplitter(this.table());
}
public InMemoryDBTable(HugeType type, Map<Id, BackendEntry> store) {
super(type.name());
this.store = store;
this.shardSplitter = new InMemoryShardSplitter(this.table());
}
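/*
 * Register the "splits" meta handler, which expects a single argument
 * (the split size) and delegates to the shard splitter.
 */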
@Override
protected void registerMetaHandlers() {
this.registerMetaHandler("splits", (session, meta, args) -> {
E.checkArgument(args.length == 1,
"The args count of %s must be 1", meta);
long splitSize = (long) args[0];
return this.shardSplitter.getSplits(session, splitSize);
});
}
protected Map<Id, BackendEntry> store() {
return this.store;
}
@Override
public void init(BackendSession session) {
// pass
}
@Override
public void clear(BackendSession session) {
this.store.clear();
}
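// Upsert semantics: store a new entry, or merge the new entry's columns
// into the existing entry with the same id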
@Override
public void insert(BackendSession session, TextBackendEntry entry) {
if (!this.store.containsKey(entry.id())) {
this.store.put(entry.id(), entry);
} else {
// Merge columns if the entry exists
BackendEntry origin = this.store.get(entry.id());
// TODO: make this compatible with the generic BackendEntry type
origin.merge(entry);
}
}
@Override
public void delete(BackendSession session, TextBackendEntry entry) {
// Remove by id (TODO: support remove by id + condition)
this.store.remove(entry.id());
}
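// Append the entry's columns to an existing entry, or store the entry
// as-is if no entry with that id exists yet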
@Override
public void append(BackendSession session, TextBackendEntry entry) {
BackendEntry parent = this.store.get(entry.id());
if (parent == null) {
this.store.put(entry.id(), entry);
} else {
// TODO: make this compatible with the generic BackendEntry type
((TextBackendEntry) parent).append(entry);
}
}
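// Eliminate (remove) the entry's columns from the existing entry with
// the same id; nothing happens if there is no such entry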
@Override
public void eliminate(BackendSession session, TextBackendEntry entry) {
BackendEntry parent = this.store.get(entry.id());
// TODO: make this compatible with the generic BackendEntry type
if (parent != null) {
((TextBackendEntry) parent).eliminate(entry);
}
}
@Override
public boolean queryExist(BackendSession session, TextBackendEntry entry) {
List<Id> ids = ImmutableList.of(entry.id());
return !this.queryById(ids, this.store).isEmpty();
}
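/*
 * Only the COUNT aggregate is supported: the count is computed by running
 * the query and summing sizeOfBackendEntry() over the results.
 */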
@Override
public Number queryNumber(BackendSession session, Query query) {
Aggregate aggregate = query.aggregateNotNull();
if (aggregate.func() != AggregateFunc.COUNT) {
throw new NotSupportException(aggregate.toString());
}
assert aggregate.func() == AggregateFunc.COUNT;
Iterator<BackendEntry> results = this.query(session, query);
long total = 0L;
while (results.hasNext()) {
total += this.sizeOfBackendEntry(results.next());
}
return total;
}
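/*
 * Query pipeline: narrow the store by id prefix or id range if requested,
 * then by explicit ids, then by conditions (a scan relation is handled
 * separately by queryByRange), and finally apply offset and limit to the
 * surviving entries.
 */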
@Override
public Iterator<BackendEntry> query(BackendSession session, Query query) {
String page = query.page();
if (page != null && !page.isEmpty()) {
throw new NotSupportException("paging by InMemoryDBStore");
}
Map<Id, BackendEntry> rs = this.store;
if (query instanceof IdPrefixQuery) {
IdPrefixQuery pq = (IdPrefixQuery) query;
rs = this.queryByIdPrefix(pq.start(), pq.inclusiveStart(),
pq.prefix(), rs);
}
if (query instanceof IdRangeQuery) {
IdRangeQuery rq = (IdRangeQuery) query;
rs = this.queryByIdRange(rq.start(), rq.inclusiveStart(),
rq.end(), rq.inclusiveEnd(), rs);
}
// Query by id(s)
if (query.idsSize() > 0) {
rs = this.queryById(query.ids(), rs);
}
// Query by condition(s)
if (query.conditionsSize() > 0) {
ConditionQuery condQuery = (ConditionQuery) query;
if (condQuery.containsScanRelation()) {
return this.queryByRange(condQuery);
}
rs = this.queryByFilter(query.conditions(), rs);
}
Iterator<BackendEntry> iterator = rs.values().iterator();
long offset = query.offset() - query.actualOffset();
if (offset >= rs.size()) {
query.goOffset(rs.size());
return QueryResults.emptyIterator();
}
if (offset > 0L) {
query.goOffset(offset);
iterator = this.skipOffset(iterator, offset);
}
if (!query.noLimit() && query.total() < rs.size()) {
iterator = this.dropTails(iterator, query.limit());
}
return iterator;
}
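/*
 * A scan query carries a Shard whose start/end strings are parsed as
 * positional indices into the store's iteration order, so the half-open
 * range [start, end) of entries is returned.
 */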
private Iterator<BackendEntry> queryByRange(ConditionQuery query) {
E.checkArgument(query.relations().size() == 1,
"Invalid scan with multi conditions: %s", query);
Condition.Relation scan = query.relations().iterator().next();
Shard shard = (Shard) scan.value();
int start = Strings.isNullOrEmpty(shard.start()) ?
0 : Long.valueOf(shard.start()).intValue();
int end = Strings.isNullOrEmpty(shard.end()) ?
0 : Long.valueOf(shard.end()).intValue();
List<BackendEntry> rs = new ArrayList<>(end - start);
Iterator<BackendEntry> iterator = this.store.values().iterator();
int i = 0;
while (iterator.hasNext() && i++ < end) {
BackendEntry entry = iterator.next();
if (i > start) {
rs.add(entry);
}
}
return rs.iterator();
}
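// Select the entries whose ids appear in the given collection, keeping
// the ids' iteration order in the result map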
protected Map<Id, BackendEntry> queryById(Collection<Id> ids,
Map<Id, BackendEntry> entries) {
assert ids.size() > 0;
Map<Id, BackendEntry> rs = InsertionOrderUtil.newMap();
for (Id id : ids) {
assert !id.number();
if (entries.containsKey(id)) {
rs.put(id, entries.get(id));
}
}
return rs;
}
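// Id-prefix queries are not supported by this base table; subclasses
// with ordered storage may override this method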
protected Map<Id, BackendEntry> queryByIdPrefix(Id start,
boolean inclusiveStart,
Id prefix,
Map<Id, BackendEntry> rs) {
throw new BackendException("Unsupported prefix query: " + prefix);
}
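// Id-range queries are likewise unsupported here and left to subclasses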
protected Map<Id, BackendEntry> queryByIdRange(Id start,
boolean inclusiveStart,
Id end,
boolean inclusiveEnd,
Map<Id, BackendEntry> rs) {
throw new BackendException("Unsupported range query: " + start);
}
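// Linear scan: keep only the entries that match every condition
// (AND semantics)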
protected Map<Id, BackendEntry> queryByFilter(
Collection<Condition> conditions,
Map<Id, BackendEntry> entries) {
assert conditions.size() > 0;
Map<Id, BackendEntry> rs = new HashMap<>();
LOG.trace("queryByFilter {} size = {}", this.table(), entries.size());
for (BackendEntry entry : entries.values()) {
// Query by conditions
boolean matched = true;
for (Condition c : conditions) {
if (!matchCondition(entry, c)) {
// TODO: handle compound conditions such as AND / OR
matched = false;
break;
}
}
if (matched) {
rs.put(entry.id(), entry);
}
}
return rs;
}
protected Iterator<BackendEntry> skipOffset(Iterator<BackendEntry> iterator,
long offset) {
// Skip the first `offset` entries (TODO: could be avoided by applying
// the offset while collecting results)
for (long i = 0; i < offset && iterator.hasNext(); i++) {
iterator.next();
}
return iterator;
}
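// Materialize at most `limit` entries into a list, dropping the tail of
// the iterator; the limit must fit in an int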
protected Iterator<BackendEntry> dropTails(Iterator<BackendEntry> iterator,
long limit) {
E.checkArgument(limit <= Integer.MAX_VALUE,
"Limit must be <= 0x7fffffff, but got '%s'", limit);
List<BackendEntry> entries = new ArrayList<>((int) limit);
for (long i = 0L; i < limit && iterator.hasNext(); i++) {
entries.add(iterator.next());
}
return entries.iterator();
}
protected long sizeOfBackendEntry(BackendEntry entry) {
return 1L;
}
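/*
 * Test a single relation against an entry: CONTAINS_KEY and CONTAINS_VALUE
 * inspect the entry's columns directly, EQ checks for an exact column
 * match, and any other relation is delegated to Relation.test() on the
 * column value when the key is present.
 */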
private static boolean matchCondition(BackendEntry item, Condition c) {
// TODO: make this compatible with the generic BackendEntry type
TextBackendEntry entry = (TextBackendEntry) item;
// Only relation conditions are supported by the memory backend
if (!(c instanceof Condition.Relation)) {
throw new BackendException("Unsupported condition: " + c);
}
Condition.Relation r = (Condition.Relation) c;
String key = r.serialKey().toString();
// TODO: handle other relation types (<, >=, ...) explicitly
if (r.relation() == Condition.RelationType.CONTAINS_KEY) {
return entry.contains(r.serialValue().toString());
} else if (r.relation() == Condition.RelationType.CONTAINS_VALUE) {
return entry.containsValue(r.serialValue().toString());
} else if (r.relation() == Condition.RelationType.EQ) {
return entry.contains(key, r.serialValue().toString());
} else if (entry.contains(key)) {
return r.test(entry.column(key));
}
return false;
}
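/*
 * Shard splitter that treats the number of stored entries as the keyspace:
 * both maxKey() and estimateNumKeys() return the store size, and the data
 * size estimate is simply 0.
 */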
private class InMemoryShardSplitter extends ShardSplitter<BackendSession> {
public InMemoryShardSplitter(String table) {
super(table);
}
@Override
protected long maxKey() {
return InMemoryDBTable.this.store.size();
}
@Override
protected long estimateDataSize(BackendSession session) {
return 0L;
}
@Override
protected long estimateNumKeys(BackendSession session) {
return InMemoryDBTable.this.store.size();
}
}
}