blob: ec2d0adc974267d043402a5ad94489b3b8bcdf7b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.windowing.persistence;
import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.windowing.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A wrapper around the window related states that are checkpointed.
*/
public class WindowState<T> extends AbstractCollection<Event<T>> {
    // number of events per window-partition
    public static final int MAX_PARTITION_EVENTS = 1000;
    // lower bound on the number of partitions the in-memory budget must be able to hold
    public static final int MIN_PARTITIONS = 10;
    private static final Logger LOG = LoggerFactory.getLogger(WindowState.class);
    // the single key under which the ordered partition-id deque is stored in partitionIdsState
    private static final String PARTITION_IDS_KEY = "pk";

    private final KeyValueState<String, Deque<Long>> partitionIdsState;
    private final KeyValueState<Long, WindowPartition<T>> windowPartitionsState;
    private final KeyValueState<String, Optional<?>> windowSystemState;
    // guards partitionIds, latestPartitionId updates and the PARTITION_IDS_KEY entry
    private final ReentrantLock partitionIdsLock = new ReentrantLock(true);
    // striped locks guarding reads/writes of individual partitions in windowPartitionsState
    private final WindowPartitionLock windowPartitionsLock = new WindowPartitionLock();
    private final long maxEventsInMemory;
    private final Supplier<Map<String, Optional<?>>> windowSystemStateSupplier;
    // ids of the partitions currently pinned on behalf of iterators handed out by iterator()
    private final Set<Long> iteratorPins = new HashSet<>();
    // ordered partition keys
    private volatile Deque<Long> partitionIds;
    private volatile long latestPartitionId;
    private volatile WindowPartition<T> latestPartition;
    private volatile WindowPartitionCache<Long, WindowPartition<T>> cache;

    /**
     * Creates the window state and initializes the partition cache and the partition ids from the given states.
     *
     * @param windowPartitionsState     the state holding the window partitions, keyed by partition id
     * @param partitionIdsState         the state holding the ordered partition ids
     * @param windowSystemState         the state that receives the entries produced by {@code windowSystemStateSupplier}
     * @param windowSystemStateSupplier supplies the window system state entries written on every flush
     * @param maxEventsInMemory         requested in-memory event budget; raised to at least
     *                                  {@code MAX_PARTITION_EVENTS * MIN_PARTITIONS}
     */
    public WindowState(KeyValueState<Long, WindowPartition<T>> windowPartitionsState,
                       KeyValueState<String, Deque<Long>> partitionIdsState,
                       KeyValueState<String, Optional<?>> windowSystemState,
                       Supplier<Map<String, Optional<?>>> windowSystemStateSupplier,
                       long maxEventsInMemory) {
        this.windowPartitionsState = windowPartitionsState;
        this.partitionIdsState = partitionIdsState;
        this.windowSystemState = windowSystemState;
        this.windowSystemStateSupplier = windowSystemStateSupplier;
        this.maxEventsInMemory = Math.max(MAX_PARTITION_EVENTS * MIN_PARTITIONS, maxEventsInMemory);
        init();
    }

    /**
     * Appends the event to the latest partition, rolling over to a newly allocated partition
     * once the latest one holds {@link #MAX_PARTITION_EVENTS} events.
     *
     * @param event the event to add
     * @return always {@code true}
     */
    @Override
    public boolean add(Event<T> event) {
        if (latestPartition.size() >= MAX_PARTITION_EVENTS) {
            // the full partition no longer needs to stay pinned; pin its successor instead
            cache.unpin(latestPartition.getId());
            latestPartition = getPinnedPartition(getNextPartitionId());
        }
        latestPartition.add(event);
        return true;
    }

