blob: 4e04580efa9849e6e8e0b51ee1bca0374d24bd36 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.page;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.function.Function;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hugegraph.backend.query.ConditionQuery;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.store.BackendEntry;
import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.iterator.CIter;
import org.apache.hugegraph.iterator.Metadatable;
import org.apache.hugegraph.util.E;
public abstract class IdHolder {
protected final Query query;
protected boolean exhausted;
public IdHolder(Query query) {
E.checkNotNull(query, "query");
this.query = query;
this.exhausted = false;
}
public Query query() {
return this.query;
}
public boolean keepOrder() {
return false;
}
@Override
public String toString() {
return String.format("%s{origin:%s,final:%s}",
this.getClass().getSimpleName(),
this.query.originQuery(), this.query);
}
public abstract boolean paging();
public abstract Set<Id> all();
public abstract PageIds fetchNext(String page, long pageSize);
public static class FixedIdHolder extends IdHolder {
// Used by Joint Index
private final Set<Id> ids;
public FixedIdHolder(Query query, Set<Id> ids) {
super(query);
E.checkArgumentNotNull(ids, "The ids can't be null");
this.ids = ids;
}
@Override
public boolean paging() {
return false;
}
@Override
public Set<Id> all() {
return this.ids;
}
@Override
public PageIds fetchNext(String page, long pageSize) {
throw new NotImplementedException("FixedIdHolder.fetchNext");
}
}
public static class PagingIdHolder extends IdHolder {
private final Function<ConditionQuery, PageIds> fetcher;
public PagingIdHolder(ConditionQuery query,
Function<ConditionQuery, PageIds> fetcher) {
super(query.copy());
E.checkArgument(query.paging(),
"Query '%s' must include page info", query);
this.fetcher = fetcher;
}
@Override
public boolean paging() {
return true;
}
@Override
public PageIds fetchNext(String page, long pageSize) {
if (this.exhausted) {
return PageIds.EMPTY;
}
this.query.page(page);
this.query.limit(pageSize);
PageIds result = this.fetcher.apply((ConditionQuery) this.query);
assert result != null;
if (result.ids().size() < pageSize || result.page() == null) {
this.exhausted = true;
}
return result;
}
@Override
public Set<Id> all() {
throw new NotImplementedException("PagingIdHolder.all");
}
}
public static class BatchIdHolder extends IdHolder
implements CIter<IdHolder> {
private final Iterator<BackendEntry> entries;
private final Function<Long, Set<Id>> fetcher;
private long count;
private PageIds currentBatch;
public BatchIdHolder(ConditionQuery query,
Iterator<BackendEntry> entries,
Function<Long, Set<Id>> fetcher) {
super(query);
this.entries = entries;
this.fetcher = fetcher;
this.count = 0L;
this.currentBatch = null;
}
@Override
public boolean paging() {
return false;
}
@Override
public boolean hasNext() {
if (this.currentBatch != null) {
return true;
}
if (this.exhausted) {
return false;
}
boolean hasNext = this.entries.hasNext();
if (!hasNext) {
this.close();
}
return hasNext;
}
@Override
public IdHolder next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return this;
}
@Override
public PageIds fetchNext(String page, long batchSize) {
E.checkArgument(page == null,
"Not support page parameter by BatchIdHolder");
E.checkArgument(batchSize >= 0L,
"Invalid batch size value: %s", batchSize);
if (this.currentBatch != null) {
return this.getFromCurrentBatch(batchSize);
}
if (!this.query.noLimit()) {
long remaining = this.remaining();
if (remaining < batchSize) {
batchSize = remaining;
}
}
assert batchSize >= 0L : batchSize;
Set<Id> ids = this.fetcher.apply(batchSize);
int size = ids.size();
this.count += size;
if (size < batchSize || size == 0) {
this.close();
}
// If there is no data, the entries is not a Metadatable object
if (size == 0) {
return PageIds.EMPTY;
} else {
return new PageIds(ids, PageState.EMPTY);
}
}
@Override
public Set<Id> all() {
try {
Set<Id> ids = this.fetcher.apply(this.remaining());
if (this.currentBatch != null) {
ids.addAll(this.getFromCurrentBatch(Query.NO_LIMIT).ids());
}
this.count += ids.size();
return ids;
} finally {
this.close();
}
}
public PageIds peekNext(long size) {
E.checkArgument(this.currentBatch == null,
"Can't call peekNext() twice");
this.currentBatch = this.fetchNext(null, size);
return this.currentBatch;
}
private PageIds getFromCurrentBatch(long batchSize) {
assert this.currentBatch != null;
PageIds result = this.currentBatch;
this.currentBatch = null;
return result;
}
private long remaining() {
if (this.query.noLimit()) {
return Query.NO_LIMIT;
} else {
return this.query.total() - this.count;
}
}
@Override
public void close() {
if (this.exhausted) {
return;
}
this.exhausted = true;
CloseableIterator.closeIterator(this.entries);
}
@Override
public Object metadata(String meta, Object... args) {
E.checkState(this.entries instanceof Metadatable,
"Invalid iterator for Metadatable: %s",
this.entries.getClass());
return ((Metadatable) this.entries).metadata(meta, args);
}
}
}