blob: 60053aa2eb89569bcc0b815a356d1b53bba545c4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
import org.apache.kafka.streams.state.internals.StateStoreProvider;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
import org.slf4j.Logger;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
/**
* A Kafka client that allows for performing continuous computation on input coming from one or more input topics and
* sends output to zero, one, or more output topics.
* <p>
* The computational logic can be specified either by using the {@link Topology} to define a DAG topology of
* {@link Processor}s or by using the {@link StreamsBuilder} which provides the high-level DSL to define
* transformations.
* <p>
* One {@code KafkaStreams} instance can contain one or more threads specified in the configs for the processing work.
* <p>
* A {@code KafkaStreams} instance can co-ordinate with any other instances with the same
* {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} (whether in the same process, on other processes on this
* machine, or on remote machines) as a single (possibly distributed) stream processing application.
* These instances will divide up the work based on the assignment of the input topic partitions so that all partitions
* are being consumed.
* If instances are added or fail, all (remaining) instances will rebalance the partition assignment among themselves
* to balance processing load and ensure that all input topic partitions are processed.
* <p>
* Internally a {@code KafkaStreams} instance contains a normal {@link KafkaProducer} and {@link KafkaConsumer} instance
* that is used for reading input and writing output.
* <p>
* A simple example might look like this:
* <pre>{@code
* Properties props = new Properties();
* props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
* props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
* props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
* props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
*
* StreamsBuilder builder = new StreamsBuilder();
* builder.<String, String>stream("my-input-topic").mapValues(value -> String.valueOf(value.length())).to("my-output-topic");
*
* KafkaStreams streams = new KafkaStreams(builder.build(), props);
* streams.start();
* }</pre>
*
* @see org.apache.kafka.streams.StreamsBuilder
* @see org.apache.kafka.streams.Topology
*/
@InterfaceStability.Evolving
public class KafkaStreams implements AutoCloseable {
private static final String JMX_PREFIX = "kafka.streams";
// processId is expected to be unique across JVMs and to be used
// in userData of the subscription request to allow assignor be aware
// of the co-location of stream thread's consumers. It is for internal
// usage only and should not be exposed to users at all.
private final Time time;
private final Logger log;
private final String clientId;
private final Metrics metrics;
private final StreamsConfig config;
protected final StreamThread[] threads;
private final StateDirectory stateDirectory;
private final StreamsMetadataState streamsMetadataState;
private final ScheduledExecutorService stateDirCleaner;
private final QueryableStoreProvider queryableStoreProvider;
private final AdminClient adminClient;
private GlobalStreamThread globalStreamThread;
private KafkaStreams.StateListener stateListener;
private StateRestoreListener globalStateRestoreListener;
// container states
/**
* Kafka Streams states are the possible state that a Kafka Streams instance can be in.
* An instance must only be in one state at a time.
* The expected state transition with the following defined states is:
*
* <pre>
* +--------------+
* +<----- | Created (0) |
* | +-----+--------+
* | |
* | v
* | +----+--+------+
* | | Re- |
* +<----- | Balancing (1)| -------->+
* | +-----+-+------+ |
* | | ^ |
* | v | |
* | +--------------+ v
* | | Running (2) | -------->+
* | +------+-------+ |
* | | |
* | v |
* | +------+-------+ +----+-------+
* +-----> | Pending |<--- | Error (5) |
* | Shutdown (3) | +------------+
* +------+-------+
* |
* v
* +------+-------+
* | Not |
* | Running (4) |
* +--------------+
*
*
* </pre>
* Note the following:
* - RUNNING state will transit to REBALANCING if any of its threads is in PARTITION_REVOKED state
* - REBALANCING state will transit to RUNNING if all of its threads are in RUNNING state
* - Any state except NOT_RUNNING can go to PENDING_SHUTDOWN (whenever close is called)
* - Of special importance: If the global stream thread dies, or all stream threads die (or both) then
* the instance will be in the ERROR state. The user will need to close it.
*/
public enum State {
CREATED(1, 3), REBALANCING(2, 3, 5), RUNNING(1, 3, 5), PENDING_SHUTDOWN(4), NOT_RUNNING, ERROR(3);
private final Set<Integer> validTransitions = new HashSet<>();
State(final Integer... validTransitions) {
this.validTransitions.addAll(Arrays.asList(validTransitions));
}
public boolean isRunning() {
return equals(RUNNING) || equals(REBALANCING);
}
public boolean isValidTransition(final State newState) {
return validTransitions.contains(newState.ordinal());
}
}
private final Object stateLock = new Object();
protected volatile State state = State.CREATED;
private boolean waitOnState(final State targetState, final long waitMs) {
final long begin = time.milliseconds();
synchronized (stateLock) {
long elapsedMs = 0L;
while (state != targetState) {
if (waitMs > elapsedMs) {
final long remainingMs = waitMs - elapsedMs;
try {
stateLock.wait(remainingMs);
} catch (final InterruptedException e) {
// it is ok: just move on to the next iteration
}
} else {
log.debug("Cannot transit to {} within {}ms", targetState, waitMs);
return false;
}
elapsedMs = time.milliseconds() - begin;
}
return true;
}
}
/**
* Sets the state
* @param newState New state
*/
private boolean setState(final State newState) {
final State oldState;
synchronized (stateLock) {
oldState = state;
if (state == State.PENDING_SHUTDOWN && newState != State.NOT_RUNNING) {
// when the state is already in PENDING_SHUTDOWN, all other transitions than NOT_RUNNING (due to thread dying) will be
// refused but we do not throw exception here, to allow appropriate error handling
return false;
} else if (state == State.NOT_RUNNING && (newState == State.PENDING_SHUTDOWN || newState == State.NOT_RUNNING)) {
// when the state is already in NOT_RUNNING, its transition to PENDING_SHUTDOWN or NOT_RUNNING (due to consecutive close calls)
// will be refused but we do not throw exception here, to allow idempotent close calls
return false;
} else if (state == State.REBALANCING && newState == State.REBALANCING) {
// when the state is already in REBALANCING, it should not transit to REBALANCING again
return false;
} else if (state == State.ERROR && newState == State.ERROR) {
// when the state is already in ERROR, it should not transit to ERROR again
return false;
} else if (!state.isValidTransition(newState)) {
throw new IllegalStateException("Stream-client " + clientId + ": Unexpected state transition from " + oldState + " to " + newState);
} else {
log.info("State transition from {} to {}", oldState, newState);
}
state = newState;
stateLock.notifyAll();
}
// we need to call the user customized state listener outside the state lock to avoid potential deadlocks
if (stateListener != null) {
stateListener.onChange(newState, oldState);
}
return true;
}
/**
* Return the current {@link State} of this {@code KafkaStreams} instance.
*
* @return the current state of this Kafka Streams instance
*/
public State state() {
return state;
}
private boolean isRunning() {
synchronized (stateLock) {
return state.isRunning();
}
}
private void validateIsRunning() {
if (!isRunning()) {
throw new IllegalStateException("KafkaStreams is not running. State is " + state + ".");
}
}
/**
* Listen to {@link State} change events.
*/
public interface StateListener {
/**
* Called when state changes.
*
* @param newState new state
* @param oldState previous state
*/
void onChange(final State newState, final State oldState);
}
/**
* An app can set a single {@link KafkaStreams.StateListener} so that the app is notified when state changes.
*
* @param listener a new state listener
* @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
*/
public void setStateListener(final KafkaStreams.StateListener listener) {
synchronized (stateLock) {
if (state == State.CREATED) {
stateListener = listener;
} else {
throw new IllegalStateException("Can only set StateListener in CREATED state. Current state is: " + state);
}
}
}
/**
* Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
* terminates due to an uncaught exception.
*
* @param eh the uncaught exception handler for all internal threads; {@code null} deletes the current handler
* @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
*/
public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) {
synchronized (stateLock) {
if (state == State.CREATED) {
for (final StreamThread thread : threads) {
thread.setUncaughtExceptionHandler(eh);
}
if (globalStreamThread != null) {
globalStreamThread.setUncaughtExceptionHandler(eh);
}
} else {
throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " +
"Current state is: " + state);
}
}
}
/**
* Set the listener which is triggered whenever a {@link StateStore} is being restored in order to resume
* processing.
*
* @param globalStateRestoreListener The listener triggered when {@link StateStore} is being restored.
* @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
*/
public void setGlobalStateRestoreListener(final StateRestoreListener globalStateRestoreListener) {
synchronized (stateLock) {
if (state == State.CREATED) {
this.globalStateRestoreListener = globalStateRestoreListener;
} else {
throw new IllegalStateException("Can only set GlobalStateRestoreListener in CREATED state. " +
"Current state is: " + state);
}
}
}
/**
* Get read-only handle on global metrics registry, including streams client's own metrics plus
* its embedded producer, consumer and admin clients' metrics.
*
* @return Map of all metrics.
*/
public Map<MetricName, ? extends Metric> metrics() {
final Map<MetricName, Metric> result = new LinkedHashMap<>();
// producer and consumer clients are per-thread
for (final StreamThread thread : threads) {
result.putAll(thread.producerMetrics());
result.putAll(thread.consumerMetrics());
// admin client is shared, so we can actually move it
// to result.putAll(adminClient.metrics()).
// we did it intentionally just for flexibility.
result.putAll(thread.adminClientMetrics());
}
// global thread's consumer client
if (globalStreamThread != null) {
result.putAll(globalStreamThread.consumerMetrics());
}
// self streams metrics
result.putAll(metrics.metrics());
return Collections.unmodifiableMap(result);
}
/**
* Class that handles stream thread transitions
*/
final class StreamStateListener implements StreamThread.StateListener {
private final Map<Long, StreamThread.State> threadState;
private GlobalStreamThread.State globalThreadState;
// this lock should always be held before the state lock
private final Object threadStatesLock;
StreamStateListener(final Map<Long, StreamThread.State> threadState,
final GlobalStreamThread.State globalThreadState) {
this.threadState = threadState;
this.globalThreadState = globalThreadState;
this.threadStatesLock = new Object();
}
/**
* If all threads are dead set to ERROR
*/
private void maybeSetError() {
// check if we have at least one thread running
for (final StreamThread.State state : threadState.values()) {
if (state != StreamThread.State.DEAD) {
return;
}
}
if (setState(State.ERROR)) {
log.error("All stream threads have died. The instance will be in error state and should be closed.");
}
}
/**
* If all threads are up, including the global thread, set to RUNNING
*/
private void maybeSetRunning() {
// state can be transferred to RUNNING if all threads are either RUNNING or DEAD
for (final StreamThread.State state : threadState.values()) {
if (state != StreamThread.State.RUNNING && state != StreamThread.State.DEAD) {
return;
}
}
// the global state thread is relevant only if it is started. There are cases
// when we don't have a global state thread at all, e.g., when we don't have global KTables
if (globalThreadState != null && globalThreadState != GlobalStreamThread.State.RUNNING) {
return;
}
setState(State.RUNNING);
}
@Override
public synchronized void onChange(final Thread thread,
final ThreadStateTransitionValidator abstractNewState,
final ThreadStateTransitionValidator abstractOldState) {
synchronized (threadStatesLock) {
// StreamThreads first
if (thread instanceof StreamThread) {
final StreamThread.State newState = (StreamThread.State) abstractNewState;
threadState.put(thread.getId(), newState);
if (newState == StreamThread.State.PARTITIONS_REVOKED) {
setState(State.REBALANCING);
} else if (newState == StreamThread.State.RUNNING) {
maybeSetRunning();
} else if (newState == StreamThread.State.DEAD) {
maybeSetError();
}
} else if (thread instanceof GlobalStreamThread) {
// global stream thread has different invariants
final GlobalStreamThread.State newState = (GlobalStreamThread.State) abstractNewState;
globalThreadState = newState;
// special case when global thread is dead
if (newState == GlobalStreamThread.State.DEAD) {
if (setState(State.ERROR)) {
log.error("Global thread has died. The instance will be in error state and should be closed.");
}
}
}
}
}
}
final class DelegatingStateRestoreListener implements StateRestoreListener {
private void throwOnFatalException(final Exception fatalUserException,
final TopicPartition topicPartition,
final String storeName) {
throw new StreamsException(
String.format("Fatal user code error in store restore listener for store %s, partition %s.",
storeName,
topicPartition),
fatalUserException);
}
@Override
public void onRestoreStart(final TopicPartition topicPartition,
final String storeName,
final long startingOffset,
final long endingOffset) {
if (globalStateRestoreListener != null) {
try {
globalStateRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
} catch (final Exception fatalUserException) {
throwOnFatalException(fatalUserException, topicPartition, storeName);
}
}
}
@Override
public void onBatchRestored(final TopicPartition topicPartition,
final String storeName,
final long batchEndOffset,
final long numRestored) {
if (globalStateRestoreListener != null) {
try {
globalStateRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
} catch (final Exception fatalUserException) {
throwOnFatalException(fatalUserException, topicPartition, storeName);
}
}
}
@Override
public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored) {
if (globalStateRestoreListener != null) {
try {
globalStateRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
} catch (final Exception fatalUserException) {
throwOnFatalException(fatalUserException, topicPartition, storeName);
}
}
}
}
/**
* Create a {@code KafkaStreams} instance.
* <p>
* Note: even if you never call {@link #start()} on a {@code KafkaStreams} instance,
* you still must {@link #close()} it to avoid resource leaks.
*
* @param topology the topology specifying the computational logic
* @param props properties for {@link StreamsConfig}
* @throws StreamsException if any fatal error occurs
*/
public KafkaStreams(final Topology topology,
final Properties props) {
this(topology.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
}
/**
* Create a {@code KafkaStreams} instance.
* <p>
* Note: even if you never call {@link #start()} on a {@code KafkaStreams} instance,
* you still must {@link #close()} it to avoid resource leaks.
*
* @param topology the topology specifying the computational logic
* @param props properties for {@link StreamsConfig}
* @param clientSupplier the Kafka clients supplier which provides underlying producer and consumer clients
* for the new {@code KafkaStreams} instance
* @throws StreamsException if any fatal error occurs
*/
public KafkaStreams(final Topology topology,
final Properties props,
final KafkaClientSupplier clientSupplier) {
this(topology.internalTopologyBuilder, new StreamsConfig(props), clientSupplier, Time.SYSTEM);
}
/**
* Create a {@code KafkaStreams} instance.
* <p>
* Note: even if you never call {@link #start()} on a {@code KafkaStreams} instance,
* you still must {@link #close()} it to avoid resource leaks.
*
* @param topology the topology specifying the computational logic
* @param props properties for {@link StreamsConfig}
* @param time {@code Time} implementation; cannot be null
* @throws StreamsException if any fatal error occurs
*/
public KafkaStreams(final Topology topology,
final Properties props,
final Time time) {
this(topology.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier(), time);
}
/**
* Create a {@code KafkaStreams} instance.
* <p>
* Note: even if you never call {@link #start()} on a {@code KafkaStreams} instance,
* you still must {@link #close()} it to avoid resource leaks.
*
* @param topology the topology specifying the computational logic
* @param props properties for {@link StreamsConfig}
* @param clientSupplier the Kafka clients supplier which provides underlying producer and consumer clients
* for the new {@code KafkaStreams} instance
* @param time {@code Time} implementation; cannot be null
* @throws StreamsException if any fatal error occurs
*/
public KafkaStreams(final Topology topology,
final Properties props,
final KafkaClientSupplier clientSupplier,
final Time time) {
this(topology.internalTopologyBuilder, new StreamsConfig(props), clientSupplier, time);
}
/**
* @deprecated use {@link #KafkaStreams(Topology, Properties)} instead
*/
@Deprecated
public KafkaStreams(final Topology topology,
final StreamsConfig config) {
this(topology, config, new DefaultKafkaClientSupplier());
}
/**
* @deprecated use {@link #KafkaStreams(Topology, Properties, KafkaClientSupplier)} instead
*/
@Deprecated
public KafkaStreams(final Topology topology,
final StreamsConfig config,
final KafkaClientSupplier clientSupplier) {
this(topology.internalTopologyBuilder, config, clientSupplier);
}
/**
* @deprecated use {@link #KafkaStreams(Topology, Properties, Time)} instead
*/
@Deprecated
public KafkaStreams(final Topology topology,
final StreamsConfig config,
final Time time) {
this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier(), time);
}
private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
final StreamsConfig config,
final KafkaClientSupplier clientSupplier) throws StreamsException {
this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM);
}
private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
final StreamsConfig config,
final KafkaClientSupplier clientSupplier,
final Time time) throws StreamsException {
this.config = config;
this.time = time;
// The application ID is a required config and hence should always have value
final UUID processId = UUID.randomUUID();
final String userClientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
if (userClientId.length() <= 0) {
clientId = applicationId + "-" + processId;
} else {
clientId = userClientId;
}
final LogContext logContext = new LogContext(String.format("stream-client [%s] ", clientId));
this.log = logContext.logger(getClass());
final MetricConfig metricConfig = new MetricConfig()
.samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
.recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class,
Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId));
reporters.add(new JmxReporter(JMX_PREFIX));
metrics = new Metrics(metricConfig, reporters, time);
// re-write the physical topology according to the config
internalTopologyBuilder.rewriteTopology(config);
// sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
final ProcessorTopology taskTopology = internalTopologyBuilder.build();
streamsMetadataState = new StreamsMetadataState(
internalTopologyBuilder,
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
// create the stream thread, global update thread, and cleanup thread
threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
long totalCacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
if (totalCacheSize < 0) {
totalCacheSize = 0;
log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
}
final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1));
final boolean createStateDirectory = taskTopology.hasPersistentLocalStore() ||
(globalTaskTopology != null && globalTaskTopology.hasPersistentGlobalStore());
try {
stateDirectory = new StateDirectory(config, time, createStateDirectory);
} catch (final ProcessorStateException fatal) {
throw new StreamsException(fatal);
}
final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
GlobalStreamThread.State globalThreadState = null;
if (globalTaskTopology != null) {
final String globalThreadId = clientId + "-GlobalStreamThread";
globalStreamThread = new GlobalStreamThread(globalTaskTopology,
config,
clientSupplier.getGlobalConsumer(config.getGlobalConsumerConfigs(clientId)),
stateDirectory,
cacheSizePerThread,
metrics,
time,
globalThreadId,
delegatingStateRestoreListener);
globalThreadState = globalStreamThread.state();
}
// use client id instead of thread client id since this admin client may be shared among threads
adminClient = clientSupplier.getAdminClient(config.getAdminConfigs(StreamThread.getSharedAdminClientId(clientId)));
final Map<Long, StreamThread.State> threadState = new HashMap<>(threads.length);
final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
for (int i = 0; i < threads.length; i++) {
threads[i] = StreamThread.create(internalTopologyBuilder,
config,
clientSupplier,
adminClient,
processId,
clientId,
metrics,
time,
streamsMetadataState,
cacheSizePerThread,
stateDirectory,
delegatingStateRestoreListener);
threadState.put(threads[i].getId(), threads[i].state());
storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
}
final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState);
if (globalTaskTopology != null) {
globalStreamThread.setStateListener(streamStateListener);
}
for (final StreamThread thread : threads) {
thread.setStateListener(streamStateListener);
}
final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores());
queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
stateDirCleaner = Executors.newSingleThreadScheduledExecutor(r -> {
final Thread thread = new Thread(r, clientId + "-CleanupThread");
thread.setDaemon(true);
return thread;
});
}
private static HostInfo parseHostInfo(final String endPoint) {
if (endPoint == null || endPoint.trim().isEmpty()) {
return StreamsMetadataState.UNKNOWN_HOST;
}
final String host = getHost(endPoint);
final Integer port = getPort(endPoint);
if (host == null || port == null) {
throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", endPoint));
}
return new HostInfo(host, port);
}
/**
* Start the {@code KafkaStreams} instance by starting all its threads.
* This function is expected to be called only once during the life cycle of the client.
* <p>
* Because threads are started in the background, this method does not block.
* However, if you have global stores in your topology, this method blocks until all global stores are restored.
* As a consequence, any fatal exception that happens during processing is by default only logged.
* If you want to be notified about dying threads, you can
* {@link #setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler) register an uncaught exception handler}
* before starting the {@code KafkaStreams} instance.
* <p>
* Note, for brokers with version {@code 0.9.x} or lower, the broker version cannot be checked.
* There will be no error and the client will hang and retry to verify the broker version until it
* {@link StreamsConfig#REQUEST_TIMEOUT_MS_CONFIG times out}.
* @throws IllegalStateException if process was already started
* @throws StreamsException if the Kafka brokers have version 0.10.0.x or
* if {@link StreamsConfig#PROCESSING_GUARANTEE_CONFIG exactly-once} is enabled for pre 0.11.0.x brokers
*/
public synchronized void start() throws IllegalStateException, StreamsException {
if (setState(State.REBALANCING)) {
log.debug("Starting Streams client");
if (globalStreamThread != null) {
globalStreamThread.start();
}
for (final StreamThread thread : threads) {
thread.start();
}
final Long cleanupDelay = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
stateDirCleaner.scheduleAtFixedRate(() -> {
// we do not use lock here since we only read on the value and act on it
if (state == State.RUNNING) {
stateDirectory.cleanRemovedTasks(cleanupDelay);
}
}, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS);
} else {
throw new IllegalStateException("The client is either already started or already stopped, cannot re-start");
}
}
/**
* Shutdown this {@code KafkaStreams} instance by signaling all the threads to stop, and then wait for them to join.
* This will block until all threads have stopped.
*/
public void close() {
close(Long.MAX_VALUE);
}
/**
* Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
* threads to join.
* A {@code timeout} of 0 means to wait forever.
*
* @param timeout how long to wait for the threads to shutdown. Can't be negative. If {@code timeout=0} just checking the state and return immediately.
* @param timeUnit unit of time used for timeout
* @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
* before all threads stopped
* Note that this method must not be called in the {@code onChange} callback of {@link StateListener}.
* @deprecated Use {@link #close(Duration)} instead; note, that {@link #close(Duration)} has different semantics and does not block on zero, e.g., `Duration.ofMillis(0)`.
*/
@Deprecated
public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
long timeoutMs = timeUnit.toMillis(timeout);
log.debug("Stopping Streams client with timeoutMillis = {} ms. You are using deprecated method. " +
"Please, consider update your code.", timeoutMs);
if (timeoutMs < 0) {
timeoutMs = 0;
} else if (timeoutMs == 0) {
timeoutMs = Long.MAX_VALUE;
}
return close(timeoutMs);
}
private boolean close(final long timeoutMs) {
if (!setState(State.PENDING_SHUTDOWN)) {
// if transition failed, it means it was either in PENDING_SHUTDOWN
// or NOT_RUNNING already; just check that all threads have been stopped
log.info("Already in the pending shutdown state, wait to complete shutdown");
} else {
stateDirCleaner.shutdownNow();
// wait for all threads to join in a separate thread;
// save the current thread so that if it is a stream thread
// we don't attempt to join it and cause a deadlock
final Thread shutdownThread = new Thread(() -> {
// notify all the threads to stop; avoid deadlocks by stopping any
// further state reports from the thread since we're shutting down
for (final StreamThread thread : threads) {
thread.shutdown();
}
for (final StreamThread thread : threads) {
try {
if (!thread.isRunning()) {
thread.join();
}
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
if (globalStreamThread != null) {
globalStreamThread.shutdown();
}
if (globalStreamThread != null && !globalStreamThread.stillRunning()) {
try {
globalStreamThread.join();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
globalStreamThread = null;
}
adminClient.close();
metrics.close();
setState(State.NOT_RUNNING);
}, "kafka-streams-close-thread");
shutdownThread.setDaemon(true);
shutdownThread.start();
}
if (waitOnState(State.NOT_RUNNING, timeoutMs)) {
log.info("Streams client stopped completely");
return true;
} else {
log.info("Streams client cannot stop completely within the timeout");
return false;
}
}
/**
* Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
* threads to join.
* A {@code timeout} of 0 means to wait forever.
*
* @param timeout how long to wait for the threads to shutdown
* @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
* before all threads stopped
* Note that this method must not be called in the {@link StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of {@link StateListener}.
* @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
*/
public synchronized boolean close(final Duration timeout) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout");
final long timeoutMs = ApiUtils.validateMillisecondDuration(timeout, msgPrefix);
if (timeoutMs < 0) {
throw new IllegalArgumentException("Timeout can't be negative.");
}
log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);
return close(timeoutMs);
}
/**
* Do a clean up of the local {@link StateStore} directory ({@link StreamsConfig#STATE_DIR_CONFIG}) by deleting all
* data with regard to the {@link StreamsConfig#APPLICATION_ID_CONFIG application ID}.
* <p>
* May only be called either before this {@code KafkaStreams} instance is {@link #start() started} or after the
* instance is {@link #close() closed}.
* <p>
* Calling this method triggers a restore of local {@link StateStore}s on the next {@link #start() application start}.
*
* @throws IllegalStateException if this {@code KafkaStreams} instance is currently {@link State#RUNNING running}
* @throws StreamsException if cleanup failed
*/
public void cleanUp() {
if (isRunning()) {
throw new IllegalStateException("Cannot clean up while running.");
}
stateDirectory.clean();
}
/**
* Find all currently running {@code KafkaStreams} instances (potentially remotely) that use the same
* {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all instances that belong to
* the same Kafka Streams application) and return {@link StreamsMetadata} for each discovered instance.
* <p>
* Note: this is a point in time view and it may change due to partition reassignment.
*
* @return {@link StreamsMetadata} for each {@code KafkaStreams} instances of this application
*/
public Collection<StreamsMetadata> allMetadata() {
validateIsRunning();
return streamsMetadataState.getAllMetadata();
}
/**
* Find all currently running {@code KafkaStreams} instances (potentially remotely) that
* <ul>
* <li>use the same {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all
* instances that belong to the same Kafka Streams application)</li>
* <li>and that contain a {@link StateStore} with the given {@code storeName}</li>
* </ul>
* and return {@link StreamsMetadata} for each discovered instance.
* <p>
* Note: this is a point in time view and it may change due to partition reassignment.
*
* @param storeName the {@code storeName} to find metadata for
* @return {@link StreamsMetadata} for each {@code KafkaStreams} instances with the provide {@code storeName} of
* this application
*/
public Collection<StreamsMetadata> allMetadataForStore(final String storeName) {
validateIsRunning();
return streamsMetadataState.getAllMetadataForStore(storeName);
}
/**
* Find the currently running {@code KafkaStreams} instance (potentially remotely) that
* <ul>
* <li>use the same {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all
* instances that belong to the same Kafka Streams application)</li>
* <li>and that contain a {@link StateStore} with the given {@code storeName}</li>
* <li>and the {@link StateStore} contains the given {@code key}</li>
* </ul>
* and return {@link StreamsMetadata} for it.
* <p>
* This will use the default Kafka Streams partitioner to locate the partition.
* If a {@link StreamPartitioner custom partitioner} has been
* {@link ProducerConfig#PARTITIONER_CLASS_CONFIG configured} via {@link StreamsConfig} or
* {@link KStream#through(String, Produced)}, or if the original {@link KTable}'s input
* {@link StreamsBuilder#table(String) topic} is partitioned differently, please use
* {@link #metadataForKey(String, Object, StreamPartitioner)}.
* <p>
* Note:
* <ul>
* <li>this is a point in time view and it may change due to partition reassignment</li>
* <li>the key may not exist in the {@link StateStore}; this method provides a way of finding which host it
* <em>would</em> exist on</li>
* <li>if this is for a window store the serializer should be the serializer for the record key,
* not the window serializer</li>
* </ul>
*
* @param storeName the {@code storeName} to find metadata for
* @param key the key to find metadata for
* @param keySerializer serializer for the key
* @param <K> key type
* @return {@link StreamsMetadata} for the {@code KafkaStreams} instance with the provide {@code storeName} and
* {@code key} of this application or {@link StreamsMetadata#NOT_AVAILABLE} if Kafka Streams is (re-)initializing
*/
public <K> StreamsMetadata metadataForKey(final String storeName,
final K key,
final Serializer<K> keySerializer) {
validateIsRunning();
return streamsMetadataState.getMetadataWithKey(storeName, key, keySerializer);
}
/**
* Find the currently running {@code KafkaStreams} instance (potentially remotely) that
* <ul>
* <li>use the same {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all
* instances that belong to the same Kafka Streams application)</li>
* <li>and that contain a {@link StateStore} with the given {@code storeName}</li>
* <li>and the {@link StateStore} contains the given {@code key}</li>
* </ul>
* and return {@link StreamsMetadata} for it.
* <p>
* Note:
* <ul>
* <li>this is a point in time view and it may change due to partition reassignment</li>
* <li>the key may not exist in the {@link StateStore}; this method provides a way of finding which host it
* <em>would</em> exist on</li>
* </ul>
*
* @param storeName the {@code storeName} to find metadata for
* @param key the key to find metadata for
* @param partitioner the partitioner to be use to locate the host for the key
* @param <K> key type
* @return {@link StreamsMetadata} for the {@code KafkaStreams} instance with the provide {@code storeName} and
* {@code key} of this application or {@link StreamsMetadata#NOT_AVAILABLE} if Kafka Streams is (re-)initializing
*/
public <K> StreamsMetadata metadataForKey(final String storeName,
final K key,
final StreamPartitioner<? super K, ?> partitioner) {
validateIsRunning();
return streamsMetadataState.getMetadataWithKey(storeName, key, partitioner);
}
/**
* Get a facade wrapping the local {@link StateStore} instances with the provided {@code storeName} if the Store's
* type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}.
* The returned object can be used to query the {@link StateStore} instances.
*
* @param storeName name of the store to find
* @param queryableStoreType accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)}
* @param <T> return type
* @return A facade wrapping the local {@link StateStore} instances
* @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and
* {@code queryableStoreType} doesn't exist
*/
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
validateIsRunning();
return queryableStoreProvider.getStore(storeName, queryableStoreType);
}
/**
* Returns runtime information about the local threads of this {@link KafkaStreams} instance.
*
* @return the set of {@link ThreadMetadata}.
*/
public Set<ThreadMetadata> localThreadsMetadata() {
validateIsRunning();
final Set<ThreadMetadata> threadMetadata = new HashSet<>();
for (final StreamThread thread : threads) {
threadMetadata.add(thread.threadMetadata());
}
return threadMetadata;
}
}