| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.kafka.streams.processor.internals; |
| |
| import org.apache.kafka.clients.consumer.ConsumerRecord; |
| import org.apache.kafka.clients.consumer.ConsumerRecords; |
| import org.apache.kafka.clients.consumer.MockConsumer; |
| import org.apache.kafka.clients.consumer.OffsetResetStrategy; |
| import org.apache.kafka.common.KafkaException; |
| import org.apache.kafka.common.Node; |
| import org.apache.kafka.common.PartitionInfo; |
| import org.apache.kafka.common.TopicPartition; |
| import org.apache.kafka.common.record.TimestampType; |
| import org.apache.kafka.common.serialization.IntegerSerializer; |
| import org.apache.kafka.common.serialization.Serializer; |
| import org.apache.kafka.common.utils.Utils; |
| import org.apache.kafka.streams.state.internals.OffsetCheckpoint; |
| import org.apache.kafka.test.MockStateStoreSupplier; |
| import org.junit.Test; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.channels.FileLock; |
| import java.nio.file.Files; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.assertFalse; |
| |
| public class ProcessorStateManagerTest { |
| |
| public static class MockRestoreConsumer extends MockConsumer<byte[], byte[]> { |
| private final Serializer<Integer> serializer = new IntegerSerializer(); |
| |
| public TopicPartition assignedPartition = null; |
| public TopicPartition seekPartition = null; |
| public long seekOffset = -1L; |
| public boolean seekToBeginingCalled = false; |
| public boolean seekToEndCalled = false; |
| private long endOffset = 0L; |
| private long currentOffset = 0L; |
| |
| private ArrayList<ConsumerRecord<byte[], byte[]>> recordBuffer = new ArrayList<>(); |
| |
| MockRestoreConsumer() { |
| super(OffsetResetStrategy.EARLIEST); |
| |
| reset(); |
| } |
| |
| // reset this mock restore consumer for a state store registration |
| public void reset() { |
| assignedPartition = null; |
| seekOffset = -1L; |
| seekToBeginingCalled = false; |
| seekToEndCalled = false; |
| endOffset = 0L; |
| recordBuffer.clear(); |
| } |
| |
| // buffer a record (we cannot use addRecord because we need to add records before assigning a partition) |
| public void bufferRecord(ConsumerRecord<Integer, Integer> record) { |
| recordBuffer.add( |
| new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), 0L, |
| TimestampType.CREATE_TIME, 0L, 0, 0, |
| serializer.serialize(record.topic(), record.key()), |
| serializer.serialize(record.topic(), record.value()))); |
| endOffset = record.offset(); |
| |
| super.updateEndOffsets(Collections.singletonMap(assignedPartition, endOffset)); |
| } |
| |
| @Override |
| public synchronized void assign(Collection<TopicPartition> partitions) { |
| int numPartitions = partitions.size(); |
| if (numPartitions > 1) |
| throw new IllegalArgumentException("RestoreConsumer: more than one partition specified"); |
| |
| if (numPartitions == 1) { |
| if (assignedPartition != null) |
| throw new IllegalStateException("RestoreConsumer: partition already assigned"); |
| assignedPartition = partitions.iterator().next(); |
| |
| // set the beginning offset to 0 |
| // NOTE: this is users responsible to set the initial lEO. |
| super.updateBeginningOffsets(Collections.singletonMap(assignedPartition, 0L)); |
| } |
| |
| super.assign(partitions); |
| } |
| |
| @Override |
| public ConsumerRecords<byte[], byte[]> poll(long timeout) { |
| // add buffered records to MockConsumer |
| for (ConsumerRecord<byte[], byte[]> record : recordBuffer) { |
| super.addRecord(record); |
| } |
| recordBuffer.clear(); |
| |
| ConsumerRecords<byte[], byte[]> records = super.poll(timeout); |
| |
| // set the current offset |
| Iterable<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(assignedPartition); |
| for (ConsumerRecord<byte[], byte[]> record : partitionRecords) { |
| currentOffset = record.offset(); |
| } |
| |
| return records; |
| } |
| |
| @Override |
| public synchronized long position(TopicPartition partition) { |
| if (!partition.equals(assignedPartition)) |
| throw new IllegalStateException("RestoreConsumer: unassigned partition"); |
| |
| return currentOffset; |
| } |
| |
| @Override |
| public synchronized void seek(TopicPartition partition, long offset) { |
| if (offset < 0) |
| throw new IllegalArgumentException("RestoreConsumer: offset should not be negative"); |
| |
| if (seekOffset >= 0) |
| throw new IllegalStateException("RestoreConsumer: offset already seeked"); |
| |
| seekPartition = partition; |
| seekOffset = offset; |
| currentOffset = offset; |
| super.seek(partition, offset); |
| } |
| |
| @Override |
| public synchronized void seekToBeginning(Collection<TopicPartition> partitions) { |
| if (partitions.size() != 1) |
| throw new IllegalStateException("RestoreConsumer: other than one partition specified"); |
| |
| for (TopicPartition partition : partitions) { |
| if (!partition.equals(assignedPartition)) |
| throw new IllegalStateException("RestoreConsumer: seek-to-end not on the assigned partition"); |
| } |
| |
| seekToBeginingCalled = true; |
| currentOffset = 0L; |
| } |
| |
| @Override |
| public synchronized void seekToEnd(Collection<TopicPartition> partitions) { |
| if (partitions.size() != 1) |
| throw new IllegalStateException("RestoreConsumer: other than one partition specified"); |
| |
| for (TopicPartition partition : partitions) { |
| if (!partition.equals(assignedPartition)) |
| throw new IllegalStateException("RestoreConsumer: seek-to-end not on the assigned partition"); |
| } |
| |
| seekToEndCalled = true; |
| currentOffset = endOffset; |
| } |
| } |
| |
| private final Set<TopicPartition> noPartitions = Collections.emptySet(); |
| private final String applicationId = "test-application"; |
| private final String stateDir = "test"; |
| private final String persistentStoreName = "persistentStore"; |
| private final String nonPersistentStoreName = "nonPersistentStore"; |
| private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName); |
| private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, nonPersistentStoreName); |
| |
| @Test |
| public void testLockStateDirectory() throws IOException { |
| File baseDir = Files.createTempDirectory(stateDir).toFile(); |
| try { |
| FileLock lock; |
| |
| // the state manager locks the directory |
| ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false); |
| |
| try { |
| // this should not get the lock |
| lock = ProcessorStateManager.lockStateDirectory(baseDir); |
| assertNull(lock); |
| } finally { |
| // by closing the state manager, release the lock |
| stateMgr.close(Collections.<TopicPartition, Long>emptyMap()); |
| } |
| |
| // now, this should get the lock |
| lock = ProcessorStateManager.lockStateDirectory(baseDir); |
| try { |
| assertNotNull(lock); |
| } finally { |
| if (lock != null) lock.release(); |
| } |
| } finally { |
| Utils.delete(baseDir); |
| } |
| } |
| |
| @Test(expected = KafkaException.class) |
| public void testNoTopic() throws IOException { |
| File baseDir = Files.createTempDirectory(stateDir).toFile(); |
| try { |
| MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); |
| |
| ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false); |
| try { |
| stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback); |
| } finally { |
| stateMgr.close(Collections.<TopicPartition, Long>emptyMap()); |
| } |
| } finally { |
| Utils.delete(baseDir); |
| } |
| } |
| |
| @Test |
| public void testRegisterPersistentStore() throws IOException { |
| File baseDir = Files.createTempDirectory(stateDir).toFile(); |
| try { |
| long lastCheckpointedOffset = 10L; |
| |
| OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME)); |
| checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 2), lastCheckpointedOffset)); |
| |
| MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); |
| |
| restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList( |
| new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo(persistentStoreTopicName, 2, Node.noNode(), new Node[0], new Node[0]) |
| )); |
| |
| TopicPartition partition = new TopicPartition(persistentStoreTopicName, 2); |
| restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L)); |
| |
| MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store |
| |
| ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 2, noPartitions, baseDir, restoreConsumer, false); |
| try { |
| restoreConsumer.reset(); |
| |
| ArrayList<Integer> expectedKeys = new ArrayList<>(); |
| long offset; |
| for (int i = 1; i <= 3; i++) { |
| offset = (long) i; |
| int key = i * 10; |
| expectedKeys.add(key); |
| restoreConsumer.bufferRecord( |
| new ConsumerRecord<>(persistentStoreTopicName, 2, 0L, offset, TimestampType.CREATE_TIME, 0L, 0, 0, key, 0) |
| ); |
| } |
| |
| stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); |
| |
| assertEquals(new TopicPartition(persistentStoreTopicName, 2), restoreConsumer.assignedPartition); |
| assertEquals(lastCheckpointedOffset, restoreConsumer.seekOffset); |
| assertFalse(restoreConsumer.seekToBeginingCalled); |
| assertTrue(restoreConsumer.seekToEndCalled); |
| assertEquals(expectedKeys, persistentStore.keys); |
| |
| } finally { |
| stateMgr.close(Collections.<TopicPartition, Long>emptyMap()); |
| } |
| |
| } finally { |
| Utils.delete(baseDir); |
| } |
| } |
| |
| @Test |
| public void testRegisterNonPersistentStore() throws IOException { |
| File baseDir = Files.createTempDirectory(stateDir).toFile(); |
| try { |
| long lastCheckpointedOffset = 10L; |
| |
| MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); |
| |
| OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME)); |
| checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 2), lastCheckpointedOffset)); |
| |
| restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList( |
| new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo(nonPersistentStoreTopicName, 2, Node.noNode(), new Node[0], new Node[0]) |
| )); |
| |
| TopicPartition partition = new TopicPartition(persistentStoreTopicName, 2); |
| restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L)); |
| |
| MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store |
| |
| ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 2, noPartitions, baseDir, restoreConsumer, false); |
| try { |
| restoreConsumer.reset(); |
| |
| ArrayList<Integer> expectedKeys = new ArrayList<>(); |
| long offset = -1L; |
| for (int i = 1; i <= 3; i++) { |
| offset = (long) (i + 100); |
| int key = i; |
| expectedKeys.add(i); |
| restoreConsumer.bufferRecord( |
| new ConsumerRecord<>(nonPersistentStoreTopicName, 2, 0L, offset, TimestampType.CREATE_TIME, 0L, 0, 0, key, 0) |
| ); |
| } |
| |
| stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback); |
| |
| assertEquals(new TopicPartition(nonPersistentStoreTopicName, 2), restoreConsumer.assignedPartition); |
| assertEquals(0L, restoreConsumer.seekOffset); |
| assertTrue(restoreConsumer.seekToBeginingCalled); |
| assertTrue(restoreConsumer.seekToEndCalled); |
| assertEquals(expectedKeys, nonPersistentStore.keys); |
| |
| } finally { |
| stateMgr.close(Collections.<TopicPartition, Long>emptyMap()); |
| } |
| } finally { |
| Utils.delete(baseDir); |
| } |
| } |
| |
| @Test |
| public void testChangeLogOffsets() throws IOException { |
| File baseDir = Files.createTempDirectory(stateDir).toFile(); |
| try { |
| long lastCheckpointedOffset = 10L; |
| String storeName1 = "store1"; |
| String storeName2 = "store2"; |
| String storeName3 = "store3"; |
| |
| String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName1); |
| String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2); |
| String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName3); |
| |
| OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME)); |
| checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset)); |
| |
| MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); |
| |
| restoreConsumer.updatePartitions(storeTopicName1, Utils.mkList( |
| new PartitionInfo(storeTopicName1, 0, Node.noNode(), new Node[0], new Node[0]) |
| )); |
| restoreConsumer.updatePartitions(storeTopicName2, Utils.mkList( |
| new PartitionInfo(storeTopicName2, 0, Node.noNode(), new Node[0], new Node[0]) |
| )); |
| restoreConsumer.updatePartitions(storeTopicName3, Utils.mkList( |
| new PartitionInfo(storeTopicName3, 0, Node.noNode(), new Node[0], new Node[0]), |
| new PartitionInfo(storeTopicName3, 1, Node.noNode(), new Node[0], new Node[0]) |
| )); |
| |
| TopicPartition partition1 = new TopicPartition(storeTopicName1, 0); |
| TopicPartition partition2 = new TopicPartition(storeTopicName2, 0); |
| TopicPartition partition3 = new TopicPartition(storeTopicName3, 1); |
| |
| Map<TopicPartition, Long> endOffsets = new HashMap<>(); |
| endOffsets.put(partition1, 13L); |
| endOffsets.put(partition2, 17L); |
| restoreConsumer.updateEndOffsets(endOffsets); |
| |
| MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore(storeName1, true); |
| MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore(storeName2, true); |
| MockStateStoreSupplier.MockStateStore store3 = new MockStateStoreSupplier.MockStateStore(storeName3, true); |
| |
| // if there is an source partition, inherit the partition id |
| Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1)); |
| ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 0, sourcePartitions, baseDir, restoreConsumer, true); // standby |
| try { |
| restoreConsumer.reset(); |
| |
| stateMgr.register(store1, true, store1.stateRestoreCallback); |
| stateMgr.register(store2, true, store2.stateRestoreCallback); |
| stateMgr.register(store3, true, store3.stateRestoreCallback); |
| |
| Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointedOffsets(); |
| |
| assertEquals(3, changeLogOffsets.size()); |
| assertTrue(changeLogOffsets.containsKey(partition1)); |
| assertTrue(changeLogOffsets.containsKey(partition2)); |
| assertTrue(changeLogOffsets.containsKey(partition3)); |
| assertEquals(lastCheckpointedOffset, (long) changeLogOffsets.get(partition1)); |
| assertEquals(-1L, (long) changeLogOffsets.get(partition2)); |
| assertEquals(-1L, (long) changeLogOffsets.get(partition3)); |
| |
| } finally { |
| stateMgr.close(Collections.<TopicPartition, Long>emptyMap()); |
| } |
| |
| } finally { |
| Utils.delete(baseDir); |
| } |
| } |
| |
| @Test |
| public void testGetStore() throws IOException { |
| File baseDir = Files.createTempDirectory(stateDir).toFile(); |
| try { |
| MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); |
| |
| restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList( |
| new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) |
| )); |
| |
| MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); |
| |
| ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, restoreConsumer, false); |
| try { |
| stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback); |
| |
| assertNull(stateMgr.getStore("noSuchStore")); |
| assertEquals(mockStateStore, stateMgr.getStore(nonPersistentStoreName)); |
| |
| } finally { |
| stateMgr.close(Collections.<TopicPartition, Long>emptyMap()); |
| } |
| } finally { |
| Utils.delete(baseDir); |
| } |
| } |
| |
| @Test |
| public void testClose() throws IOException { |
| File baseDir = Files.createTempDirectory(stateDir).toFile(); |
| File checkpointFile = new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME); |
| try { |
| // write an empty checkpoint file |
| OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile); |
| oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap()); |
| |
| MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); |
| |
| restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList( |
| new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) |
| )); |
| restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList( |
| new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) |
| )); |
| |
| // set up ack'ed offsets |
| HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>(); |
| ackedOffsets.put(new TopicPartition(persistentStoreTopicName, 1), 123L); |
| ackedOffsets.put(new TopicPartition(nonPersistentStoreTopicName, 1), 456L); |
| ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(applicationId, "otherTopic"), 1), 789L); |
| |
| MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); |
| MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); |
| |
| ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, restoreConsumer, false); |
| try { |
| // make sure the checkpoint file is deleted |
| assertFalse(checkpointFile.exists()); |
| |
| restoreConsumer.reset(); |
| stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); |
| |
| restoreConsumer.reset(); |
| stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback); |
| } finally { |
| // close the state manager with the ack'ed offsets |
| stateMgr.close(ackedOffsets); |
| } |
| |
| // make sure all stores are closed, and the checkpoint file is written. |
| assertTrue(persistentStore.flushed); |
| assertTrue(persistentStore.closed); |
| assertTrue(nonPersistentStore.flushed); |
| assertTrue(nonPersistentStore.closed); |
| assertTrue(checkpointFile.exists()); |
| |
| // the checkpoint file should contain an offset from the persistent store only. |
| OffsetCheckpoint newCheckpoint = new OffsetCheckpoint(checkpointFile); |
| Map<TopicPartition, Long> checkpointedOffsets = newCheckpoint.read(); |
| assertEquals(1, checkpointedOffsets.size()); |
| assertEquals(new Long(123L + 1L), checkpointedOffsets.get(new TopicPartition(persistentStoreTopicName, 1))); |
| } finally { |
| Utils.delete(baseDir); |
| } |
| } |
| |
| } |