| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.db.context; |
| |
| import java.nio.ByteBuffer; |
| import java.security.MessageDigest; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.cassandra.db.ClockAndCount; |
| import org.apache.cassandra.db.TypeSizes; |
| import org.apache.cassandra.db.compaction.CompactionManager; |
| import org.apache.cassandra.serializers.MarshalException; |
| import org.apache.cassandra.utils.*; |
| |
| /** |
| * An implementation of a partitioned counter context. |
| * |
| * A context is primarily a list of tuples (counter id, clock, count) -- called |
| * shards, with some shards flagged as global or local (with |
| * special resolution rules in merge()). |
| * |
| * The data structure has two parts: |
| * a) a header containing the lists of global and local shard indexes in the body |
| * b) a list of shards -- (counter id, logical clock, count) tuples -- (the so-called 'body' below) |
| * |
| * The exact layout is: |
| * | header | body | |
| * context : |--|------|----------| |
| * ^ ^ |
| * | list of indices in the body list (2*#elt bytes) |
| * #elt in rest of header (2 bytes) |
| * |
| * Non-negative indices refer to local shards. Global shard indices are encoded as [idx + Short.MIN_VALUE], |
| * and are thus always negative. |
| * |
| * The body layout being: |
| * |
| * body: |----|----|----|----|----|----|.... |
| * ^ ^ ^ ^ ^ ^ |
| * | | count_1 | | count_2 |
| * | clock_1 | clock_2 |
| * counterid_1 counterid_2 |
| * |
| * The rules when merging two shard with the same counter id are: |
| * - global + global = keep the shard with the highest logical clock |
| * - global + local = keep the global one |
| * - global + remote = keep the global one |
| * - local + local = sum counts (and logical clocks) |
| * - local + remote = keep the local one |
| * - remote + remote = keep the shard with the highest logical clock |
| * |
| * For a detailed description of the meaning of a local and why the merging |
| * rules work this way, see CASSANDRA-1938 - specifically the 1938_discussion |
| * attachment (doesn't cover global shards, see CASSANDRA-4775 for that). |
| */ |
| public class CounterContext |
| { |
| private static final int HEADER_SIZE_LENGTH = TypeSizes.sizeof(Short.MAX_VALUE); |
| private static final int HEADER_ELT_LENGTH = TypeSizes.sizeof(Short.MAX_VALUE); |
| private static final int CLOCK_LENGTH = TypeSizes.sizeof(Long.MAX_VALUE); |
| private static final int COUNT_LENGTH = TypeSizes.sizeof(Long.MAX_VALUE); |
| private static final int STEP_LENGTH = CounterId.LENGTH + CLOCK_LENGTH + COUNT_LENGTH; |
| |
| private static final Logger logger = LoggerFactory.getLogger(CounterContext.class); |
| |
| public static enum Relationship |
| { |
| EQUAL, GREATER_THAN, LESS_THAN, DISJOINT |
| } |
| |
| // lazy-load singleton |
| private static class LazyHolder |
| { |
| private static final CounterContext counterContext = new CounterContext(); |
| } |
| |
| public static CounterContext instance() |
| { |
| return LazyHolder.counterContext; |
| } |
| |
| /** |
| * Creates a counter context with a single global, 2.1+ shard (a result of increment). |
| */ |
| public ByteBuffer createGlobal(CounterId id, long clock, long count) |
| { |
| ContextState state = ContextState.allocate(1, 0, 0); |
| state.writeGlobal(id, clock, count); |
| return state.context; |
| } |
| |
| /** |
| * Creates a counter context with a single local shard. |
| * This is only used in a PartitionUpdate until the update has gone through |
| * CounterMutation.apply(), at which point all the local shard are replaced by |
| * global ones. In other words, local shards should never hit the disk or |
| * memtables. And we use this so that if an update statement has multiple increment |
| * of the same counter we properly add them rather than keeping only one of them. |
| * (this is also used for tests of compatibility with pre-2.1 counters) |
| */ |
| public ByteBuffer createLocal(long count) |
| { |
| ContextState state = ContextState.allocate(0, 1, 0); |
| state.writeLocal(CounterId.getLocalId(), 1L, count); |
| return state.context; |
| } |
| |
| /** |
| * Creates a counter context with a single remote shard. |
| * For use by tests of compatibility with pre-2.1 counters only. |
| */ |
| public ByteBuffer createRemote(CounterId id, long clock, long count) |
| { |
| ContextState state = ContextState.allocate(0, 0, 1); |
| state.writeRemote(id, clock, count); |
| return state.context; |
| } |
| |
| private static int headerLength(ByteBuffer context) |
| { |
| return HEADER_SIZE_LENGTH + Math.abs(context.getShort(context.position())) * HEADER_ELT_LENGTH; |
| } |
| |
| private static int compareId(ByteBuffer bb1, int pos1, ByteBuffer bb2, int pos2) |
| { |
| return ByteBufferUtil.compareSubArrays(bb1, pos1, bb2, pos2, CounterId.LENGTH); |
| } |
| |
| /** |
| * Determine the count relationship between two contexts. |
| * |
| * EQUAL: Equal set of nodes and every count is equal. |
| * GREATER_THAN: Superset of nodes and every count is equal or greater than its corollary. |
| * LESS_THAN: Subset of nodes and every count is equal or less than its corollary. |
| * DISJOINT: Node sets are not equal and/or counts are not all greater or less than. |
| * |
| * Strategy: compare node logical clocks (like a version vector). |
| * |
| * @param left counter context. |
| * @param right counter context. |
| * @return the Relationship between the contexts. |
| */ |
| public Relationship diff(ByteBuffer left, ByteBuffer right) |
| { |
| Relationship relationship = Relationship.EQUAL; |
| ContextState leftState = ContextState.wrap(left); |
| ContextState rightState = ContextState.wrap(right); |
| |
| while (leftState.hasRemaining() && rightState.hasRemaining()) |
| { |
| // compare id bytes |
| int compareId = leftState.compareIdTo(rightState); |
| if (compareId == 0) |
| { |
| long leftClock = leftState.getClock(); |
| long rightClock = rightState.getClock(); |
| long leftCount = leftState.getCount(); |
| long rightCount = rightState.getCount(); |
| |
| // advance |
| leftState.moveToNext(); |
| rightState.moveToNext(); |
| |
| // process clock comparisons |
| if (leftClock == rightClock) |
| { |
| if (leftCount != rightCount) |
| { |
| // Inconsistent shard (see the corresponding code in merge()). We return DISJOINT in this |
| // case so that it will be treated as a difference, allowing read-repair to work. |
| return Relationship.DISJOINT; |
| } |
| } |
| else if ((leftClock >= 0 && rightClock > 0 && leftClock > rightClock) |
| || (leftClock < 0 && (rightClock > 0 || leftClock < rightClock))) |
| { |
| if (relationship == Relationship.EQUAL) |
| relationship = Relationship.GREATER_THAN; |
| else if (relationship == Relationship.LESS_THAN) |
| return Relationship.DISJOINT; |
| // relationship == Relationship.GREATER_THAN |
| } |
| else |
| { |
| if (relationship == Relationship.EQUAL) |
| relationship = Relationship.LESS_THAN; |
| else if (relationship == Relationship.GREATER_THAN) |
| return Relationship.DISJOINT; |
| // relationship == Relationship.LESS_THAN |
| } |
| } |
| else if (compareId > 0) |
| { |
| // only advance the right context |
| rightState.moveToNext(); |
| |
| if (relationship == Relationship.EQUAL) |
| relationship = Relationship.LESS_THAN; |
| else if (relationship == Relationship.GREATER_THAN) |
| return Relationship.DISJOINT; |
| // relationship == Relationship.LESS_THAN |
| } |
| else // compareId < 0 |
| { |
| // only advance the left context |
| leftState.moveToNext(); |
| |
| if (relationship == Relationship.EQUAL) |
| relationship = Relationship.GREATER_THAN; |
| else if (relationship == Relationship.LESS_THAN) |
| return Relationship.DISJOINT; |
| // relationship == Relationship.GREATER_THAN |
| } |
| } |
| |
| // check final lengths |
| if (leftState.hasRemaining()) |
| { |
| if (relationship == Relationship.EQUAL) |
| return Relationship.GREATER_THAN; |
| else if (relationship == Relationship.LESS_THAN) |
| return Relationship.DISJOINT; |
| } |
| |
| if (rightState.hasRemaining()) |
| { |
| if (relationship == Relationship.EQUAL) |
| return Relationship.LESS_THAN; |
| else if (relationship == Relationship.GREATER_THAN) |
| return Relationship.DISJOINT; |
| } |
| |
| return relationship; |
| } |
| |
| /** |
| * Return a context w/ an aggregated count for each counter id. |
| * |
| * @param left counter context. |
| * @param right counter context. |
| */ |
| public ByteBuffer merge(ByteBuffer left, ByteBuffer right) |
| { |
| boolean leftIsSuperSet = true; |
| boolean rightIsSuperSet = true; |
| |
| int globalCount = 0; |
| int localCount = 0; |
| int remoteCount = 0; |
| |
| ContextState leftState = ContextState.wrap(left); |
| ContextState rightState = ContextState.wrap(right); |
| |
| while (leftState.hasRemaining() && rightState.hasRemaining()) |
| { |
| int cmp = leftState.compareIdTo(rightState); |
| if (cmp == 0) |
| { |
| Relationship rel = compare(leftState, rightState); |
| if (rel == Relationship.GREATER_THAN) |
| rightIsSuperSet = false; |
| else if (rel == Relationship.LESS_THAN) |
| leftIsSuperSet = false; |
| else if (rel == Relationship.DISJOINT) |
| leftIsSuperSet = rightIsSuperSet = false; |
| |
| if (leftState.isGlobal() || rightState.isGlobal()) |
| globalCount += 1; |
| else if (leftState.isLocal() || rightState.isLocal()) |
| localCount += 1; |
| else |
| remoteCount += 1; |
| |
| leftState.moveToNext(); |
| rightState.moveToNext(); |
| } |
| else if (cmp > 0) |
| { |
| leftIsSuperSet = false; |
| |
| if (rightState.isGlobal()) |
| globalCount += 1; |
| else if (rightState.isLocal()) |
| localCount += 1; |
| else |
| remoteCount += 1; |
| |
| rightState.moveToNext(); |
| } |
| else // cmp < 0 |
| { |
| rightIsSuperSet = false; |
| |
| if (leftState.isGlobal()) |
| globalCount += 1; |
| else if (leftState.isLocal()) |
| localCount += 1; |
| else |
| remoteCount += 1; |
| |
| leftState.moveToNext(); |
| } |
| } |
| |
| if (leftState.hasRemaining()) |
| rightIsSuperSet = false; |
| else if (rightState.hasRemaining()) |
| leftIsSuperSet = false; |
| |
| // if one of the contexts is a superset, return it early. |
| if (leftIsSuperSet) |
| return left; |
| else if (rightIsSuperSet) |
| return right; |
| |
| while (leftState.hasRemaining()) |
| { |
| if (leftState.isGlobal()) |
| globalCount += 1; |
| else if (leftState.isLocal()) |
| localCount += 1; |
| else |
| remoteCount += 1; |
| |
| leftState.moveToNext(); |
| } |
| |
| while (rightState.hasRemaining()) |
| { |
| if (rightState.isGlobal()) |
| globalCount += 1; |
| else if (rightState.isLocal()) |
| localCount += 1; |
| else |
| remoteCount += 1; |
| |
| rightState.moveToNext(); |
| } |
| |
| leftState.reset(); |
| rightState.reset(); |
| |
| return merge(ContextState.allocate(globalCount, localCount, remoteCount), leftState, rightState); |
| } |
| |
| private ByteBuffer merge(ContextState mergedState, ContextState leftState, ContextState rightState) |
| { |
| while (leftState.hasRemaining() && rightState.hasRemaining()) |
| { |
| int cmp = leftState.compareIdTo(rightState); |
| if (cmp == 0) |
| { |
| Relationship rel = compare(leftState, rightState); |
| if (rel == Relationship.DISJOINT) // two local shards |
| mergedState.writeLocal(leftState.getCounterId(), |
| leftState.getClock() + rightState.getClock(), |
| leftState.getCount() + rightState.getCount()); |
| else if (rel == Relationship.GREATER_THAN) |
| leftState.copyTo(mergedState); |
| else // EQUAL or LESS_THAN |
| rightState.copyTo(mergedState); |
| |
| rightState.moveToNext(); |
| leftState.moveToNext(); |
| } |
| else if (cmp > 0) |
| { |
| rightState.copyTo(mergedState); |
| rightState.moveToNext(); |
| } |
| else // cmp < 0 |
| { |
| leftState.copyTo(mergedState); |
| leftState.moveToNext(); |
| } |
| } |
| |
| while (leftState.hasRemaining()) |
| { |
| leftState.copyTo(mergedState); |
| leftState.moveToNext(); |
| } |
| |
| while (rightState.hasRemaining()) |
| { |
| rightState.copyTo(mergedState); |
| rightState.moveToNext(); |
| } |
| |
| return mergedState.context; |
| } |
| |
| /* |
| * Compares two shards, returns: |
| * - GREATER_THAN if leftState overrides rightState |
| * - LESS_THAN if rightState overrides leftState |
| * - EQUAL for two equal, non-local, shards |
| * - DISJOINT for any two local shards |
| */ |
| private Relationship compare(ContextState leftState, ContextState rightState) |
| { |
| long leftClock = leftState.getClock(); |
| long leftCount = leftState.getCount(); |
| long rightClock = rightState.getClock(); |
| long rightCount = rightState.getCount(); |
| |
| if (leftState.isGlobal() || rightState.isGlobal()) |
| { |
| if (leftState.isGlobal() && rightState.isGlobal()) |
| { |
| if (leftClock == rightClock) |
| { |
| // Can happen if an sstable gets lost and disk failure policy is set to 'best effort' |
| if (leftCount != rightCount && CompactionManager.isCompactionManager.get()) |
| { |
| logger.warn("invalid global counter shard detected; ({}, {}, {}) and ({}, {}, {}) differ only in " |
| + "count; will pick highest to self-heal on compaction", |
| leftState.getCounterId(), leftClock, leftCount, |
| rightState.getCounterId(), rightClock, rightCount); |
| } |
| |
| if (leftCount > rightCount) |
| return Relationship.GREATER_THAN; |
| else if (leftCount == rightCount) |
| return Relationship.EQUAL; |
| else |
| return Relationship.LESS_THAN; |
| } |
| else |
| { |
| return leftClock > rightClock ? Relationship.GREATER_THAN : Relationship.LESS_THAN; |
| } |
| } |
| else // only one is global - keep that one |
| { |
| return leftState.isGlobal() ? Relationship.GREATER_THAN : Relationship.LESS_THAN; |
| } |
| } |
| |
| if (leftState.isLocal() || rightState.isLocal()) |
| { |
| // Local id and at least one is a local shard. |
| if (leftState.isLocal() && rightState.isLocal()) |
| return Relationship.DISJOINT; |
| else // only one is local - keep that one |
| return leftState.isLocal() ? Relationship.GREATER_THAN : Relationship.LESS_THAN; |
| } |
| |
| // both are remote shards |
| if (leftClock == rightClock) |
| { |
| // We should never see non-local shards w/ same id+clock but different counts. However, if we do |
| // we should "heal" the problem by being deterministic in our selection of shard - and |
| // log the occurrence so that the operator will know something is wrong. |
| if (leftCount != rightCount && CompactionManager.isCompactionManager.get()) |
| { |
| logger.warn("invalid remote counter shard detected; ({}, {}, {}) and ({}, {}, {}) differ only in " |
| + "count; will pick highest to self-heal on compaction", |
| leftState.getCounterId(), leftClock, leftCount, |
| rightState.getCounterId(), rightClock, rightCount); |
| } |
| |
| if (leftCount > rightCount) |
| return Relationship.GREATER_THAN; |
| else if (leftCount == rightCount) |
| return Relationship.EQUAL; |
| else |
| return Relationship.LESS_THAN; |
| } |
| else |
| { |
| if ((leftClock >= 0 && rightClock > 0 && leftClock >= rightClock) |
| || (leftClock < 0 && (rightClock > 0 || leftClock < rightClock))) |
| return Relationship.GREATER_THAN; |
| else |
| return Relationship.LESS_THAN; |
| } |
| } |
| |
| /** |
| * Human-readable String from context. |
| * |
| * @param context counter context. |
| * @return a human-readable String of the context. |
| */ |
| public String toString(ByteBuffer context) |
| { |
| ContextState state = ContextState.wrap(context); |
| StringBuilder sb = new StringBuilder(); |
| sb.append("["); |
| |
| while (state.hasRemaining()) |
| { |
| if (state.getElementIndex() > 0) |
| sb.append(","); |
| sb.append("{"); |
| sb.append(state.getCounterId()).append(", "); |
| sb.append(state.getClock()).append(", "); |
| sb.append(state.getCount()); |
| sb.append("}"); |
| if (state.isGlobal()) |
| sb.append("$"); |
| else if (state.isLocal()) |
| sb.append("*"); |
| state.moveToNext(); |
| } |
| |
| sb.append("]"); |
| return sb.toString(); |
| } |
| |
| /** |
| * Returns the aggregated count across all counter ids. |
| * |
| * @param context a counter context |
| * @return the aggregated count represented by {@code context} |
| */ |
| public long total(ByteBuffer context) |
| { |
| long total = 0L; |
| // we could use a ContextState but it is easy enough that we avoid the object creation |
| for (int offset = context.position() + headerLength(context); offset < context.limit(); offset += STEP_LENGTH) |
| total += context.getLong(offset + CounterId.LENGTH + CLOCK_LENGTH); |
| return total; |
| } |
| |
| public boolean shouldClearLocal(ByteBuffer context) |
| { |
| // #elt being negative means we have to clean local shards. |
| return context.getShort(context.position()) < 0; |
| } |
| |
| /** |
| * Detects whether or not the context has any legacy (local or remote) shards in it. |
| */ |
| public boolean hasLegacyShards(ByteBuffer context) |
| { |
| int totalCount = (context.remaining() - headerLength(context)) / STEP_LENGTH; |
| int localAndGlobalCount = Math.abs(context.getShort(context.position())); |
| |
| if (localAndGlobalCount < totalCount) |
| return true; // remote shard(s) present |
| |
| for (int i = 0; i < localAndGlobalCount; i++) |
| if (context.getShort(context.position() + HEADER_SIZE_LENGTH + i * HEADER_ELT_LENGTH) >= 0) |
| return true; // found a local shard |
| |
| return false; |
| } |
| |
| /** |
| * Mark context to delete local references afterward. |
| * Marking is done by multiply #elt by -1 to preserve header length |
| * and #elt count in order to clear all local refs later. |
| * |
| * @param context a counter context |
| * @return context that marked to delete local refs |
| */ |
| public ByteBuffer markLocalToBeCleared(ByteBuffer context) |
| { |
| short count = context.getShort(context.position()); |
| if (count <= 0) |
| return context; // already marked or all are remote. |
| |
| boolean hasLocalShards = false; |
| for (int i = 0; i < count; i++) |
| { |
| if (context.getShort(context.position() + HEADER_SIZE_LENGTH + i * HEADER_ELT_LENGTH) >= 0) |
| { |
| hasLocalShards = true; |
| break; |
| } |
| } |
| |
| if (!hasLocalShards) |
| return context; // all shards are global or remote. |
| |
| ByteBuffer marked = ByteBuffer.allocate(context.remaining()); |
| marked.putShort(marked.position(), (short) (count * -1)); |
| ByteBufferUtil.arrayCopy(context, |
| context.position() + HEADER_SIZE_LENGTH, |
| marked, |
| marked.position() + HEADER_SIZE_LENGTH, |
| context.remaining() - HEADER_SIZE_LENGTH); |
| return marked; |
| } |
| |
| /** |
| * Remove all the local of a context (but keep global). |
| * |
| * @param context a counter context |
| * @return a version of {@code context} where no shards are local. |
| */ |
| public ByteBuffer clearAllLocal(ByteBuffer context) |
| { |
| int count = Math.abs(context.getShort(context.position())); |
| if (count == 0) |
| return context; // no local or global shards present. |
| |
| List<Short> globalShardIndexes = new ArrayList<>(count); |
| for (int i = 0; i < count; i++) |
| { |
| short elt = context.getShort(context.position() + HEADER_SIZE_LENGTH + i * HEADER_ELT_LENGTH); |
| if (elt < 0) |
| globalShardIndexes.add(elt); |
| } |
| |
| if (count == globalShardIndexes.size()) |
| return context; // no local shards detected. |
| |
| // allocate a smaller BB for the cleared context - with no local header elts. |
| ByteBuffer cleared = ByteBuffer.allocate(context.remaining() - (count - globalShardIndexes.size()) * HEADER_ELT_LENGTH); |
| |
| cleared.putShort(cleared.position(), (short) globalShardIndexes.size()); |
| for (int i = 0; i < globalShardIndexes.size(); i++) |
| cleared.putShort(cleared.position() + HEADER_SIZE_LENGTH + i * HEADER_ELT_LENGTH, globalShardIndexes.get(i)); |
| |
| int origHeaderLength = headerLength(context); |
| ByteBufferUtil.arrayCopy(context, |
| context.position() + origHeaderLength, |
| cleared, |
| cleared.position() + headerLength(cleared), |
| context.remaining() - origHeaderLength); |
| |
| return cleared; |
| } |
| |
| public void validateContext(ByteBuffer context) throws MarshalException |
| { |
| if ((context.remaining() - headerLength(context)) % STEP_LENGTH != 0) |
| throw new MarshalException("Invalid size for a counter context"); |
| } |
| |
| /** |
| * Update a MessageDigest with the content of a context. |
| * Note that this skips the header entirely since the header information |
| * has local meaning only, while digests are meant for comparison across |
| * nodes. This means in particular that we always have: |
| * updateDigest(ctx) == updateDigest(clearAllLocal(ctx)) |
| */ |
| public void updateDigest(MessageDigest message, ByteBuffer context) |
| { |
| ByteBuffer dup = context.duplicate(); |
| dup.position(context.position() + headerLength(context)); |
| message.update(dup); |
| } |
| |
| /** |
| * Returns the clock and the count associated with the local counter id, or (0, 0) if no such shard is present. |
| */ |
| public ClockAndCount getLocalClockAndCount(ByteBuffer context) |
| { |
| return getClockAndCountOf(context, CounterId.getLocalId()); |
| } |
| |
| /** |
| * Returns the count associated with the local counter id, or 0 if no such shard is present. |
| */ |
| public long getLocalCount(ByteBuffer context) |
| { |
| return getLocalClockAndCount(context).count; |
| } |
| |
| /** |
| * Checks if a context is local |
| */ |
| public boolean isLocal(ByteBuffer context) |
| { |
| return ContextState.wrap(context).isLocal(); |
| } |
| |
| /** |
| * Returns the clock and the count associated with the given counter id, or (0, 0) if no such shard is present. |
| */ |
| @VisibleForTesting |
| public ClockAndCount getClockAndCountOf(ByteBuffer context, CounterId id) |
| { |
| int position = findPositionOf(context, id); |
| if (position == -1) |
| return ClockAndCount.BLANK; |
| |
| long clock = context.getLong(position + CounterId.LENGTH); |
| long count = context.getLong(position + CounterId.LENGTH + CLOCK_LENGTH); |
| return ClockAndCount.create(clock, count); |
| } |
| |
| /** |
| * Finds the position of a shard with the given id within the context (via binary search). |
| */ |
| @VisibleForTesting |
| public int findPositionOf(ByteBuffer context, CounterId id) |
| { |
| int headerLength = headerLength(context); |
| int offset = context.position() + headerLength; |
| |
| int left = 0; |
| int right = (context.remaining() - headerLength) / STEP_LENGTH - 1; |
| |
| while (right >= left) |
| { |
| int middle = (left + right) / 2; |
| int cmp = compareId(context, offset + middle * STEP_LENGTH, id.bytes(), id.bytes().position()); |
| |
| if (cmp == -1) |
| left = middle + 1; |
| else if (cmp == 0) |
| return offset + middle * STEP_LENGTH; |
| else |
| right = middle - 1; |
| } |
| |
| return -1; // position not found |
| } |
| |
| /** |
| * Helper class to work on contexts (works by iterating over them). |
| * A context being abstractly a list of tuple (counterid, clock, count), a |
| * ContextState encapsulate a context and a position to one of the tuple. |
| * It also allow to create new context iteratively. |
| * |
| * Note: this is intrinsically a private class intended for use by the |
| * methods of CounterContext only. It is however public because it is |
| * convenient to create handcrafted context for unit tests. |
| */ |
| public static class ContextState |
| { |
| public final ByteBuffer context; |
| public final int headerLength; |
| |
| private int headerOffset; // offset from context.position() |
| private int bodyOffset; // offset from context.position() |
| private boolean currentIsGlobal; |
| private boolean currentIsLocal; |
| |
| private ContextState(ByteBuffer context) |
| { |
| this.context = context; |
| this.headerLength = this.bodyOffset = headerLength(context); |
| this.headerOffset = HEADER_SIZE_LENGTH; |
| updateIsGlobalOrLocal(); |
| } |
| |
| public static ContextState wrap(ByteBuffer context) |
| { |
| return new ContextState(context); |
| } |
| |
| /** |
| * Allocate a new context big enough for globalCount + localCount + remoteCount elements |
| * and return the initial corresponding ContextState. |
| */ |
| public static ContextState allocate(int globalCount, int localCount, int remoteCount) |
| { |
| int headerLength = HEADER_SIZE_LENGTH + (globalCount + localCount) * HEADER_ELT_LENGTH; |
| int bodyLength = (globalCount + localCount + remoteCount) * STEP_LENGTH; |
| |
| ByteBuffer buffer = ByteBuffer.allocate(headerLength + bodyLength); |
| buffer.putShort(buffer.position(), (short) (globalCount + localCount)); |
| |
| return ContextState.wrap(buffer); |
| } |
| |
| public boolean isGlobal() |
| { |
| return currentIsGlobal; |
| } |
| |
| public boolean isLocal() |
| { |
| return currentIsLocal; |
| } |
| |
| public boolean isRemote() |
| { |
| return !(currentIsGlobal || currentIsLocal); |
| } |
| |
| private void updateIsGlobalOrLocal() |
| { |
| if (headerOffset >= headerLength) |
| { |
| currentIsGlobal = currentIsLocal = false; |
| } |
| else |
| { |
| short headerElt = context.getShort(context.position() + headerOffset); |
| currentIsGlobal = headerElt == getElementIndex() + Short.MIN_VALUE; |
| currentIsLocal = headerElt == getElementIndex(); |
| } |
| } |
| |
| public boolean hasRemaining() |
| { |
| return bodyOffset < context.remaining(); |
| } |
| |
| public void moveToNext() |
| { |
| bodyOffset += STEP_LENGTH; |
| if (currentIsGlobal || currentIsLocal) |
| headerOffset += HEADER_ELT_LENGTH; |
| updateIsGlobalOrLocal(); |
| } |
| |
| public void copyTo(ContextState other) |
| { |
| other.writeElement(getCounterId(), getClock(), getCount(), currentIsGlobal, currentIsLocal); |
| } |
| |
| public int compareIdTo(ContextState other) |
| { |
| return compareId(context, context.position() + bodyOffset, other.context, other.context.position() + other.bodyOffset); |
| } |
| |
| public void reset() |
| { |
| this.headerOffset = HEADER_SIZE_LENGTH; |
| this.bodyOffset = headerLength; |
| updateIsGlobalOrLocal(); |
| } |
| |
| public int getElementIndex() |
| { |
| return (bodyOffset - headerLength) / STEP_LENGTH; |
| } |
| |
| public CounterId getCounterId() |
| { |
| return CounterId.wrap(context, context.position() + bodyOffset); |
| } |
| |
| public long getClock() |
| { |
| return context.getLong(context.position() + bodyOffset + CounterId.LENGTH); |
| } |
| |
| public long getCount() |
| { |
| return context.getLong(context.position() + bodyOffset + CounterId.LENGTH + CLOCK_LENGTH); |
| } |
| |
| public void writeGlobal(CounterId id, long clock, long count) |
| { |
| writeElement(id, clock, count, true, false); |
| } |
| |
| // In 2.1 only used by the unit tests. |
| public void writeLocal(CounterId id, long clock, long count) |
| { |
| writeElement(id, clock, count, false, true); |
| } |
| |
| // In 2.1 only used by the unit tests. |
| public void writeRemote(CounterId id, long clock, long count) |
| { |
| writeElement(id, clock, count, false, false); |
| } |
| |
| private void writeElement(CounterId id, long clock, long count, boolean isGlobal, boolean isLocal) |
| { |
| writeElementAtOffset(context, context.position() + bodyOffset, id, clock, count); |
| |
| if (isGlobal) |
| context.putShort(context.position() + headerOffset, (short) (getElementIndex() + Short.MIN_VALUE)); |
| else if (isLocal) |
| context.putShort(context.position() + headerOffset, (short) getElementIndex()); |
| |
| currentIsGlobal = isGlobal; |
| currentIsLocal = isLocal; |
| moveToNext(); |
| } |
| |
| // write a tuple (counter id, clock, count) at an absolute (bytebuffer-wise) offset |
| private void writeElementAtOffset(ByteBuffer ctx, int offset, CounterId id, long clock, long count) |
| { |
| ctx = ctx.duplicate(); |
| ctx.position(offset); |
| ctx.put(id.bytes().duplicate()); |
| ctx.putLong(clock); |
| ctx.putLong(count); |
| } |
| } |
| } |