blob: f70db21665f987b73f5969e0601f6f9bc325eef5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.topology;
import static org.apache.storm.windowing.persistence.WindowState.WindowPartition;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.storm.Config;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.state.State;
import org.apache.storm.state.StateFactory;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.DefaultEvictionContext;
import org.apache.storm.windowing.EventImpl;
import org.apache.storm.windowing.WindowLifecycleListener;
import org.apache.storm.windowing.persistence.WindowState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Wraps a {@link IStatefulWindowedBolt} and handles the execution. Uses state and the underlying checkpointing mechanisms to save the
 * tuples in window to state. The tuples are also kept in-memory by transparently caching the window partitions and checkpointing them as
 * needed.
 *
 * <p>Lifecycle enforced by this class: {@link #initState} must be invoked exactly once before {@link #execute} or
 * {@link #prePrepare}; the windowing machinery ({@link #start()}) is only started once the state has been initialized.
 */
public class PersistentWindowedBoltExecutor<T extends State> extends WindowedBoltExecutor implements IStatefulBolt<T> {
    private static final Logger LOG = LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class);
    private final IStatefulWindowedBolt<T> statefulWindowedBolt;
    private transient OutputCollector outputCollector;
    private transient WindowState<Tuple> state;
    private transient boolean stateInitialized;
    private transient boolean prePrepared;
    private transient KeyValueState<String, Optional<?>> windowSystemState;

    /**
     * Creates an executor for the given stateful windowed bolt.
     *
     * @param bolt the user bolt whose window contents are persisted to state
     */
    public PersistentWindowedBoltExecutor(IStatefulWindowedBolt<T> bolt) {
        super(bolt);
        statefulWindowedBolt = bolt;
    }

    /**
     * Adds the window-state classes to the state Kryo registrations, creates the three backing key-value states
     * (window partitions, partition ids, window system state) and prepares the executor with them.
     */
    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        // Published before the states are created because StateFactory.getState is handed topoConf
        // (presumably to configure its serializer with these registrations).
        topoConf.put(Config.TOPOLOGY_STATE_KRYO_REGISTER, withWindowStateRegistrations(topoConf));
        prepare(topoConf, context, collector, getWindowState(topoConf, context), getPartitionState(topoConf, context),
                getWindowSystemState(topoConf, context));
    }

    /**
     * Returns a new list holding the configured state Kryo registrations followed by the classes stored in window state.
     *
     * <p>The configured list is copied rather than mutated in place, so a read-only or shared list in {@code topoConf} is
     * left untouched; a missing or {@code null} entry is treated as empty, and names already present are not appended
     * again (the first-occurrence order of registrations is therefore unchanged).
     *
     * @param topoConf the topology configuration
     * @return a fresh, mutable registration list
     */
    @SuppressWarnings("unchecked")
    private static List<String> withWindowStateRegistrations(Map<String, Object> topoConf) {
        List<String> configured = (List<String>) topoConf.get(Config.TOPOLOGY_STATE_KRYO_REGISTER);
        List<String> registrations = configured == null ? new ArrayList<>() : new ArrayList<>(configured);
        Class<?>[] windowStateClasses = {
            ConcurrentLinkedQueue.class,
            LinkedList.class,
            AtomicInteger.class,
            EventImpl.class,
            WindowPartition.class,
            DefaultEvictionContext.class
        };
        for (Class<?> clazz : windowStateClasses) {
            String name = clazz.getName();
            if (!registrations.contains(name)) {
                registrations.add(name);
            }
        }
        return registrations;
    }

    /**
     * Prepares the executor with explicitly supplied backing states.
     *
     * <p>Package access for unit tests. Acks issued by the parent executor are suppressed via {@link NoAckOutputCollector};
     * this class acks in {@link #execute} instead. After the parent preparation, the window system state is restored.
     */
    void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector,
                 KeyValueState<Long, WindowPartition<Tuple>> windowState,
                 KeyValueState<String, Deque<Long>> partitionState,
                 KeyValueState<String, Optional<?>> windowSystemState) {
        outputCollector = collector;
        this.windowSystemState = windowSystemState;
        state = new WindowState<>(windowState, partitionState, windowSystemState, this::getState,
                statefulWindowedBolt.maxEventsInMemory());
        doPrepare(topoConf, context, new NoAckOutputCollector(collector), state, true);
        restoreWindowSystemState();
    }

    /**
     * Copies every entry of the window system state into a map and hands it to {@code restoreState}.
     */
    private void restoreWindowSystemState() {
        Map<String, Optional<?>> map = new HashMap<>();
        for (Map.Entry<String, Optional<?>> entry : windowSystemState) {
            map.put(entry.getKey(), entry.getValue());
        }
        restoreState(map);
    }

    /**
     * Validates the window configuration.
     *
     * @throws IllegalArgumentException if no window length is given, or if the checkpoint interval exceeds the
     *                                  topology message timeout
     */
    @Override
    protected void validate(Map<String, Object> topoConf,
                            BaseWindowedBolt.Count windowLengthCount,
                            BaseWindowedBolt.Duration windowLengthDuration,
                            BaseWindowedBolt.Count slidingIntervalCount,
                            BaseWindowedBolt.Duration slidingIntervalDuration) {
        if (windowLengthCount == null && windowLengthDuration == null) {
            throw new IllegalArgumentException("Window length is not specified");
        }
        int interval = getCheckpointIntervalMillis(topoConf);
        int timeout = getTopologyTimeoutMillis(topoConf);
        if (interval > timeout) {
            // Both values are in milliseconds even though the timeout config key is expressed in seconds.
            throw new IllegalArgumentException(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL + " value " + interval
                    + " ms is more than " + Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
                    + " value " + timeout + " ms");
        }
    }

    /**
     * Reads the checkpoint interval from the configuration.
     *
     * @return the configured interval, or {@link Integer#MAX_VALUE} when it is not set
     */
    private int getCheckpointIntervalMillis(Map<String, Object> topoConf) {
        int checkpointInterval = Integer.MAX_VALUE;
        if (topoConf.get(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL) != null) {
            checkpointInterval = ((Number) topoConf.get(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL)).intValue();
        }
        return checkpointInterval;
    }

    /**
     * Starts the windowing machinery only once the state is initialized; otherwise the call is deferred and
     * {@link #initState} triggers it.
     */
    @Override
    protected void start() {
        if (stateInitialized) {
            super.start();
        } else {
            LOG.debug("Will invoke start after state is initialized");
        }
    }

    /**
     * Adds the tuple to the window and acks it.
     *
     * @throws IllegalStateException if invoked before {@link #initState}
     */
    @Override
    public void execute(Tuple input) {
        if (!stateInitialized) {
            throw new IllegalStateException("execute invoked before initState with input tuple " + input);
        }
        super.execute(input);
        // StatefulBoltExecutor does the actual ack when the state is saved.
        outputCollector.ack(input);
    }

    /**
     * Passes the user state to the wrapped bolt, marks the executor initialized and starts windowing.
     *
     * @param userState the state handed over by the stateful executor
     * @throws IllegalStateException if the state was already initialized
     */
    @Override
    public void initState(T userState) {
        if (stateInitialized) {
            String msg = "initState invoked when the state is already initialized";
            LOG.warn(msg);
            throw new IllegalStateException(msg);
        }
        statefulWindowedBolt.initState(userState);
        stateInitialized = true;
        start();
    }

    /**
     * Prepares the wrapped bolt and the window state for the commit of {@code txid}.
     *
     * @throws IllegalStateException if invoked before {@link #initState}
     */
    @Override
    public void prePrepare(long txid) {
        if (stateInitialized) {
            LOG.debug("Prepare streamState, txid {}", txid);
            statefulWindowedBolt.prePrepare(txid);
            state.prepareCommit(txid);
            prePrepared = true;
        } else {
            String msg = "Cannot prepare before initState";
            LOG.warn(msg);
            throw new IllegalStateException(msg);
        }
    }

    /**
     * Commits the wrapped bolt and the window state for {@code txid}.
     *
     * @throws IllegalStateException if the state is initialized but {@link #prePrepare} has never been invoked
     */
    @Override
    public void preCommit(long txid) {
        // preCommit can be invoked during recovery before the state is initialized
        if (prePrepared || !stateInitialized) {
            LOG.debug("Commit streamState, txid {}", txid);
            statefulWindowedBolt.preCommit(txid);
            state.commit(txid);
        } else {
            String msg = "preCommit before prePrepare in initialized state";
            LOG.warn(msg);
            throw new IllegalStateException(msg);
        }
    }

    /**
     * Rolls back the wrapped bolt and the window state; when already initialized, the window system state is
     * restored again afterwards.
     */
    @Override
    public void preRollback() {
        LOG.debug("Rollback streamState, stateInitialized {}", stateInitialized);
        statefulWindowedBolt.preRollback();
        state.rollback(stateInitialized);
        if (stateInitialized) {
            restoreWindowSystemState();
        }
    }

    /**
     * Creates a listener that ignores expiry (tuples are acked in {@link #execute}) and, on activation, runs the bolt
     * and then clears the window state's iterator pins.
     */
    @Override
    protected WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
        return new WindowLifecycleListener<Tuple>() {
            @Override
            public void onExpiry(List<Tuple> events) {
                /*
                 * NO-OP: the events are ack-ed in execute
                 */
            }

            @Override
            public void onActivation(Supplier<Iterator<Tuple>> eventsIt,
                                     Supplier<Iterator<Tuple>> newEventsIt,
                                     Supplier<Iterator<Tuple>> expiredIt,
                                     Long timestamp) {
                /*
                 * Here we don't set the tuples in windowedOutputCollector's context and emit un-anchored.
                 * The checkpoint tuple will trigger a checkpoint in the receiver with the emitted tuples.
                 */
                boltExecute(eventsIt, newEventsIt, expiredIt, timestamp);
                state.clearIteratorPins();
            }
        };
    }

    /** Returns the key-value state holding window partitions, namespaced by component id and task id. */
    @SuppressWarnings("unchecked")
    private KeyValueState<Long, WindowPartition<Tuple>> getWindowState(Map<String, Object> topoConf, TopologyContext context) {
        String namespace = context.getThisComponentId() + "-" + context.getThisTaskId() + "-window";
        return (KeyValueState<Long, WindowPartition<Tuple>>) StateFactory.getState(namespace, topoConf, context);
    }

    /** Returns the key-value state holding the partition ids, namespaced by component id and task id. */
    @SuppressWarnings("unchecked")
    private KeyValueState<String, Deque<Long>> getPartitionState(Map<String, Object> topoConf, TopologyContext context) {
        String namespace = context.getThisComponentId() + "-" + context.getThisTaskId() + "-window-partitions";
        return (KeyValueState<String, Deque<Long>>) StateFactory.getState(namespace, topoConf, context);
    }

    /** Returns the key-value state holding the window system state, namespaced by component id and task id. */
    @SuppressWarnings("unchecked")
    private KeyValueState<String, Optional<?>> getWindowSystemState(Map<String, Object> topoConf, TopologyContext context) {
        String namespace = context.getThisComponentId() + "-" + context.getThisTaskId() + "-window-systemstate";
        return (KeyValueState<String, Optional<?>>) StateFactory.getState(namespace, topoConf, context);
    }

    /**
     * Creates an {@link OutputCollector} wrapper that ignores acks. The {@link PersistentWindowedBoltExecutor} acks the tuples in execute
     * and this is to prevent double ack-ing.
     */
    private static class NoAckOutputCollector extends OutputCollector {
        NoAckOutputCollector(OutputCollector delegate) {
            super(delegate);
        }

        @Override
        public void ack(Tuple input) {
            // NOOP: acking is done by PersistentWindowedBoltExecutor#execute
        }
    }
}