/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.compaction;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.After;
import org.junit.Test;

import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.schema.MockSchema;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.Refs;
import org.apache.cassandra.UpdateBuilder;
import org.apache.cassandra.utils.concurrent.Transactional;

import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
import static org.apache.cassandra.Util.assertOnDiskState;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;


public class AntiCompactionTest
{
    private static final String KEYSPACE1 = "AntiCompactionTest";
    private static final String CF = "AntiCompactionTest";
    private static final Collection<Range<Token>> NO_RANGES = Collections.emptyList();

    private static TableMetadata metadata;
    private static ColumnFamilyStore cfs;
    private static InetAddressAndPort local;


    @BeforeClass
    public static void defineSchema() throws Throwable
    {
        SchemaLoader.prepareServer();
        metadata = SchemaLoader.standardCFMD(KEYSPACE1, CF).build();
        SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), metadata);
        cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.id);
        local = InetAddressAndPort.getByName("127.0.0.1");
    }

    @After
    public void truncateCF()
    {
        Keyspace keyspace = Keyspace.open(KEYSPACE1);
        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
        store.truncateBlocking();
    }

    private void registerParentRepairSession(UUID sessionID, Iterable<Range<Token>> ranges, long repairedAt, UUID pendingRepair) throws IOException
    {
        ActiveRepairService.instance.registerParentRepairSession(sessionID,
                                                                 InetAddressAndPort.getByName("10.0.0.1"),
                                                                 Lists.newArrayList(cfs), ImmutableSet.copyOf(ranges),
                                                                 pendingRepair != null || repairedAt != UNREPAIRED_SSTABLE,
                                                                 repairedAt, true, PreviewKind.NONE);
    }

    private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collection<Range<Token>> trans)
    {
        RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(local);
        for (Range<Token> range : full)
            builder.add(new Replica(local, range, true));

        for (Range<Token> range : trans)
            builder.add(new Replica(local, range, false));

        return builder.build();
    }

    private static Collection<Range<Token>> range(int l, int r)
    {
        return Collections.singleton(new Range<>(new BytesToken(Integer.toString(l).getBytes()), new BytesToken(Integer.toString(r).getBytes())));
    }

    private static class SSTableStats
    {
        int numLiveSSTables = 0;
        int pendingKeys = 0;
        int transKeys = 0;
        int unrepairedKeys = 0;
    }

    private SSTableStats antiCompactRanges(ColumnFamilyStore store, RangesAtEndpoint ranges) throws IOException
    {
        UUID sessionID = UUID.randomUUID();
        Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
        try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
             Refs<SSTableReader> refs = Refs.ref(sstables))
        {
            if (txn == null)
                throw new IllegalStateException();
            registerParentRepairSession(sessionID, ranges.ranges(), FBUtilities.nowInSeconds(), sessionID);
            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, sessionID, () -> false);
        }

        SSTableStats stats = new SSTableStats();
        stats.numLiveSSTables = store.getLiveSSTables().size();

        Predicate<Token> fullContains = t -> Iterables.any(ranges.onlyFull().ranges(), r -> r.contains(t));
        Predicate<Token> transContains = t -> Iterables.any(ranges.onlyTransient().ranges(), r -> r.contains(t));
        for (SSTableReader sstable : store.getLiveSSTables())
        {
            assertFalse(sstable.isRepaired());
            assertEquals(sstable.isPendingRepair() ? sessionID : NO_PENDING_REPAIR, sstable.getPendingRepair());
            try (ISSTableScanner scanner = sstable.getScanner())
            {
                while (scanner.hasNext())
                {
                    UnfilteredRowIterator row = scanner.next();
                    Token token = row.partitionKey().getToken();
                    if (sstable.isPendingRepair() && !sstable.isTransient())
                    {
                        assertTrue(fullContains.test(token));
                        assertFalse(transContains.test(token));
                        stats.pendingKeys++;
                    }
                    else if (sstable.isPendingRepair() && sstable.isTransient())
                    {

                        assertTrue(transContains.test(token));
                        assertFalse(fullContains.test(token));
                        stats.transKeys++;
                    }
                    else
                    {
                        assertFalse(fullContains.test(token));
                        assertFalse(transContains.test(token));
                        stats.unrepairedKeys++;
                    }
                }
            }
        }
        for (SSTableReader sstable : store.getLiveSSTables())
        {
            assertFalse(sstable.isMarkedCompacted());
            assertEquals(1, sstable.selfRef().globalCount());
        }
        assertEquals(0, store.getTracker().getCompacting().size());
        return stats;
    }

    @Test
    public void antiCompactOneFull() throws Exception
    {
        ColumnFamilyStore store = prepareColumnFamilyStore();
        SSTableStats stats = antiCompactRanges(store, atEndpoint(range(0, 4), NO_RANGES));
        assertEquals(2, stats.numLiveSSTables);
        assertEquals(stats.pendingKeys, 4);
        assertEquals(stats.transKeys, 0);
        assertEquals(stats.unrepairedKeys, 6);
        assertOnDiskState(store, 2);
    }

    @Test
    public void antiCompactOneMixed() throws Exception
    {
        ColumnFamilyStore store = prepareColumnFamilyStore();
        SSTableStats stats = antiCompactRanges(store, atEndpoint(range(0, 4), range(4, 8)));
        assertEquals(3, stats.numLiveSSTables);
        assertEquals(stats.pendingKeys, 4);
        assertEquals(stats.transKeys, 4);
        assertEquals(stats.unrepairedKeys, 2);
        assertOnDiskState(store, 3);
    }

    @Test
    public void antiCompactOneTransOnly() throws Exception
    {
        ColumnFamilyStore store = prepareColumnFamilyStore();
        SSTableStats stats = antiCompactRanges(store, atEndpoint(NO_RANGES, range(0, 4)));
        assertEquals(2, stats.numLiveSSTables);
        assertEquals(stats.pendingKeys, 0);
        assertEquals(stats.transKeys, 4);
        assertEquals(stats.unrepairedKeys, 6);
        assertOnDiskState(store, 2);
    }

    @Test
    public void antiCompactionSizeTest() throws InterruptedException, IOException
    {
        Keyspace keyspace = Keyspace.open(KEYSPACE1);
        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
        cfs.disableAutoCompaction();
        SSTableReader s = writeFile(cfs, 1000);
        cfs.addSSTable(s);
        Range<Token> range = new Range<Token>(new BytesToken(ByteBufferUtil.bytes(0)), new BytesToken(ByteBufferUtil.bytes(500)));
        List<Range<Token>> ranges = Arrays.asList(range);
        Collection<SSTableReader> sstables = cfs.getLiveSSTables();
        UUID parentRepairSession = UUID.randomUUID();
        registerParentRepairSession(parentRepairSession, ranges, UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
             Refs<SSTableReader> refs = Refs.ref(sstables))
        {
            CompactionManager.instance.performAnticompaction(cfs, atEndpoint(ranges, NO_RANGES), refs, txn, parentRepairSession, () -> false);
        }
        long sum = 0;
        long rows = 0;
        for (SSTableReader x : cfs.getLiveSSTables())
        {
            sum += x.bytesOnDisk();
            rows += x.getTotalRows();
        }
        assertEquals(sum, cfs.metric.liveDiskSpaceUsed.getCount());
        assertEquals(rows, 1000 * (1000 * 5));//See writeFile for how this number is derived
        assertOnDiskState(cfs, 2);
    }

    private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
    {
        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
        Descriptor desc = cfs.newSSTableDescriptor(dir);

        try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 0, 0, NO_PENDING_REPAIR, false, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)))
        {
            for (int i = 0; i < count; i++)
            {
                UpdateBuilder builder = UpdateBuilder.create(metadata, ByteBufferUtil.bytes(i));
                for (int j = 0; j < count * 5; j++)
                    builder.newRow("c" + j).add("val", "value1");
                writer.append(builder.build().unfilteredIterator());

            }
            Collection<SSTableReader> sstables = writer.finish(true);
            assertNotNull(sstables);
            assertEquals(1, sstables.size());
            return sstables.iterator().next();
        }
    }

    public void generateSStable(ColumnFamilyStore store, String Suffix)
    {
        for (int i = 0; i < 10; i++)
        {
            String localSuffix = Integer.toString(i);
            new RowUpdateBuilder(metadata, System.currentTimeMillis(), localSuffix + "-" + Suffix)
                    .clustering("c")
                    .add("val", "val" + localSuffix)
                    .build()
                    .applyUnsafe();
        }
        store.forceBlockingFlush();
    }

    @Test
    public void antiCompactTenFull() throws InterruptedException, IOException
    {
        Keyspace keyspace = Keyspace.open(KEYSPACE1);
        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
        store.disableAutoCompaction();

        for (int table = 0; table < 10; table++)
        {
            generateSStable(store,Integer.toString(table));
        }
        SSTableStats stats = antiCompactRanges(store, atEndpoint(range(0, 4), NO_RANGES));
        /*
        Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time
        so there will be no net change in the number of sstables
         */
        assertEquals(10, stats.numLiveSSTables);
        assertEquals(stats.pendingKeys, 40);
        assertEquals(stats.transKeys, 0);
        assertEquals(stats.unrepairedKeys, 60);
        assertOnDiskState(store, 10);
    }

    @Test
    public void antiCompactTenTrans() throws InterruptedException, IOException
    {
        Keyspace keyspace = Keyspace.open(KEYSPACE1);
        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
        store.disableAutoCompaction();

        for (int table = 0; table < 10; table++)
        {
            generateSStable(store,Integer.toString(table));
        }
        SSTableStats stats = antiCompactRanges(store, atEndpoint(NO_RANGES, range(0, 4)));
        /*
        Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time
        so there will be no net change in the number of sstables
         */
        assertEquals(10, stats.numLiveSSTables);
        assertEquals(stats.pendingKeys, 0);
        assertEquals(stats.transKeys, 40);
        assertEquals(stats.unrepairedKeys, 60);
        assertOnDiskState(store, 10);
    }

    @Test
    public void antiCompactTenMixed() throws InterruptedException, IOException
    {
        Keyspace keyspace = Keyspace.open(KEYSPACE1);
        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
        store.disableAutoCompaction();

        for (int table = 0; table < 10; table++)
        {
            generateSStable(store,Integer.toString(table));
        }
        SSTableStats stats = antiCompactRanges(store, atEndpoint(range(0, 4), range(4, 8)));
        assertEquals(15, stats.numLiveSSTables);
        assertEquals(stats.pendingKeys, 40);
        assertEquals(stats.transKeys, 40);
        assertEquals(stats.unrepairedKeys, 20);
        assertOnDiskState(store, 15);
    }

    @Test
    public void shouldMutatePendingRepair() throws InterruptedException, IOException
    {
        ColumnFamilyStore store = prepareColumnFamilyStore();
        Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
        assertEquals(store.getLiveSSTables().size(), sstables.size());
        // the sstables start at "0".getBytes() = 48, we need to include that first token, with "/".getBytes() = 47
        Range<Token> range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("9999".getBytes()));
        List<Range<Token>> ranges = Arrays.asList(range);
        UUID pendingRepair = UUID.randomUUID();
        registerParentRepairSession(pendingRepair, ranges, UNREPAIRED_SSTABLE, pendingRepair);

        try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
             Refs<SSTableReader> refs = Refs.ref(sstables))
        {
            CompactionManager.instance.performAnticompaction(store, atEndpoint(ranges, NO_RANGES), refs, txn, pendingRepair, () -> false);
        }

        assertThat(store.getLiveSSTables().size(), is(1));
        assertThat(Iterables.get(store.getLiveSSTables(), 0).isRepaired(), is(false));
        assertThat(Iterables.get(store.getLiveSSTables(), 0).isPendingRepair(), is(true));
        assertThat(Iterables.get(store.getLiveSSTables(), 0).selfRef().globalCount(), is(1));
        assertThat(store.getTracker().getCompacting().size(), is(0));
        assertOnDiskState(store, 1);
    }

    @Test
    public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, IOException
    {
        Keyspace keyspace = Keyspace.open(KEYSPACE1);
        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
        store.disableAutoCompaction();

        for (int table = 0; table < 10; table++)
        {
            generateSStable(store,Integer.toString(table));
        }
        int refCountBefore = Iterables.get(store.getLiveSSTables(), 0).selfRef().globalCount();
        Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
        assertEquals(store.getLiveSSTables().size(), sstables.size());

        Range<Token> range = new Range<Token>(new BytesToken("-1".getBytes()), new BytesToken("-10".getBytes()));
        List<Range<Token>> ranges = Arrays.asList(range);
        UUID parentRepairSession = UUID.randomUUID();
        registerParentRepairSession(parentRepairSession, ranges, UNREPAIRED_SSTABLE, null);
        boolean gotException = false;
        try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
             Refs<SSTableReader> refs = Refs.ref(sstables))
        {
            CompactionManager.instance.performAnticompaction(store, atEndpoint(ranges, NO_RANGES), refs, txn, parentRepairSession, () -> false);
        }
        catch (IllegalStateException e)
        {
            gotException = true;
        }

        assertTrue(gotException);
        assertThat(Iterables.get(store.getLiveSSTables(), 0).isRepaired(), is(false));
        assertEquals(refCountBefore, Iterables.get(store.getLiveSSTables(), 0).selfRef().globalCount());
        assertOnDiskState(cfs, 10);
    }

    private ColumnFamilyStore prepareColumnFamilyStore()
    {
        Keyspace keyspace = Keyspace.open(KEYSPACE1);
        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
        store.disableAutoCompaction();
        for (int i = 0; i < 10; i++)
        {
            new RowUpdateBuilder(metadata, System.currentTimeMillis(), Integer.toString(i))
                .clustering("c")
                .add("val", "val")
                .build()
                .applyUnsafe();
        }
        store.forceBlockingFlush();
        return store;
    }

    @After
    public void truncateCfs()
    {
        Keyspace keyspace = Keyspace.open(KEYSPACE1);
        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
        store.truncateBlocking();
    }

    private static Set<SSTableReader> getUnrepairedSSTables(ColumnFamilyStore cfs)
    {
        return ImmutableSet.copyOf(cfs.getTracker().getView().sstables(SSTableSet.LIVE, (s) -> !s.isRepaired()));
    }

    /**
     * If the parent repair session is missing, we should still clean up
     */
    @Test
    public void missingParentRepairSession() throws Exception
    {
        Keyspace keyspace = Keyspace.open(KEYSPACE1);
        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
        store.disableAutoCompaction();

        for (int table = 0; table < 10; table++)
        {
            generateSStable(store,Integer.toString(table));
        }
        Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
        assertEquals(10, sstables.size());

        Range<Token> range = new Range<Token>(new BytesToken("-1".getBytes()), new BytesToken("-10".getBytes()));
        List<Range<Token>> ranges = Arrays.asList(range);

        UUID missingRepairSession = UUIDGen.getTimeUUID();
        try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
             Refs<SSTableReader> refs = Refs.ref(sstables))
        {
            Assert.assertFalse(refs.isEmpty());
            try
            {
                CompactionManager.instance.performAnticompaction(store, atEndpoint(ranges, NO_RANGES), refs, txn, missingRepairSession, () -> false);
                Assert.fail("expected RuntimeException");
            }
            catch (RuntimeException e)
            {
                // expected
            }
            Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, txn.state());
            Assert.assertTrue(refs.isEmpty());
        }
    }

    @Test
    public void testSSTablesToInclude()
    {
        ColumnFamilyStore cfs = MockSchema.newCFS("anticomp");
        List<SSTableReader> sstables = new ArrayList<>();
        sstables.add(MockSchema.sstable(1, 10, 100, cfs));
        sstables.add(MockSchema.sstable(2, 100, 200, cfs));

        Range<Token> r = new Range<>(t(10), t(100)); // should include sstable 1 and 2 above, but none is fully contained (Range is (x, y])

        Iterator<SSTableReader> sstableIterator = sstables.iterator();
        Set<SSTableReader> fullyContainedSSTables = CompactionManager.findSSTablesToAnticompact(sstableIterator, Collections.singletonList(r), UUID.randomUUID());
        assertTrue(fullyContainedSSTables.isEmpty());
        assertEquals(2, sstables.size());
    }

    @Test
    public void testSSTablesToInclude2()
    {
        ColumnFamilyStore cfs = MockSchema.newCFS("anticomp");
        List<SSTableReader> sstables = new ArrayList<>();
        SSTableReader sstable1 = MockSchema.sstable(1, 10, 100, cfs);
        SSTableReader sstable2 = MockSchema.sstable(2, 100, 200, cfs);
        sstables.add(sstable1);
        sstables.add(sstable2);

        Range<Token> r = new Range<>(t(9), t(100)); // sstable 1 is fully contained

        Iterator<SSTableReader> sstableIterator = sstables.iterator();
        Set<SSTableReader> fullyContainedSSTables = CompactionManager.findSSTablesToAnticompact(sstableIterator, Collections.singletonList(r), UUID.randomUUID());
        assertEquals(Collections.singleton(sstable1), fullyContainedSSTables);
        assertEquals(Collections.singletonList(sstable2), sstables);
    }

    @Test(expected = IllegalStateException.class)
    public void testSSTablesToNotInclude()
    {
        ColumnFamilyStore cfs = MockSchema.newCFS("anticomp");
        List<SSTableReader> sstables = new ArrayList<>();
        SSTableReader sstable1 = MockSchema.sstable(1, 0, 5, cfs);
        sstables.add(sstable1);

        Range<Token> r = new Range<>(t(9), t(100)); // sstable is not intersecting and should not be included

        CompactionManager.validateSSTableBoundsForAnticompaction(UUID.randomUUID(), sstables, atEndpoint(Collections.singletonList(r), NO_RANGES));
    }

    @Test(expected = IllegalStateException.class)
    public void testSSTablesToNotInclude2()
    {
        ColumnFamilyStore cfs = MockSchema.newCFS("anticomp");
        List<SSTableReader> sstables = new ArrayList<>();
        SSTableReader sstable1 = MockSchema.sstable(1, 10, 10, cfs);
        SSTableReader sstable2 = MockSchema.sstable(2, 100, 200, cfs);
        sstables.add(sstable1);
        sstables.add(sstable2);

        Range<Token> r = new Range<>(t(10), t(11)); // no sstable included, throw

        CompactionManager.validateSSTableBoundsForAnticompaction(UUID.randomUUID(), sstables, atEndpoint(Collections.singletonList(r), NO_RANGES));
    }

    @Test
    public void testSSTablesToInclude4()
    {
        ColumnFamilyStore cfs = MockSchema.newCFS("anticomp");
        List<SSTableReader> sstables = new ArrayList<>();
        SSTableReader sstable1 = MockSchema.sstable(1, 10, 100, cfs);
        SSTableReader sstable2 = MockSchema.sstable(2, 100, 200, cfs);
        sstables.add(sstable1);
        sstables.add(sstable2);

        Range<Token> r = new Range<>(t(9), t(200)); // sstable 2 is fully contained - last token is equal

        Iterator<SSTableReader> sstableIterator = sstables.iterator();
        Set<SSTableReader> fullyContainedSSTables = CompactionManager.findSSTablesToAnticompact(sstableIterator, Collections.singletonList(r), UUID.randomUUID());
        assertEquals(Sets.newHashSet(sstable1, sstable2), fullyContainedSSTables);
        assertTrue(sstables.isEmpty());
    }

    private Token t(long t)
    {
        return new Murmur3Partitioner.LongToken(t);
    }
}
