blob: b46fc3f8fa4b68d667894323fbaffdb67075bcb7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.drain;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.metadatastore.MetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* DrainMonitor is intended to monitor the MetadataStore for {@link DrainNotification} and invoke
* the {@link DrainCallback}.
* */
public class DrainMonitor {
private static final Logger LOG = LoggerFactory.getLogger(DrainMonitor.class);
/**
* Describes the state of the monitor.
* */
public enum State {
/**
* Initial state when DrainMonitor is not polling for DrainNotifications.
* */
INIT,
/**
* When Drain Monitor is started, it moves from INIT to RUNNING state and starts polling
* for Drain Notifications.
* */
RUNNING,
/**
* Indicates that the Drain Monitor is stopped. The DrainMonitor could have been explicitly stopped or stopped on
* its own if a DrainNotification was encountered.
* */
STOPPED
}
private static final int INITIAL_POLL_DELAY_MILLIS = 0;
private final ScheduledExecutorService schedulerService =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("Samza DrainMonitor Thread-%d")
.setDaemon(true)
.build());
private final String appRunId;
private final NamespaceAwareCoordinatorStreamStore drainMetadataStore;
// Used to guard write access to state.
private final Object lock = new Object();
@GuardedBy("lock")
private State state = State.INIT;
private DrainCallback callback;
private long pollingIntervalMillis;
public DrainMonitor(MetadataStore metadataStore, Config config) {
Preconditions.checkNotNull(metadataStore, "MetadataStore parameter cannot be null.");
Preconditions.checkNotNull(config, "Config parameter cannot be null.");
this.drainMetadataStore =
new NamespaceAwareCoordinatorStreamStore(metadataStore, DrainUtils.DRAIN_METADATA_STORE_NAMESPACE);
ApplicationConfig applicationConfig = new ApplicationConfig(config);
this.appRunId = applicationConfig.getRunId();
JobConfig jobConfig = new JobConfig(config);
this.pollingIntervalMillis = jobConfig.getDrainMonitorPollIntervalMillis();
}
public DrainMonitor(MetadataStore metadataStore, Config config, long pollingIntervalMillis) {
this(metadataStore, config);
Preconditions.checkArgument(pollingIntervalMillis > 0,
String.format("Polling interval specified is %d ms. It should be greater than 0.", pollingIntervalMillis));
this.pollingIntervalMillis = pollingIntervalMillis;
}
/**
* Starts the DrainMonitor.
* */
public void start() {
Preconditions.checkState(callback != null,
"Drain Callback needs to be set using registerCallback(callback) prior to starting the DrainManager.");
synchronized (lock) {
switch (state) {
case INIT:
if (shouldDrain(drainMetadataStore, appRunId)) {
/*
* Prior to starting the periodic polling, we are doing a one-time check on the calling(container main) thread
* to see if DrainNotification is present in the metadata store for the current deployment.
* This check is to deal with the case where a container might have re-started during Drain.
* If yes, we will set the container to drain mode to prevent it from processing any new messages. This will
* in-turn guarantee that intermediate Drain control messages from the previous incarnation of the container are
* processed and there are no duplicate intermediate control messages for the same deployment.
* */
LOG.info("Found DrainNotification message on container start. Skipping poll of DrainNotifications.");
callback.onDrain();
} else {
state = State.RUNNING;
schedulerService.scheduleAtFixedRate(() -> {
if (shouldDrain(drainMetadataStore, appRunId)) {
LOG.info("Received Drain Notification for deployment: {}", appRunId);
stop();
callback.onDrain();
}
}, INITIAL_POLL_DELAY_MILLIS, pollingIntervalMillis, TimeUnit.MILLISECONDS);
LOG.info("Started DrainMonitor.");
}
break;
case RUNNING:
case STOPPED:
LOG.info("Cannot call start() on the DrainMonitor when it is in {} state.", state);
break;
}
}
}
/**
* Stops the DrainMonitor.
* */
public void stop() {
synchronized (lock) {
switch (state) {
case RUNNING:
schedulerService.shutdownNow();
state = State.STOPPED;
LOG.info("Stopped DrainMonitor.");
break;
case INIT:
case STOPPED:
LOG.info("Cannot stop DrainMonitor as it is not running. State: {}.", state);
break;
}
}
}
/**
* Register a callback to be executed when DrainNotification is encountered.
*
* @param callback the callback to register.
* @return Returns {@code true} if registration was successful and {@code false} if not.
* Registration can fail it the DrainMonitor is stopped or a callback is already registered.
* */
public boolean registerDrainCallback(DrainCallback callback) {
Preconditions.checkNotNull(callback);
switch (state) {
case RUNNING:
case STOPPED:
LOG.warn("Cannot register callback when it is in {} state. Please register callback before calling start "
+ "on DrainMonitor.", state);
return false;
case INIT:
if (this.callback != null) {
LOG.warn("Cannot register callback as a callback is already registered.");
return false;
}
this.callback = callback;
return true;
default:
return false;
}
}
/**
* Get the current state of the DrainMonitor.
* */
@VisibleForTesting
State getState() {
return state;
}
/**
* Callback for any action to executed by DrainMonitor implementations once Drain is encountered.
* Registered using {@link #registerDrainCallback(DrainCallback)}.
* */
public interface DrainCallback {
void onDrain();
}
/**
* One time check check to see if there are any DrainNotification messages available in the
* metadata store for the current deployment.
* */
static boolean shouldDrain(NamespaceAwareCoordinatorStreamStore drainMetadataStore, String runId) {
final Optional<List<DrainNotification>> drainNotifications = readDrainNotificationMessages(drainMetadataStore);
if (drainNotifications.isPresent()) {
final ImmutableList<DrainNotification> filteredDrainNotifications = drainNotifications.get()
.stream()
.filter(notification -> runId.equals(notification.getRunId()))
.collect(ImmutableList.toImmutableList());
return !filteredDrainNotifications.isEmpty();
}
return false;
}
/**
* Reads all DrainNotification messages from the metadata store.
* */
private static Optional<List<DrainNotification>> readDrainNotificationMessages(NamespaceAwareCoordinatorStreamStore
drainMetadataStore) {
final ObjectMapper objectMapper = DrainNotificationObjectMapper.getObjectMapper();
final ImmutableList<DrainNotification> drainNotifications = drainMetadataStore.all()
.values()
.stream()
.map(bytes -> {
try {
return objectMapper.readValue(bytes, DrainNotification.class);
} catch (IOException e) {
LOG.error("Unable to deserialize DrainNotification from the metadata store", e);
throw new SamzaException(e);
}
})
.collect(ImmutableList.toImmutableList());
return drainNotifications.size() > 0
? Optional.of(drainNotifications)
: Optional.empty();
}
}