blob: 502870b2a54483c308eff75a51b90e20e99bece6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.WindowStoreUtils;
import org.apache.kafka.test.MockProcessorContext;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class RocksDBWindowStoreTest {
private final Serde<byte[]> byteArraySerde = Serdes.ByteArray();
private final String windowName = "window";
private final int numSegments = 3;
private final long segmentSize = RocksDBWindowStore.MIN_SEGMENT_INTERVAL;
private final long retentionPeriod = segmentSize * (numSegments - 1);
private final long windowSize = 3;
private final Serde<Integer> intSerde = Serdes.Integer();
private final Serde<String> stringSerde = Serdes.String();
private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", intSerde, stringSerde);
@SuppressWarnings("unchecked")
protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context) {
StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, true, intSerde, stringSerde);
WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
store.init(context, store);
return store;
}
@Test
public void testPutAndFetch() throws IOException {
File baseDir = Files.createTempDirectory("test").toFile();
try {
final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
RecordCollector recordCollector = new RecordCollector(producer) {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
keySerializer.serialize(record.topic(), record.key()),
valueSerializer.serialize(record.topic(), record.value()))
);
}
};
MockProcessorContext context = new MockProcessorContext(
null, baseDir,
byteArraySerde, byteArraySerde,
recordCollector);
WindowStore<Integer, String> store = createWindowStore(context);
try {
long startTime = segmentSize - 4L;
context.setTime(startTime + 0L);
store.put(0, "zero");
context.setTime(startTime + 1L);
store.put(1, "one");
context.setTime(startTime + 2L);
store.put(2, "two");
context.setTime(startTime + 3L);
// (3, "three") is not put
context.setTime(startTime + 4L);
store.put(4, "four");
context.setTime(startTime + 5L);
store.put(5, "five");
assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L + windowSize)));
assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L + windowSize)));
assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L + windowSize)));
assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L + windowSize)));
assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L + windowSize)));
context.setTime(startTime + 3L);
store.put(2, "two+1");
context.setTime(startTime + 4L);
store.put(2, "two+2");
context.setTime(startTime + 5L);
store.put(2, "two+3");
context.setTime(startTime + 6L);
store.put(2, "two+4");
context.setTime(startTime + 7L);
store.put(2, "two+5");
context.setTime(startTime + 8L);
store.put(2, "two+6");
assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L - windowSize, startTime - 2L + windowSize)));
assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L + windowSize)));
assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime - windowSize, startTime + windowSize)));
assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L + windowSize)));
assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L + windowSize)));
assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L + windowSize)));
assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize)));
assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L + windowSize)));
assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L + windowSize)));
assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L + windowSize)));
assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L + windowSize)));
assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L + windowSize)));
assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L + windowSize)));
assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L + windowSize)));
// Flush the store and verify all current entries were properly flushed ...
store.flush();
Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
assertNull(entriesByKey.get(3));
assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
assertNull(entriesByKey.get(6));
} finally {
store.close();
}
} finally {
Utils.delete(baseDir);
}
}
@Test
public void testPutAndFetchBefore() throws IOException {
File baseDir = Files.createTempDirectory("test").toFile();
try {
final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
RecordCollector recordCollector = new RecordCollector(producer) {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
keySerializer.serialize(record.topic(), record.key()),
valueSerializer.serialize(record.topic(), record.value()))
);
}
};
MockProcessorContext context = new MockProcessorContext(
null, baseDir,
byteArraySerde, byteArraySerde,
recordCollector);
WindowStore<Integer, String> store = createWindowStore(context);
try {
long startTime = segmentSize - 4L;
context.setTime(startTime + 0L);
store.put(0, "zero");
context.setTime(startTime + 1L);
store.put(1, "one");
context.setTime(startTime + 2L);
store.put(2, "two");
context.setTime(startTime + 3L);
// (3, "three") is not put
context.setTime(startTime + 4L);
store.put(4, "four");
context.setTime(startTime + 5L);
store.put(5, "five");
assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L)));
assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L)));
assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L)));
assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L)));
assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L)));
context.setTime(startTime + 3L);
store.put(2, "two+1");
context.setTime(startTime + 4L);
store.put(2, "two+2");
context.setTime(startTime + 5L);
store.put(2, "two+3");
context.setTime(startTime + 6L);
store.put(2, "two+4");
context.setTime(startTime + 7L);
store.put(2, "two+5");
context.setTime(startTime + 8L);
store.put(2, "two+6");
assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L)));
assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 0L - windowSize, startTime + 0L)));
assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L)));
assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L)));
assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L)));
assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L)));
assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L)));
assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L)));
assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L)));
assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L)));
assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L)));
assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L)));
assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L)));
assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 13L - windowSize, startTime + 13L)));
// Flush the store and verify all current entries were properly flushed ...
store.flush();
Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
assertNull(entriesByKey.get(3));
assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
assertNull(entriesByKey.get(6));
} finally {
store.close();
}
} finally {
Utils.delete(baseDir);
}
}
@Test
public void testPutAndFetchAfter() throws IOException {
File baseDir = Files.createTempDirectory("test").toFile();
try {
final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
RecordCollector recordCollector = new RecordCollector(producer) {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
keySerializer.serialize(record.topic(), record.key()),
valueSerializer.serialize(record.topic(), record.value()))
);
}
};
MockProcessorContext context = new MockProcessorContext(
null, baseDir,
byteArraySerde, byteArraySerde,
recordCollector);
WindowStore<Integer, String> store = createWindowStore(context);
try {
long startTime = segmentSize - 4L;
context.setTime(startTime + 0L);
store.put(0, "zero");
context.setTime(startTime + 1L);
store.put(1, "one");
context.setTime(startTime + 2L);
store.put(2, "two");
context.setTime(startTime + 3L);
// (3, "three") is not put
context.setTime(startTime + 4L);
store.put(4, "four");
context.setTime(startTime + 5L);
store.put(5, "five");
assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L, startTime + 0L + windowSize)));
assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L, startTime + 1L + windowSize)));
assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L, startTime + 3L + windowSize)));
assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L, startTime + 4L + windowSize)));
assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L, startTime + 5L + windowSize)));
context.setTime(startTime + 3L);
store.put(2, "two+1");
context.setTime(startTime + 4L);
store.put(2, "two+2");
context.setTime(startTime + 5L);
store.put(2, "two+3");
context.setTime(startTime + 6L);
store.put(2, "two+4");
context.setTime(startTime + 7L);
store.put(2, "two+5");
context.setTime(startTime + 8L);
store.put(2, "two+6");
assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L, startTime - 2L + windowSize)));
assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L, startTime - 1L + windowSize)));
assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime, startTime + windowSize)));
assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L, startTime + 1L + windowSize)));
assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L, startTime + 3L + windowSize)));
assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L, startTime + 4L + windowSize)));
assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L, startTime + 5L + windowSize)));
assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L, startTime + 6L + windowSize)));
assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 7L, startTime + 7L + windowSize)));
assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 8L, startTime + 8L + windowSize)));
assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 9L, startTime + 9L + windowSize)));
assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 10L, startTime + 10L + windowSize)));
assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 11L, startTime + 11L + windowSize)));
assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L, startTime + 12L + windowSize)));
// Flush the store and verify all current entries were properly flushed ...
store.flush();
Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
assertNull(entriesByKey.get(3));
assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
assertNull(entriesByKey.get(6));
} finally {
store.close();
}
} finally {
Utils.delete(baseDir);
}
}
@Test
public void testPutSameKeyTimestamp() throws IOException {
File baseDir = Files.createTempDirectory("test").toFile();
try {
final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
RecordCollector recordCollector = new RecordCollector(producer) {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
keySerializer.serialize(record.topic(), record.key()),
valueSerializer.serialize(record.topic(), record.value()))
);
}
};
MockProcessorContext context = new MockProcessorContext(
null, baseDir,
byteArraySerde, byteArraySerde,
recordCollector);
WindowStore<Integer, String> store = createWindowStore(context);
try {
long startTime = segmentSize - 4L;
context.setTime(startTime);
store.put(0, "zero");
assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
context.setTime(startTime);
store.put(0, "zero");
context.setTime(startTime);
store.put(0, "zero+");
context.setTime(startTime);
store.put(0, "zero++");
assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 1L - windowSize, startTime + 1L + windowSize)));
assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 2L - windowSize, startTime + 2L + windowSize)));
assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 3L - windowSize, startTime + 3L + windowSize)));
assertEquals(Utils.mkList(), toList(store.fetch(0, startTime + 4L - windowSize, startTime + 4L + windowSize)));
// Flush the store and verify all current entries were properly flushed ...
store.flush();
Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0));
} finally {
store.close();
}
} finally {
Utils.delete(baseDir);
}
}
@Test
public void testRolling() throws IOException {
File baseDir = Files.createTempDirectory("test").toFile();
try {
final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
RecordCollector recordCollector = new RecordCollector(producer) {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
keySerializer.serialize(record.topic(), record.key()),
valueSerializer.serialize(record.topic(), record.value()))
);
}
};
MockProcessorContext context = new MockProcessorContext(
null, baseDir,
byteArraySerde, byteArraySerde,
recordCollector);
WindowStore<Integer, String> store = createWindowStore(context);
RocksDBWindowStore<Integer, String> inner =
(RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
try {
long startTime = segmentSize * 2;
long incr = segmentSize / 2;
context.setTime(startTime);
store.put(0, "zero");
assertEquals(Utils.mkSet(2L), inner.segmentIds());
context.setTime(startTime + incr);
store.put(1, "one");
assertEquals(Utils.mkSet(2L), inner.segmentIds());
context.setTime(startTime + incr * 2);
store.put(2, "two");
assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds());
context.setTime(startTime + incr * 3);
// (3, "three") is not put
assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds());
context.setTime(startTime + incr * 4);
store.put(4, "four");
assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds());
context.setTime(startTime + incr * 5);
store.put(5, "five");
assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds());
assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
context.setTime(startTime + incr * 6);
store.put(6, "six");
assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
context.setTime(startTime + incr * 7);
store.put(7, "seven");
assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
context.setTime(startTime + incr * 8);
store.put(8, "eight");
assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
// check segment directories
store.flush();
assertEquals(
Utils.mkSet(inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)),
segmentDirs(baseDir)
);
} finally {
store.close();
}
} finally {
Utils.delete(baseDir);
}
}
@Test
public void testRestore() throws IOException {
final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
long startTime = segmentSize * 2;
long incr = segmentSize / 2;
File baseDir = Files.createTempDirectory("test").toFile();
try {
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
RecordCollector recordCollector = new RecordCollector(producer) {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
keySerializer.serialize(record.topic(), record.key()),
valueSerializer.serialize(record.topic(), record.value()))
);
}
};
MockProcessorContext context = new MockProcessorContext(
null, baseDir,
byteArraySerde, byteArraySerde,
recordCollector);
WindowStore<Integer, String> store = createWindowStore(context);
try {
context.setTime(startTime);
store.put(0, "zero");
context.setTime(startTime + incr);
store.put(1, "one");
context.setTime(startTime + incr * 2);
store.put(2, "two");
context.setTime(startTime + incr * 3);
store.put(3, "three");
context.setTime(startTime + incr * 4);
store.put(4, "four");
context.setTime(startTime + incr * 5);
store.put(5, "five");
context.setTime(startTime + incr * 6);
store.put(6, "six");
context.setTime(startTime + incr * 7);
store.put(7, "seven");
context.setTime(startTime + incr * 8);
store.put(8, "eight");
store.flush();
} finally {
store.close();
}
} finally {
Utils.delete(baseDir);
}
File baseDir2 = Files.createTempDirectory("test").toFile();
try {
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
RecordCollector recordCollector = new RecordCollector(producer) {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
keySerializer.serialize(record.topic(), record.key()),
valueSerializer.serialize(record.topic(), record.value()))
);
}
};
MockProcessorContext context = new MockProcessorContext(
null, baseDir,
byteArraySerde, byteArraySerde,
recordCollector);
WindowStore<Integer, String> store = createWindowStore(context);
RocksDBWindowStore<Integer, String> inner =
(RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
try {
context.restore(windowName, changeLog);
assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
// check segment directories
store.flush();
assertEquals(
Utils.mkSet(inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)),
segmentDirs(baseDir)
);
} finally {
store.close();
}
} finally {
Utils.delete(baseDir2);
}
}
@Test
public void testSegmentMaintenance() throws IOException {
File baseDir = Files.createTempDirectory("test").toFile();
try {
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
RecordCollector recordCollector = new RecordCollector(producer) {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
// do nothing
}
};
MockProcessorContext context = new MockProcessorContext(
null, baseDir,
byteArraySerde, byteArraySerde,
recordCollector);
WindowStore<Integer, String> store = createWindowStore(context);
RocksDBWindowStore<Integer, String> inner =
(RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
try {
context.setTime(0L);
store.put(0, "v");
assertEquals(
Utils.mkSet(inner.segmentName(0L)),
segmentDirs(baseDir)
);
context.setTime(59999L);
store.put(0, "v");
context.setTime(59999L);
store.put(0, "v");
assertEquals(
Utils.mkSet(inner.segmentName(0L)),
segmentDirs(baseDir)
);
context.setTime(60000L);
store.put(0, "v");
assertEquals(
Utils.mkSet(inner.segmentName(0L), inner.segmentName(1L)),
segmentDirs(baseDir)
);
WindowStoreIterator iter;
int fetchedCount;
iter = store.fetch(0, 0L, 240000L);
fetchedCount = 0;
while (iter.hasNext()) {
iter.next();
fetchedCount++;
}
assertEquals(4, fetchedCount);
assertEquals(
Utils.mkSet(inner.segmentName(0L), inner.segmentName(1L)),
segmentDirs(baseDir)
);
context.setTime(180000L);
store.put(0, "v");
iter = store.fetch(0, 0L, 240000L);
fetchedCount = 0;
while (iter.hasNext()) {
iter.next();
fetchedCount++;
}
assertEquals(2, fetchedCount);
assertEquals(
Utils.mkSet(inner.segmentName(1L), inner.segmentName(2L), inner.segmentName(3L)),
segmentDirs(baseDir)
);
context.setTime(300000L);
store.put(0, "v");
iter = store.fetch(0, 240000L, 1000000L);
fetchedCount = 0;
while (iter.hasNext()) {
iter.next();
fetchedCount++;
}
assertEquals(1, fetchedCount);
assertEquals(
Utils.mkSet(inner.segmentName(3L), inner.segmentName(4L), inner.segmentName(5L)),
segmentDirs(baseDir)
);
} finally {
store.close();
}
} finally {
Utils.delete(baseDir);
}
}
@Test
public void testInitialLoading() throws IOException {
File baseDir = Files.createTempDirectory("test").toFile();
try {
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
RecordCollector recordCollector = new RecordCollector(producer) {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
// do nothing
}
};
MockProcessorContext context = new MockProcessorContext(
null, baseDir,
byteArraySerde, byteArraySerde,
recordCollector);
File storeDir = new File(baseDir, windowName);
WindowStore<Integer, String> store = createWindowStore(context);
RocksDBWindowStore<Integer, String> inner =
(RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
try {
new File(storeDir, inner.segmentName(0L)).mkdir();
new File(storeDir, inner.segmentName(1L)).mkdir();
new File(storeDir, inner.segmentName(2L)).mkdir();
new File(storeDir, inner.segmentName(3L)).mkdir();
new File(storeDir, inner.segmentName(4L)).mkdir();
new File(storeDir, inner.segmentName(5L)).mkdir();
new File(storeDir, inner.segmentName(6L)).mkdir();
} finally {
store.close();
}
store = createWindowStore(context);
inner = (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
try {
assertEquals(
Utils.mkSet(inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)),
segmentDirs(baseDir)
);
WindowStoreIterator iter = store.fetch(0, 0L, 1000000L);
while (iter.hasNext()) {
iter.next();
}
assertEquals(
Utils.mkSet(inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)),
segmentDirs(baseDir)
);
} finally {
store.close();
}
} finally {
Utils.delete(baseDir);
}
}
private <E> List<E> toList(WindowStoreIterator<E> iterator) {
ArrayList<E> list = new ArrayList<>();
while (iterator.hasNext()) {
list.add(iterator.next().value);
}
return list;
}
private Set<String> segmentDirs(File baseDir) {
File windowDir = new File(baseDir, windowName);
return new HashSet<>(Arrays.asList(windowDir.list()));
}
private Map<Integer, Set<String>> entriesByKey(List<KeyValue<byte[], byte[]>> changeLog, long startTime) {
HashMap<Integer, Set<String>> entriesByKey = new HashMap<>();
for (KeyValue<byte[], byte[]> entry : changeLog) {
long timestamp = WindowStoreUtils.timestampFromBinaryKey(entry.key);
Integer key = WindowStoreUtils.keyFromBinaryKey(entry.key, serdes);
String value = entry.value == null ? null : serdes.valueFrom(entry.value);
Set<String> entries = entriesByKey.get(key);
if (entries == null) {
entries = new HashSet<>();
entriesByKey.put(key, entries);
}
entries.add(value + "@" + (timestamp - startTime));
}
return entriesByKey;
}
}