blob: f13798dd639e99abe6335025ff6572aab781b20c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hugegraph.backend.store;
import java.util.NoSuchElementException;
import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.page.PageInfo;
import org.apache.hugegraph.backend.page.PageState;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.exception.LimitExceedException;
import org.apache.hugegraph.exception.NotSupportException;
import org.apache.hugegraph.iterator.CIter;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;
public abstract class BackendEntryIterator implements CIter<BackendEntry> {
private static final Logger LOG = Log.logger(BackendEntryIterator.class);
public static final long INLINE_BATCH_SIZE = Query.COMMIT_BATCH;
protected final Query query;
protected BackendEntry current;
private long count;
public BackendEntryIterator(Query query) {
E.checkNotNull(query, "query");
this.query = query;
this.count = 0L;
this.current = null;
}
@Override
public boolean hasNext() {
if (this.reachLimit()) {
return false;
}
if (this.current != null) {
return true;
}
return this.fetch();
}
@Override
public BackendEntry next() {
// Stop if reach capacity
this.checkCapacity();
// Stop if reach limit
if (this.reachLimit()) {
throw new NoSuchElementException();
}
if (this.current == null) {
this.fetch();
}
BackendEntry current = this.current;
if (current == null) {
throw new NoSuchElementException();
}
this.current = null;
this.count += this.sizeOf(current);
return current;
}
@Override
public Object metadata(String meta, Object... args) {
if (PageInfo.PAGE.equals(meta)) {
return this.pageState();
}
throw new NotSupportException("Invalid meta '%s'", meta);
}
public static final void checkInterrupted() {
if (Thread.interrupted()) {
throw new BackendException("Interrupted, maybe it is timed out",
new InterruptedException());
}
}
protected final void checkCapacity() throws LimitExceedException {
// Stop if reach capacity
this.query.checkCapacity(this.count());
}
protected final boolean reachLimit() {
/*
* TODO: if the query is separated with multi sub-queries(like query
* id in [id1, id2, ...]), then each BackendEntryIterator is only
* result(s) of one sub-query, so the query offset/limit is inaccurate.
*/
// Stop if it has reached limit after the previous next()
return this.reachLimit(this.count);
}
protected final boolean reachLimit(long count) {
try {
checkInterrupted();
} catch (Throwable e) {
try {
this.close();
} catch (Throwable ex) {
LOG.warn("Failed to close backend entry iterator for interrupted query", ex);
}
throw e;
}
return this.query.reachLimit(count);
}
protected final long count() {
return this.count;
}
protected final long fetched() {
long ccount = this.current == null ? 0 : this.sizeOf(this.current);
return this.count + ccount;
}
protected final void skipPageOffset(String page) {
PageState pageState = PageState.fromString(page);
int pageOffset = pageState.offset();
if (pageOffset > 0) {
/*
* Don't update this.count even if skipped page offset,
* because the skipped records belongs to the last page.
*/
this.skipOffset(pageOffset);
}
}
protected void skipOffset() {
long offset = this.query.offset() - this.query.actualOffset();
if (offset <= 0L) {
return;
}
long skipped = this.skipOffset(offset);
this.count += skipped;
this.query.goOffset(skipped);
}
protected long skipOffset(long offset) {
assert offset >= 0L;
long skipped = 0L;
// Skip offset
while (skipped < offset && this.fetch()) {
assert this.current != null;
final long size = this.sizeOf(this.current);
skipped += size;
if (skipped > offset) {
// Skip part of sub-items in an entry
final long skip = size - (skipped - offset);
skipped -= this.skip(this.current, skip);
assert skipped == offset;
} else {
// Skip entry
this.current = null;
}
}
return skipped;
}
protected long sizeOf(BackendEntry entry) {
return 1;
}
protected long skip(BackendEntry entry, long skip) {
assert this.sizeOf(entry) == 1;
// Return the remained sub-items(items)
return this.sizeOf(entry);
}
protected abstract boolean fetch();
protected abstract PageState pageState();
public static final class EmptyIterator extends BackendEntryIterator {
public EmptyIterator(Query query) {
super(query);
}
@Override
protected boolean fetch() {
return false;
}
@Override
protected PageState pageState() {
return PageState.EMPTY;
}
@Override
public void close() throws Exception {
}
}
}