| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.kafka.streams.state.internals; |
| |
| import org.apache.kafka.clients.producer.MockProducer; |
| import org.apache.kafka.clients.producer.Producer; |
| import org.apache.kafka.clients.producer.ProducerRecord; |
| import org.apache.kafka.common.serialization.Serde; |
| import org.apache.kafka.common.serialization.Serdes; |
| import org.apache.kafka.common.serialization.Serializer; |
| import org.apache.kafka.common.utils.Utils; |
| import org.apache.kafka.streams.KeyValue; |
| import org.apache.kafka.streams.processor.ProcessorContext; |
| import org.apache.kafka.streams.processor.StateStoreSupplier; |
| import org.apache.kafka.streams.processor.internals.RecordCollector; |
| import org.apache.kafka.streams.state.StateSerdes; |
| import org.apache.kafka.streams.state.WindowStore; |
| import org.apache.kafka.streams.state.WindowStoreIterator; |
| import org.apache.kafka.streams.state.WindowStoreUtils; |
| import org.apache.kafka.test.MockProcessorContext; |
| import org.junit.Test; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.file.Files; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNull; |
| |
| public class RocksDBWindowStoreTest { |
| |
| private final Serde<byte[]> byteArraySerde = Serdes.ByteArray(); |
| private final String windowName = "window"; |
| private final int numSegments = 3; |
| private final long segmentSize = RocksDBWindowStore.MIN_SEGMENT_INTERVAL; |
| private final long retentionPeriod = segmentSize * (numSegments - 1); |
| private final long windowSize = 3; |
| private final Serde<Integer> intSerde = Serdes.Integer(); |
| private final Serde<String> stringSerde = Serdes.String(); |
| private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", intSerde, stringSerde); |
| |
| @SuppressWarnings("unchecked") |
| protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context) { |
| StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, true, intSerde, stringSerde); |
| |
| WindowStore<K, V> store = (WindowStore<K, V>) supplier.get(); |
| store.init(context, store); |
| return store; |
| } |
| |
| @Test |
| public void testPutAndFetch() throws IOException { |
| File baseDir = Files.createTempDirectory("test").toFile(); |
| try { |
| final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>(); |
| Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer()); |
| RecordCollector recordCollector = new RecordCollector(producer) { |
| @Override |
| public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { |
| changeLog.add(new KeyValue<>( |
| keySerializer.serialize(record.topic(), record.key()), |
| valueSerializer.serialize(record.topic(), record.value())) |
| ); |
| } |
| }; |
| |
| MockProcessorContext context = new MockProcessorContext( |
| null, baseDir, |
| byteArraySerde, byteArraySerde, |
| recordCollector); |
| |
| WindowStore<Integer, String> store = createWindowStore(context); |
| try { |
| long startTime = segmentSize - 4L; |
| |
| context.setTime(startTime + 0L); |
| store.put(0, "zero"); |
| context.setTime(startTime + 1L); |
| store.put(1, "one"); |
| context.setTime(startTime + 2L); |
| store.put(2, "two"); |
| context.setTime(startTime + 3L); |
| // (3, "three") is not put |
| context.setTime(startTime + 4L); |
| store.put(4, "four"); |
| context.setTime(startTime + 5L); |
| store.put(5, "five"); |
| |
| assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L + windowSize))); |
| assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L + windowSize))); |
| assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize))); |
| assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L + windowSize))); |
| assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L + windowSize))); |
| assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L + windowSize))); |
| |
| context.setTime(startTime + 3L); |
| store.put(2, "two+1"); |
| context.setTime(startTime + 4L); |
| store.put(2, "two+2"); |
| context.setTime(startTime + 5L); |
| store.put(2, "two+3"); |
| context.setTime(startTime + 6L); |
| store.put(2, "two+4"); |
| context.setTime(startTime + 7L); |
| store.put(2, "two+5"); |
| context.setTime(startTime + 8L); |
| store.put(2, "two+6"); |
| |
| assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L - windowSize, startTime - 2L + windowSize))); |
| assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L + windowSize))); |
| assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime - windowSize, startTime + windowSize))); |
| assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L + windowSize))); |
| assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize))); |
| assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L + windowSize))); |
| assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L + windowSize))); |
| assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize))); |
| assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L + windowSize))); |
| assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L + windowSize))); |
| assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L + windowSize))); |
| assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L + windowSize))); |
| assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L + windowSize))); |
| assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L + windowSize))); |
| assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L + windowSize))); |
| |
| // Flush the store and verify all current entries were properly flushed ... |
| store.flush(); |
| |
| Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime); |
| |
| assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0)); |
| assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1)); |
| assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2)); |
| assertNull(entriesByKey.get(3)); |
| assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4)); |
| assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5)); |
| assertNull(entriesByKey.get(6)); |
| |
| } finally { |
| store.close(); |
| } |
| |
| } finally { |
| Utils.delete(baseDir); |
| } |
| } |
| |
| @Test |
| public void testPutAndFetchBefore() throws IOException { |
| File baseDir = Files.createTempDirectory("test").toFile(); |
| try { |
| final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>(); |
| Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer()); |
| RecordCollector recordCollector = new RecordCollector(producer) { |
| @Override |
| public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { |
| changeLog.add(new KeyValue<>( |
| keySerializer.serialize(record.topic(), record.key()), |
| valueSerializer.serialize(record.topic(), record.value())) |
| ); |
| } |
| }; |
| |
| MockProcessorContext context = new MockProcessorContext( |
| null, baseDir, |
| byteArraySerde, byteArraySerde, |
| recordCollector); |
| |
| WindowStore<Integer, String> store = createWindowStore(context); |
| try { |
| long startTime = segmentSize - 4L; |
| |
| context.setTime(startTime + 0L); |
| store.put(0, "zero"); |
| context.setTime(startTime + 1L); |
| store.put(1, "one"); |
| context.setTime(startTime + 2L); |
| store.put(2, "two"); |
| context.setTime(startTime + 3L); |
| // (3, "three") is not put |
| context.setTime(startTime + 4L); |
| store.put(4, "four"); |
| context.setTime(startTime + 5L); |
| store.put(5, "five"); |
| |
| assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L))); |
| assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L))); |
| assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L))); |
| assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L))); |
| assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L))); |
| assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L))); |
| |
| context.setTime(startTime + 3L); |
| store.put(2, "two+1"); |
| context.setTime(startTime + 4L); |
| store.put(2, "two+2"); |
| context.setTime(startTime + 5L); |
| store.put(2, "two+3"); |
| context.setTime(startTime + 6L); |
| store.put(2, "two+4"); |
| context.setTime(startTime + 7L); |
| store.put(2, "two+5"); |
| context.setTime(startTime + 8L); |
| store.put(2, "two+6"); |
| |
| assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L))); |
| assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 0L - windowSize, startTime + 0L))); |
| assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L))); |
| assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L))); |
| assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L))); |
| assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L))); |
| assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L))); |
| assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L))); |
| assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L))); |
| assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L))); |
| assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L))); |
| assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L))); |
| assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L))); |
| assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L))); |
| assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 13L - windowSize, startTime + 13L))); |
| |
| // Flush the store and verify all current entries were properly flushed ... |
| store.flush(); |
| |
| Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime); |
| |
| assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0)); |
| assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1)); |
| assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2)); |
| assertNull(entriesByKey.get(3)); |
| assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4)); |
| assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5)); |
| assertNull(entriesByKey.get(6)); |
| |
| } finally { |
| store.close(); |
| } |
| |
| } finally { |
| Utils.delete(baseDir); |
| } |
| } |
| |
| @Test |
| public void testPutAndFetchAfter() throws IOException { |
| File baseDir = Files.createTempDirectory("test").toFile(); |
| try { |
| final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>(); |
| Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer()); |
| RecordCollector recordCollector = new RecordCollector(producer) { |
| @Override |
| public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { |
| changeLog.add(new KeyValue<>( |
| keySerializer.serialize(record.topic(), record.key()), |
| valueSerializer.serialize(record.topic(), record.value())) |
| ); |
| } |
| }; |
| |
| MockProcessorContext context = new MockProcessorContext( |
| null, baseDir, |
| byteArraySerde, byteArraySerde, |
| recordCollector); |
| |
| WindowStore<Integer, String> store = createWindowStore(context); |
| try { |
| long startTime = segmentSize - 4L; |
| |
| context.setTime(startTime + 0L); |
| store.put(0, "zero"); |
| context.setTime(startTime + 1L); |
| store.put(1, "one"); |
| context.setTime(startTime + 2L); |
| store.put(2, "two"); |
| context.setTime(startTime + 3L); |
| // (3, "three") is not put |
| context.setTime(startTime + 4L); |
| store.put(4, "four"); |
| context.setTime(startTime + 5L); |
| store.put(5, "five"); |
| |
| assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L, startTime + 0L + windowSize))); |
| assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L, startTime + 1L + windowSize))); |
| assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize))); |
| assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L, startTime + 3L + windowSize))); |
| assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L, startTime + 4L + windowSize))); |
| assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L, startTime + 5L + windowSize))); |
| |
| context.setTime(startTime + 3L); |
| store.put(2, "two+1"); |
| context.setTime(startTime + 4L); |
| store.put(2, "two+2"); |
| context.setTime(startTime + 5L); |
| store.put(2, "two+3"); |
| context.setTime(startTime + 6L); |
| store.put(2, "two+4"); |
| context.setTime(startTime + 7L); |
| store.put(2, "two+5"); |
| context.setTime(startTime + 8L); |
| store.put(2, "two+6"); |
| |
| assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L, startTime - 2L + windowSize))); |
| assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L, startTime - 1L + windowSize))); |
| assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime, startTime + windowSize))); |
| assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L, startTime + 1L + windowSize))); |
| assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize))); |
| assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L, startTime + 3L + windowSize))); |
| assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L, startTime + 4L + windowSize))); |
| assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L, startTime + 5L + windowSize))); |
| assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L, startTime + 6L + windowSize))); |
| assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 7L, startTime + 7L + windowSize))); |
| assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 8L, startTime + 8L + windowSize))); |
| assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 9L, startTime + 9L + windowSize))); |
| assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 10L, startTime + 10L + windowSize))); |
| assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 11L, startTime + 11L + windowSize))); |
| assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L, startTime + 12L + windowSize))); |
| |
| // Flush the store and verify all current entries were properly flushed ... |
| store.flush(); |
| |
| Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime); |
| |
| assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0)); |
| assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1)); |
| assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2)); |
| assertNull(entriesByKey.get(3)); |
| assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4)); |
| assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5)); |
| assertNull(entriesByKey.get(6)); |
| |
| } finally { |
| store.close(); |
| } |
| |
| } finally { |
| Utils.delete(baseDir); |
| } |
| } |
| |
| @Test |
| public void testPutSameKeyTimestamp() throws IOException { |
| File baseDir = Files.createTempDirectory("test").toFile(); |
| try { |
| final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>(); |
| Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer()); |
| RecordCollector recordCollector = new RecordCollector(producer) { |
| @Override |
| public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { |
| changeLog.add(new KeyValue<>( |
| keySerializer.serialize(record.topic(), record.key()), |
| valueSerializer.serialize(record.topic(), record.value())) |
| ); |
| } |
| }; |
| |
| MockProcessorContext context = new MockProcessorContext( |
| null, baseDir, |
| byteArraySerde, byteArraySerde, |
| recordCollector); |
| |
| WindowStore<Integer, String> store = createWindowStore(context); |
| try { |
| long startTime = segmentSize - 4L; |
| |
| context.setTime(startTime); |
| store.put(0, "zero"); |
| |
| assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); |
| |
| context.setTime(startTime); |
| store.put(0, "zero"); |
| context.setTime(startTime); |
| store.put(0, "zero+"); |
| context.setTime(startTime); |
| store.put(0, "zero++"); |
| |
| assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); |
| assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 1L - windowSize, startTime + 1L + windowSize))); |
| assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 2L - windowSize, startTime + 2L + windowSize))); |
| assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 3L - windowSize, startTime + 3L + windowSize))); |
| assertEquals(Utils.mkList(), toList(store.fetch(0, startTime + 4L - windowSize, startTime + 4L + windowSize))); |
| |
| // Flush the store and verify all current entries were properly flushed ... |
| store.flush(); |
| |
| Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime); |
| |
| assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0)); |
| |
| } finally { |
| store.close(); |
| } |
| |
| } finally { |
| Utils.delete(baseDir); |
| } |
| } |
| |
| @Test |
| public void testRolling() throws IOException { |
| File baseDir = Files.createTempDirectory("test").toFile(); |
| try { |
| final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>(); |
| Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer()); |
| RecordCollector recordCollector = new RecordCollector(producer) { |
| @Override |
| public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { |
| changeLog.add(new KeyValue<>( |
| keySerializer.serialize(record.topic(), record.key()), |
| valueSerializer.serialize(record.topic(), record.value())) |
| ); |
| } |
| }; |
| |
| MockProcessorContext context = new MockProcessorContext( |
| null, baseDir, |
| byteArraySerde, byteArraySerde, |
| recordCollector); |
| |
| WindowStore<Integer, String> store = createWindowStore(context); |
| RocksDBWindowStore<Integer, String> inner = |
| (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner(); |
| try { |
| long startTime = segmentSize * 2; |
| long incr = segmentSize / 2; |
| |
| context.setTime(startTime); |
| store.put(0, "zero"); |
| assertEquals(Utils.mkSet(2L), inner.segmentIds()); |
| |
| context.setTime(startTime + incr); |
| store.put(1, "one"); |
| assertEquals(Utils.mkSet(2L), inner.segmentIds()); |
| |
| context.setTime(startTime + incr * 2); |
| store.put(2, "two"); |
| assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds()); |
| |
| context.setTime(startTime + incr * 3); |
| // (3, "three") is not put |
| assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds()); |
| |
| context.setTime(startTime + incr * 4); |
| store.put(4, "four"); |
| assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds()); |
| |
| context.setTime(startTime + incr * 5); |
| store.put(5, "five"); |
| assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds()); |
| |
| assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); |
| assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); |
| assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); |
| assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); |
| assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); |
| assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); |
| |
| context.setTime(startTime + incr * 6); |
| store.put(6, "six"); |
| assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds()); |
| |
| assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); |
| assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); |
| assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); |
| assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); |
| assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); |
| assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); |
| assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); |
| |
| |
| context.setTime(startTime + incr * 7); |
| store.put(7, "seven"); |
| assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds()); |
| |
| assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); |
| assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); |
| assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); |
| assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); |
| assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); |
| assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); |
| assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); |
| assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); |
| |
| context.setTime(startTime + incr * 8); |
| store.put(8, "eight"); |
| assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds()); |
| |
| assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); |
| assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); |
| assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); |
| assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); |
| assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); |
| assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); |
| assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); |
| assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); |
| assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize))); |
| |
| // check segment directories |
| store.flush(); |
| assertEquals( |
| Utils.mkSet(inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)), |
| segmentDirs(baseDir) |
| ); |
| } finally { |
| store.close(); |
| } |
| |
| } finally { |
| Utils.delete(baseDir); |
| } |
| } |
| |
| @Test |
| public void testRestore() throws IOException { |
| final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>(); |
| long startTime = segmentSize * 2; |
| long incr = segmentSize / 2; |
| |
| File baseDir = Files.createTempDirectory("test").toFile(); |
| try { |
| Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer()); |
| RecordCollector recordCollector = new RecordCollector(producer) { |
| @Override |
| public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { |
| changeLog.add(new KeyValue<>( |
| keySerializer.serialize(record.topic(), record.key()), |
| valueSerializer.serialize(record.topic(), record.value())) |
| ); |
| } |
| }; |
| |
| MockProcessorContext context = new MockProcessorContext( |
| null, baseDir, |
| byteArraySerde, byteArraySerde, |
| recordCollector); |
| |
| WindowStore<Integer, String> store = createWindowStore(context); |
| try { |
| context.setTime(startTime); |
| store.put(0, "zero"); |
| context.setTime(startTime + incr); |
| store.put(1, "one"); |
| context.setTime(startTime + incr * 2); |
| store.put(2, "two"); |
| context.setTime(startTime + incr * 3); |
| store.put(3, "three"); |
| context.setTime(startTime + incr * 4); |
| store.put(4, "four"); |
| context.setTime(startTime + incr * 5); |
| store.put(5, "five"); |
| context.setTime(startTime + incr * 6); |
| store.put(6, "six"); |
| context.setTime(startTime + incr * 7); |
| store.put(7, "seven"); |
| context.setTime(startTime + incr * 8); |
| store.put(8, "eight"); |
| store.flush(); |
| |
| } finally { |
| store.close(); |
| } |
| |
| |
| } finally { |
| Utils.delete(baseDir); |
| } |
| |
| File baseDir2 = Files.createTempDirectory("test").toFile(); |
| try { |
| Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer()); |
| RecordCollector recordCollector = new RecordCollector(producer) { |
| @Override |
| public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { |
| changeLog.add(new KeyValue<>( |
| keySerializer.serialize(record.topic(), record.key()), |
| valueSerializer.serialize(record.topic(), record.value())) |
| ); |
| } |
| }; |
| |
| MockProcessorContext context = new MockProcessorContext( |
| null, baseDir, |
| byteArraySerde, byteArraySerde, |
| recordCollector); |
| |
| WindowStore<Integer, String> store = createWindowStore(context); |
| RocksDBWindowStore<Integer, String> inner = |
| (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner(); |
| |
| try { |
| context.restore(windowName, changeLog); |
| |
| assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds()); |
| |
| assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); |
| assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); |
| assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); |
| assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); |
| assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); |
| assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); |
| assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); |
| assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); |
| assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize))); |
| |
| // check segment directories |
| store.flush(); |
| assertEquals( |
| Utils.mkSet(inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)), |
| segmentDirs(baseDir) |
| ); |
| } finally { |
| store.close(); |
| } |
| |
| |
| } finally { |
| Utils.delete(baseDir2); |
| } |
| } |
| |
| @Test |
| public void testSegmentMaintenance() throws IOException { |
| File baseDir = Files.createTempDirectory("test").toFile(); |
| try { |
| Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer()); |
| RecordCollector recordCollector = new RecordCollector(producer) { |
| @Override |
| public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { |
| // do nothing |
| } |
| }; |
| |
| MockProcessorContext context = new MockProcessorContext( |
| null, baseDir, |
| byteArraySerde, byteArraySerde, |
| recordCollector); |
| |
| WindowStore<Integer, String> store = createWindowStore(context); |
| RocksDBWindowStore<Integer, String> inner = |
| (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner(); |
| |
| try { |
| |
| context.setTime(0L); |
| store.put(0, "v"); |
| assertEquals( |
| Utils.mkSet(inner.segmentName(0L)), |
| segmentDirs(baseDir) |
| ); |
| |
| context.setTime(59999L); |
| store.put(0, "v"); |
| context.setTime(59999L); |
| store.put(0, "v"); |
| assertEquals( |
| Utils.mkSet(inner.segmentName(0L)), |
| segmentDirs(baseDir) |
| ); |
| |
| context.setTime(60000L); |
| store.put(0, "v"); |
| assertEquals( |
| Utils.mkSet(inner.segmentName(0L), inner.segmentName(1L)), |
| segmentDirs(baseDir) |
| ); |
| |
| WindowStoreIterator iter; |
| int fetchedCount; |
| |
| iter = store.fetch(0, 0L, 240000L); |
| fetchedCount = 0; |
| while (iter.hasNext()) { |
| iter.next(); |
| fetchedCount++; |
| } |
| assertEquals(4, fetchedCount); |
| |
| assertEquals( |
| Utils.mkSet(inner.segmentName(0L), inner.segmentName(1L)), |
| segmentDirs(baseDir) |
| ); |
| |
| context.setTime(180000L); |
| store.put(0, "v"); |
| |
| iter = store.fetch(0, 0L, 240000L); |
| fetchedCount = 0; |
| while (iter.hasNext()) { |
| iter.next(); |
| fetchedCount++; |
| } |
| assertEquals(2, fetchedCount); |
| |
| assertEquals( |
| Utils.mkSet(inner.segmentName(1L), inner.segmentName(2L), inner.segmentName(3L)), |
| segmentDirs(baseDir) |
| ); |
| |
| context.setTime(300000L); |
| store.put(0, "v"); |
| |
| iter = store.fetch(0, 240000L, 1000000L); |
| fetchedCount = 0; |
| while (iter.hasNext()) { |
| iter.next(); |
| fetchedCount++; |
| } |
| assertEquals(1, fetchedCount); |
| |
| assertEquals( |
| Utils.mkSet(inner.segmentName(3L), inner.segmentName(4L), inner.segmentName(5L)), |
| segmentDirs(baseDir) |
| ); |
| |
| } finally { |
| store.close(); |
| } |
| |
| } finally { |
| Utils.delete(baseDir); |
| } |
| } |
| |
| @Test |
| public void testInitialLoading() throws IOException { |
| File baseDir = Files.createTempDirectory("test").toFile(); |
| try { |
| Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer()); |
| RecordCollector recordCollector = new RecordCollector(producer) { |
| @Override |
| public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { |
| // do nothing |
| } |
| }; |
| |
| MockProcessorContext context = new MockProcessorContext( |
| null, baseDir, |
| byteArraySerde, byteArraySerde, |
| recordCollector); |
| |
| File storeDir = new File(baseDir, windowName); |
| |
| WindowStore<Integer, String> store = createWindowStore(context); |
| RocksDBWindowStore<Integer, String> inner = |
| (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner(); |
| |
| try { |
| new File(storeDir, inner.segmentName(0L)).mkdir(); |
| new File(storeDir, inner.segmentName(1L)).mkdir(); |
| new File(storeDir, inner.segmentName(2L)).mkdir(); |
| new File(storeDir, inner.segmentName(3L)).mkdir(); |
| new File(storeDir, inner.segmentName(4L)).mkdir(); |
| new File(storeDir, inner.segmentName(5L)).mkdir(); |
| new File(storeDir, inner.segmentName(6L)).mkdir(); |
| } finally { |
| store.close(); |
| } |
| |
| store = createWindowStore(context); |
| inner = (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner(); |
| |
| try { |
| assertEquals( |
| Utils.mkSet(inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)), |
| segmentDirs(baseDir) |
| ); |
| |
| WindowStoreIterator iter = store.fetch(0, 0L, 1000000L); |
| while (iter.hasNext()) { |
| iter.next(); |
| } |
| |
| assertEquals( |
| Utils.mkSet(inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)), |
| segmentDirs(baseDir) |
| ); |
| |
| } finally { |
| store.close(); |
| } |
| |
| } finally { |
| Utils.delete(baseDir); |
| } |
| } |
| |
| private <E> List<E> toList(WindowStoreIterator<E> iterator) { |
| ArrayList<E> list = new ArrayList<>(); |
| while (iterator.hasNext()) { |
| list.add(iterator.next().value); |
| } |
| return list; |
| } |
| |
| private Set<String> segmentDirs(File baseDir) { |
| File windowDir = new File(baseDir, windowName); |
| |
| return new HashSet<>(Arrays.asList(windowDir.list())); |
| } |
| |
| private Map<Integer, Set<String>> entriesByKey(List<KeyValue<byte[], byte[]>> changeLog, long startTime) { |
| HashMap<Integer, Set<String>> entriesByKey = new HashMap<>(); |
| |
| for (KeyValue<byte[], byte[]> entry : changeLog) { |
| long timestamp = WindowStoreUtils.timestampFromBinaryKey(entry.key); |
| Integer key = WindowStoreUtils.keyFromBinaryKey(entry.key, serdes); |
| String value = entry.value == null ? null : serdes.valueFrom(entry.value); |
| |
| Set<String> entries = entriesByKey.get(key); |
| if (entries == null) { |
| entries = new HashSet<>(); |
| entriesByKey.put(key, entries); |
| } |
| entries.add(value + "@" + (timestamp - startTime)); |
| } |
| |
| return entriesByKey; |
| } |
| } |