    /**
     * Returns an iterator over the events of all partitions, in partition-id order.
     *
     * <p>The iterator works on a snapshot of the partition ids taken when it is created. It pins
     * the partition it is currently reading and unpins it when it moves on or is exhausted; pins
     * of an iterator that is abandoned midway can be released via {@link #clearIteratorPins()}.
     *
     * @return an iterator supporting {@link Iterator#remove()}
     */
    @Override
    public Iterator<Event<T>> iterator() {
        return new Iterator<Event<T>>() {
            private final Iterator<Long> ids = getIds();
            private Iterator<Event<T>> current = Collections.emptyIterator();
            // the partition iterator that produced the last element; null when remove() is not allowed
            private Iterator<Event<T>> removeFrom;
            private WindowPartition<T> curPartition;

            // snapshots the partition ids under the lock so later changes don't affect this iterator
            private Iterator<Long> getIds() {
                // acquire outside the try so a failed lock() is never followed by unlock()
                partitionIdsLock.lock();
                try {
                    LOG.debug("Iterator partitionIds: {}", partitionIds);
                    return new ArrayList<>(partitionIds).iterator();
                } finally {
                    partitionIdsLock.unlock();
                }
            }

            @Override
            public void remove() {
                if (removeFrom == null) {
                    throw new IllegalStateException("No calls to next() since last call to remove()");
                }
                removeFrom.remove();
                removeFrom = null;
            }

            @Override
            public boolean hasNext() {
                boolean curHasNext = current.hasNext();
                // advance to the next partition that still has events
                while (!curHasNext && ids.hasNext()) {
                    if (curPartition != null) {
                        unpin(curPartition.getId());
                    }
                    curPartition = getPinnedPartition(ids.next());
                    if (curPartition != null) {
                        iteratorPins.add(curPartition.getId());
                        current = curPartition.iterator();
                        curHasNext = current.hasNext();
                    }
                }
                // un-pin the last partition
                if (!curHasNext && curPartition != null) {
                    unpin(curPartition.getId());
                    curPartition = null;
                }
                return curHasNext;
            }

            @Override
            public Event<T> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                removeFrom = current;
                return current.next();
            }

            // releases the cache pin and forgets it in the iterator-pin bookkeeping
            private void unpin(long id) {
                cache.unpin(id);
                iteratorPins.remove(id);
            }
        };
    }

    /**
     * Unpins every partition that is still recorded as pinned by an iterator and clears that record.
     */
    public void clearIteratorPins() {
        LOG.debug("clearIteratorPins '{}'", iteratorPins);
        Iterator<Long> it = iteratorPins.iterator();
        while (it.hasNext()) {
            cache.unpin(it.next());
            it.remove();
        }
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public int size() {
        throw new UnsupportedOperationException();
    }

    /**
     * Prepares the {@link WindowState} for commit.
     *
     * <p>Flushes the cached partitions and the window system state into the underlying states
     * before asking each of them to prepare the commit.
     *
     * @param txid the transaction id
     */
    public void prepareCommit(long txid) {
        flush();
        partitionIdsState.prepareCommit(txid);
        windowPartitionsState.prepareCommit(txid);
        windowSystemState.prepareCommit(txid);
    }

    /**
     * Commits the {@link WindowState}.
     *
     * @param txid the transaction id
     */
    public void commit(long txid) {
        partitionIdsState.commit(txid);
        windowPartitionsState.commit(txid);
        windowSystemState.commit(txid);
    }

    /**
     * Rolls back the {@link WindowState}.
     *
     * @param reInit if the members should be synced with the values from the state.
     */
    public void rollback(boolean reInit) {
        partitionIdsState.rollback();
        windowPartitionsState.rollback();
        windowSystemState.rollback();
        // re-init cache and partitions
        if (reInit) {
            init();
        }
    }

    // (re)builds the cache first, since initPartitions() pins the latest partition through it
    private void init() {
        initCache();
        initPartitions();
    }

    // loads the partition ids from the state (seeding partition 0 if there are none) and pins the latest partition
    private void initPartitions() {
        partitionIds = partitionIdsState.get(PARTITION_IDS_KEY, new LinkedList<>());
        if (partitionIds.isEmpty()) {
            partitionIds.add(0L);
            partitionIdsState.put(PARTITION_IDS_KEY, partitionIds);
        }
        latestPartitionId = partitionIds.peekLast();
        latestPartition = cache.pinAndGet(latestPartitionId);
    }

    // creates the partition cache, sized to hold maxEventsInMemory / MAX_PARTITION_EVENTS partitions
    private void initCache() {
        long size = maxEventsInMemory / MAX_PARTITION_EVENTS;
        LOG.info("maxEventsInMemory: {}, partition size: {}, number of partitions: {}",
                 maxEventsInMemory, MAX_PARTITION_EVENTS, size);
        cache = SimpleWindowPartitionCache.<Long, WindowPartition<T>>newBuilder()
            .maximumSize(size)
            .removalListener(new WindowPartitionCache.RemovalListener<Long, WindowPartition<T>>() {
                @Override
                public void onRemoval(Long pid, WindowPartition<T> p, WindowPartitionCache.RemovalCause removalCause) {
                    Objects.requireNonNull(pid, "Null partition id");
                    Objects.requireNonNull(p, "Null window partition");
                    LOG.debug("onRemoval for id '{}', WindowPartition '{}'", pid, p);
                    // acquire outside the try so a failed lock() is never followed by unlock()
                    windowPartitionsLock.lock(pid);
                    try {
                        if (p.isEmpty() && pid != latestPartitionId) {
                            // if the empty partition was not invalidated by flush, but evicted from cache
                            if (removalCause != WindowPartitionCache.RemovalCause.EXPLICIT) {
                                deletePartition(pid);
                                windowPartitionsState.delete(pid);
                            }
                        } else if (p.isModified()) {
                            // persist pending changes before the partition leaves the cache
                            windowPartitionsState.put(pid, p);
                        } else {
                            LOG.debug("WindowPartition '{}' is not modified", pid);
                        }
                    } finally {
                        windowPartitionsLock.unlock(pid);
                    }
                }
            }).build(new WindowPartitionCache.CacheLoader<Long, WindowPartition<T>>() {
                @Override
                public WindowPartition<T> load(Long id) {
                    LOG.debug("Load partition: {}", id);
                    // load from state
                    windowPartitionsLock.lock(id);
                    try {
                        return windowPartitionsState.get(id, new WindowPartition<>(id));
                    } finally {
                        windowPartitionsLock.unlock(id);
                    }
                }
            });
    }

    // removes the id from the ordered partition ids and writes the updated ids to the state
    private void deletePartition(long pid) {
        LOG.debug("Delete partition: {}", pid);
        partitionIdsLock.lock();
        try {
            partitionIds.remove(pid);
            partitionIdsState.put(PARTITION_IDS_KEY, partitionIds);
        } finally {
            partitionIdsLock.unlock();
        }
    }

    // allocates the next partition id, records it in the partition ids and returns it
    private long getNextPartitionId() {
        partitionIdsLock.lock();
        try {
            long nextId = latestPartitionId + 1;
            latestPartitionId = nextId;
            partitionIds.add(nextId);
            partitionIdsState.put(PARTITION_IDS_KEY, partitionIds);
            // return the id allocated under the lock rather than re-reading the volatile field afterwards
            return nextId;
        } finally {
            partitionIdsLock.unlock();
        }
    }

    private WindowPartition<T> getPinnedPartition(long id) {
        return cache.pinAndGet(id);
    }

    // writes the cached partitions and the supplied window system state into the underlying states
    private void flush() {
        LOG.debug("Flushing modified partitions");
        cache.asMap().forEach((pid, p) -> {
            Long pidToInvalidate = null;
            windowPartitionsLock.lock(pid);
            try {
                if (p.isEmpty() && pid != latestPartitionId) {
                    LOG.debug("Invalidating empty partition {}", pid);
                    deletePartition(pid);
                    windowPartitionsState.delete(pid);
                    pidToInvalidate = pid;
                } else if (p.isModified()) {
                    LOG.debug("Updating modified partition {}", pid);
                    // NOTE(review): the flag is cleared before the put, presumably so that a modification
                    // racing with the put leaves the flag set for the next flush - confirm before reordering
                    p.clearModified();
                    windowPartitionsState.put(pid, p);
                }
            } finally {
                windowPartitionsLock.unlock(pid);
            }
            // invalidate after releasing the lock
            // if the partition is pinned before we could invalidate,
            // it will get invalidated in the next flush or when the entry gets evicted from the cache.
            if (pidToInvalidate != null) {
                cache.invalidate(pidToInvalidate);
            }
        });
        Map<String, Optional<?>> state = windowSystemStateSupplier.get();
        for (Map.Entry<String, Optional<?>> entry : state.entrySet()) {
            windowSystemState.put(entry.getKey(), entry.getValue());
        }
    }

    /**
     * A fixed set of fair locks; a partition id is mapped onto one of them by its remainder
     * modulo the number of locks, so different ids may share a lock.
     */
    private static class WindowPartitionLock {
        private static final int NUM_LOCKS = 8;
        private final ImmutableMap<Long, ReentrantLock> locks;

        WindowPartitionLock() {
            ImmutableMap.Builder<Long, ReentrantLock> builder = ImmutableMap.builder();
            for (long i = 0; i < NUM_LOCKS; i++) {
                builder.put(i, new ReentrantLock(true));
            }
            locks = builder.build();
        }

        private void lock(long i) {
            lockFor(i).lock();
        }

        private void unlock(long i) {
            lockFor(i).unlock();
        }

        // floorMod keeps the stripe index in [0, NUM_LOCKS) even for a negative id
        private ReentrantLock lockFor(long id) {
            return locks.get(Math.floorMod(id, (long) NUM_LOCKS));
        }
    }

    // the window partition that holds the events
    // NOTE(review): instances are stored in a KeyValueState and 'modified' is transient, so the field
    // names are presumably part of a serialized form - verify before renaming or adding fields
    public static class WindowPartition<T> implements Iterable<Event<T>> {
        private final ConcurrentLinkedQueue<Event<T>> events = new ConcurrentLinkedQueue<>();
        // tracked separately from the queue and updated on every add/remove
        private final AtomicInteger size = new AtomicInteger();
        private final long id;
        // set whenever an event is added or removed; cleared via clearModified()
        private transient volatile boolean modified;

        public WindowPartition(long id) {
            this.id = id;
        }

        void add(Event<T> event) {
            events.add(event);
            size.incrementAndGet();
            setModified();
        }

        boolean isModified() {
            return modified;
        }

        void setModified() {
            // skip the volatile write when the flag is already set
            if (!modified) {
                modified = true;
            }
        }

        void clearModified() {
            modified = false;
        }

        boolean isEmpty() {
            return events.isEmpty();
        }

        /**
         * Returns an iterator over the events of this partition. Removing through the iterator
         * decrements the tracked size and marks the partition as modified.
         */
        @Override
        public Iterator<Event<T>> iterator() {
            return new Iterator<Event<T>>() {
                private final Iterator<Event<T>> it = events.iterator();

                @Override
                public boolean hasNext() {
                    return it.hasNext();
                }

                @Override
                public Event<T> next() {
                    return it.next();
                }

                @Override
                public void remove() {
                    it.remove();
                    size.decrementAndGet();
                    setModified();
                }
            };
        }

        public int size() {
            return size.get();
        }

        public long getId() {
            return id;
        }

        // for unit tests
        public Collection<Event<T>> getEvents() {
            return Collections.unmodifiableCollection(events);
        }

        @Override
        public String toString() {
            return "WindowPartition{id=" + id + ", size=" + size + '}';
        }
    }
}