blob: d3da59beca65a2a2281830baa3b1dff31dcd4b52 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.range;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects.toStringHelper;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@link RangeTracker} for {@link ByteKey ByteKeys} in {@link ByteKeyRange ByteKeyRanges}.
*
* @see ByteKey
* @see ByteKeyRange
*/
public final class ByteKeyRangeTracker implements RangeTracker<ByteKey> {
private static final Logger LOG = LoggerFactory.getLogger(ByteKeyRangeTracker.class);
/** Instantiates a new {@link ByteKeyRangeTracker} with the specified range. */
public static ByteKeyRangeTracker of(ByteKeyRange range) {
return new ByteKeyRangeTracker(range);
}
public synchronized boolean isDone() {
return done;
}
@Override
public synchronized ByteKey getStartPosition() {
return range.getStartKey();
}
@Override
public synchronized ByteKey getStopPosition() {
return range.getEndKey();
}
/** Returns the current range. */
public synchronized ByteKeyRange getRange() {
return range;
}
@Override
public synchronized boolean tryReturnRecordAt(boolean isAtSplitPoint, ByteKey recordStart) {
if (done) {
return false;
}
checkState(!(position == null && !isAtSplitPoint), "The first record must be at a split point");
checkState(
!(recordStart.compareTo(range.getStartKey()) < 0),
"Trying to return record which is before the start key");
checkState(
!(position != null && recordStart.compareTo(position) < 0),
"Trying to return record which is before the last-returned record");
if (position == null) {
LOG.info(
"Adjusting range start from {} to {} as position of first returned record",
range.getStartKey(),
recordStart);
range = range.withStartKey(recordStart);
}
position = recordStart;
if (isAtSplitPoint) {
if (!range.containsKey(recordStart)) {
done = true;
return false;
}
++splitPointsSeen;
}
return true;
}
@Override
public synchronized boolean trySplitAtPosition(ByteKey splitPosition) {
// Sanity check.
if (!range.containsKey(splitPosition)) {
LOG.warn(
"{}: Rejecting split request at {} because it is not within the range.",
this,
splitPosition);
return false;
}
// Unstarted.
if (position == null) {
LOG.warn(
"{}: Rejecting split request at {} because no records have been returned.",
this,
splitPosition);
return false;
}
// Started, but not after current position.
if (splitPosition.compareTo(position) <= 0) {
LOG.warn(
"{}: Rejecting split request at {} because it is not after current position {}.",
this,
splitPosition,
position);
return false;
}
range = range.withEndKey(splitPosition);
return true;
}
@Override
public synchronized double getFractionConsumed() {
if (position == null) {
return 0;
} else if (done) {
return 1.0;
} else if (position.compareTo(range.getEndKey()) >= 0) {
return 1.0;
}
return range.estimateFractionForKey(position);
}
public synchronized long getSplitPointsConsumed() {
if (position == null) {
return 0;
} else if (isDone()) {
return splitPointsSeen;
} else {
// There is a current split point, and it has not finished processing.
checkState(
splitPointsSeen > 0,
"A started rangeTracker should have seen > 0 split points (is %s)",
splitPointsSeen);
return splitPointsSeen - 1;
}
}
///////////////////////////////////////////////////////////////////////////////
private ByteKeyRange range;
@Nullable private ByteKey position;
private long splitPointsSeen;
private boolean done;
private ByteKeyRangeTracker(ByteKeyRange range) {
this.range = range;
position = null;
splitPointsSeen = 0L;
done = false;
}
/**
* Marks this range tracker as being done. Specifically, this will mark the current split point,
* if one exists, as being finished.
*
* <p>Always returns false, so that it can be used in an implementation of {@link
* BoundedReader#start()} or {@link BoundedReader#advance()} as follows:
*
* <pre>{@code
* public boolean start() {
* return startImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint, position)
* || rangeTracker.markDone();
* }
* }</pre>
*/
public synchronized boolean markDone() {
done = true;
return false;
}
@Override
public synchronized String toString() {
return toStringHelper(ByteKeyRangeTracker.class)
.add("range", range)
.add("position", position)
.toString();
}
}