blob: 3c86bfb5a722d11c1ca688747f8787d83e695410 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.coordinator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
/**
 * A single-thread based monitor that periodically lists the streams of every monitored system, keeps the ones whose
 * name matches that system's regex, and compares the result against the given set of streams. If a matching stream
 * that is not in the given stream set is detected, it invokes a {@link StreamRegexMonitor.Callback} with the initial
 * input set, the new input stream set, and the regexes being monitored.
 *
 * <p>Lifecycle: {@link #start()} may be called repeatedly while running, {@link #stop()} is terminal, and a stopped
 * monitor cannot be restarted.
 */
public class StreamRegexMonitor {
  private static final Logger log = LoggerFactory.getLogger(StreamRegexMonitor.class);

  // Factory of daemon-threads to create the single threaded executor pool
  private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setDaemon(true)
      .setNameFormat("Samza-" + StreamRegexMonitor.class.getSimpleName())
      .build();

  // Enum to describe the state of the regexMonitor
  private enum State {
    INIT, RUNNING, STOPPED
  }

  private final Set<SystemStream> streamsToMonitor;
  private final Map<String, Pattern> systemRegexesToMonitor;
  private final StreamMetadataCache metadataCache;
  private final int inputRegexMonitorPeriodMs;

  // Map of gauges (one per monitored system), registered with the metrics registry at construction time.
  // NOTE(review): nothing in this class updates these gauges after registration.
  private final Map<String, Gauge<Integer>> gauges;

  private final Callback callbackMethod;

  // Used to guard write access to state.
  private final Object lock = new Object();

