blob: a4391179c169670d8b9315cbe778d7675f5485b1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.StateSerdes;
import java.util.HashSet;
import java.util.Set;
public class StoreChangeLogger<K, V> {
public interface ValueGetter<K, V> {
V get(K key);
}
// TODO: these values should be configurable
protected static final int DEFAULT_WRITE_BATCH_SIZE = 100;
protected final StateSerdes<K, V> serialization;
private final String topic;
private final int partition;
private final ProcessorContext context;
private final int maxDirty;
private final int maxRemoved;
protected Set<K> dirty;
protected Set<K> removed;
public StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization) {
this(storeName, context, serialization, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
}
public StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization, int maxDirty, int maxRemoved) {
this(storeName, context, context.taskId().partition, serialization, maxDirty, maxRemoved);
init();
}
protected StoreChangeLogger(String storeName, ProcessorContext context, int partition, StateSerdes<K, V> serialization, int maxDirty, int maxRemoved) {
this.topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
this.context = context;
this.partition = partition;
this.serialization = serialization;
this.maxDirty = maxDirty;
this.maxRemoved = maxRemoved;
}
public void init() {
this.dirty = new HashSet<>();
this.removed = new HashSet<>();
}
public void add(K key) {
this.dirty.add(key);
this.removed.remove(key);
}
public void delete(K key) {
this.dirty.remove(key);
this.removed.add(key);
}
public void maybeLogChange(ValueGetter<K, V> getter) {
if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
logChange(getter);
}
public void logChange(ValueGetter<K, V> getter) {
if (this.removed.isEmpty() && this.dirty.isEmpty())
return;
RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
if (collector != null) {
Serializer<K> keySerializer = serialization.keySerializer();
Serializer<V> valueSerializer = serialization.valueSerializer();
for (K k : this.removed) {
collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
}
for (K k : this.dirty) {
V v = getter.get(k);
collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
}
this.removed.clear();
this.dirty.clear();
}
}
public void clear() {
this.removed.clear();
this.dirty.clear();
}
// this is for test only
public int numDirty() {
return this.dirty.size();
}
// this is for test only
public int numRemoved() {
return this.removed.size();
}
}