  private final ScheduledExecutorService schedulerService = Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);

  // Written only while holding lock; volatile so that isRunning() can read it without locking.
  private volatile State state = State.INIT;

  /**
   * A callback that is invoked when the {@link StreamRegexMonitor} detects a new input stream matching given regex.
   */
  public interface Callback {
    /**
     * Method to be called when new input streams are detected.
     * @param initialInputSet The initial set of input streams
     * @param newInputStreams The set of new input streams discovered
     * @param regexesMonitored The set of regexes being monitored
     */
    void onInputStreamsChanged(Set<SystemStream> initialInputSet, Set<SystemStream> newInputStreams,
        Map<String, Pattern> regexesMonitored);
  }

  /**
   * Default constructor. Registers one "job-coordinator" gauge named {@code <system>-new-input-streams}
   * (initial value 0) for every system that has a regex.
   *
   * @param streamsToMonitor a set of SystemStreams to monitor
   * @param systemRegexesToMonitor map of regexes for each input system
   * @param metadataCache the metadata cache which will be used to list the streams of each system
   * @param metrics the metrics registry to which the metrics should be added.
   * @param inputRegexMonitorPeriodMs the period at which the monitor will check each input-regex;
   *                                  a non-positive value means no check is ever scheduled
   * @param monitorCallback the callback method to be invoked when new input stream matching regex is detected
   */
  public StreamRegexMonitor(Set<SystemStream> streamsToMonitor, Map<String, Pattern> systemRegexesToMonitor,
      StreamMetadataCache metadataCache, MetricsRegistry metrics, int inputRegexMonitorPeriodMs,
      Callback monitorCallback) {
    this.streamsToMonitor = streamsToMonitor;
    this.systemRegexesToMonitor = systemRegexesToMonitor;
    this.metadataCache = metadataCache;
    this.callbackMethod = monitorCallback;
    this.inputRegexMonitorPeriodMs = inputRegexMonitorPeriodMs;

    // Pre-populate the gauges
    Map<String, Gauge<Integer>> mutableGauges = new HashMap<>();
    for (String systemToMonitor : systemRegexesToMonitor.keySet()) {
      Gauge<Integer> gauge =
          metrics.newGauge("job-coordinator", String.format("%s-new-input-streams", systemToMonitor), 0);
      mutableGauges.put(systemToMonitor, gauge);
    }
    gauges = Collections.unmodifiableMap(mutableGauges);

    log.info("Created {} with inputRegexMonitorPeriodMs: {} and systemRegexesToMonitor: {}", this.getClass().getName(),
        this.inputRegexMonitorPeriodMs, this.systemRegexesToMonitor);
  }

  /**
   * Starts the monitor. The first check is scheduled immediately and then repeated every
   * {@code inputRegexMonitorPeriodMs} milliseconds. Calling this method on a running monitor is a no-op.
   *
   * @throws IllegalStateException if the monitor has already been stopped
   */
  public void start() {
    synchronized (lock) {
      switch (state) {
        case INIT:
          if (inputRegexMonitorPeriodMs > 0) {
            schedulerService.scheduleAtFixedRate(this::monitorInputRegexes, 0, inputRegexMonitorPeriodMs,
                TimeUnit.MILLISECONDS);
          } else {
            // The monitor is still marked RUNNING below, but it will never perform a check.
            log.info("inputRegexMonitorPeriodMs is {}; no periodic regex monitoring will be scheduled.",
                inputRegexMonitorPeriodMs);
          }
          state = State.RUNNING;
          break;

        case RUNNING:
          // start is idempotent
          return;

        case STOPPED:
          throw new IllegalStateException("StreamRegexMonitor was stopped and cannot be restarted.");
      }
    }
  }

  /**
   * Stops the monitor. Once it stops, it cannot be restarted.
   */
  public void stop() {
    synchronized (lock) {
      // We could also wait for full termination of the scheduler service, but it is overkill for
      // our use case.
      schedulerService.shutdownNow();
      state = State.STOPPED;
    }
  }

  /**
   * Performs one monitoring pass: collects the streams of every monitored system that match the system's regex and
   * invokes the callback if any of them is missing from {@code streamsToMonitor}.
   *
   * <p>Every {@link Exception} is caught and logged, because an exception escaping a task scheduled with
   * {@code scheduleAtFixedRate} suppresses all subsequent executions.
   */
  private void monitorInputRegexes() {
    log.debug("Running monitorInputRegexes");

    try {
      // obtain the list of SysStreams that match given patterns for all systems
      Set<SystemStream> inputStreamsMatchingPattern = new HashSet<>();

      // For each input system, for which we have a regex to monitor
      for (Map.Entry<String, Pattern> systemRegex : this.systemRegexesToMonitor.entrySet()) {
        String systemName = systemRegex.getKey();
        Pattern streamPattern = systemRegex.getValue();

        try {
          // obtain the list of SysStreams that match the regex for this system
          // using the systemAdmin in the metadataCache
          Set<SystemStream> allStreamsInSystem =
              JavaConverters.setAsJavaSetConverter(this.metadataCache.getAllSystemStreams(systemName)).asJava();

          // Match with the supplied Pattern itself: this honours the flags it was compiled with and avoids
          // recompiling the regex for every stream (which String.matches(pattern.pattern()) would do).
          inputStreamsMatchingPattern.addAll(allStreamsInSystem.stream()
              .filter(systemStream -> streamPattern.matcher(systemStream.getStream()).matches())
              .collect(Collectors.toSet()));
        } catch (UnsupportedOperationException e) {
          // Skip this system but keep checking the remaining ones.
          log.error("UnsupportedOperationException while monitoring input regexes for system {}", systemName, e);
        }
      }

      // Streams that match a regex but are not in streamsToMonitor (streamsToMonitor = task.inputs).
      // This is an unmodifiable view backed by the two sets, not a copy.
      Set<SystemStream> newInputStreams = Sets.difference(inputStreamsMatchingPattern, streamsToMonitor);

      if (!newInputStreams.isEmpty()) {
        log.info("New input system-streams discovered. InputStreamsMatchingPattern: {} but streamsToMonitor: {} ",
            inputStreamsMatchingPattern, streamsToMonitor);

        // invoke notify callback with new input streams; the callback receives this monitor's own
        // collections and must therefore treat them as read-only
        this.callbackMethod.onInputStreamsChanged(streamsToMonitor, newInputStreams, systemRegexesToMonitor);
      } else {
        log.info("No new input system-Streams discovered streamsToMonitor {} inputStreamsMatchingPattern {}",
            streamsToMonitor, inputStreamsMatchingPattern);
      }
    } catch (Exception e) {
      log.error("Exception while monitoring input regexes.", e);
    }
  }

  /**
   * Returns true if {@link #start()} has been called and {@link #stop()} has not.
   */
  @VisibleForTesting
  boolean isRunning() {
    return state == State.RUNNING;
  }

  /**
   * Wait until this service has shutdown. Returns true if shutdown occurred within the timeout
   * and false otherwise.
   * <p>
   * This is currently exposed at the package private level for tests only.
   *
   * @param timeout the maximum time to wait
   * @param unit the time unit of the timeout argument
   * @throws InterruptedException if interrupted while waiting
   */
  @VisibleForTesting
  boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    return schedulerService.awaitTermination(timeout, unit);
  }
}