/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import javax.management.ObjectName;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.utils.Scheduler;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE;
/**
* Implements the API needed for management of pipelines. All write operations
* for pipelines must come via the PipelineManager. It synchronises all write
* and read operations via a ReadWriteLock.
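*
* A minimal usage sketch (assuming a configured NodeManager, pipeline
* store table and EventPublisher are already available; the names below
* are illustrative):
* <pre>
*   PipelineManager manager = new SCMPipelineManager(
*       conf, nodeManager, pipelineStore, eventPublisher);
*   Pipeline pipeline = manager.createPipeline(
*       ReplicationType.RATIS, ReplicationFactor.THREE);
*   manager.waitPipelineReady(pipeline.getId(), 0);
* </pre>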
*/
public class SCMPipelineManager implements PipelineManager {
private static final Logger LOG =
LoggerFactory.getLogger(SCMPipelineManager.class);
private final ReadWriteLock lock;
private PipelineFactory pipelineFactory;
private PipelineStateManager stateManager;
private final BackgroundPipelineCreator backgroundPipelineCreator;
private Scheduler scheduler;
private final EventPublisher eventPublisher;
private final NodeManager nodeManager;
private final SCMPipelineMetrics metrics;
private final ConfigurationSource conf;
private long pipelineWaitDefaultTimeout;
// Pipeline Manager MXBean
private ObjectName pmInfoBean;
private Table<PipelineID, Pipeline> pipelineStore;
private final AtomicBoolean isInSafeMode;
// Used to track whether the safemode pre-checks have completed. This is
// designed to prevent pipelines from being created until sufficient nodes
// have registered.
private final AtomicBoolean pipelineCreationAllowed;
// This allows for freezing/resuming the new pipeline creation while the
// SCM is already out of SafeMode.
private AtomicBoolean freezePipelineCreation;
public SCMPipelineManager(ConfigurationSource conf,
NodeManager nodeManager,
Table<PipelineID, Pipeline> pipelineStore,
EventPublisher eventPublisher)
throws IOException {
this(conf, nodeManager, pipelineStore, eventPublisher, null, null);
this.stateManager = new PipelineStateManager();
this.pipelineFactory = new PipelineFactory(nodeManager,
stateManager, conf, eventPublisher);
this.pipelineStore = pipelineStore;
initializePipelineState();
}
protected SCMPipelineManager(ConfigurationSource conf,
NodeManager nodeManager,
Table<PipelineID, Pipeline> pipelineStore,
EventPublisher eventPublisher,
PipelineStateManager pipelineStateManager,
PipelineFactory pipelineFactory)
throws IOException {
this.lock = new ReentrantReadWriteLock();
this.pipelineStore = pipelineStore;
this.conf = conf;
this.pipelineFactory = pipelineFactory;
this.stateManager = pipelineStateManager;
// TODO: See if thread priority needs to be set for these threads
scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
this.backgroundPipelineCreator =
new BackgroundPipelineCreator(this, scheduler, conf);
this.eventPublisher = eventPublisher;
this.nodeManager = nodeManager;
this.metrics = SCMPipelineMetrics.create();
this.pmInfoBean = MBeans.register("SCMPipelineManager",
"SCMPipelineManagerInfo", this);
this.pipelineWaitDefaultTimeout = conf.getTimeDuration(
HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL,
HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
this.isInSafeMode = new AtomicBoolean(conf.getBoolean(
HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED,
HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED_DEFAULT));
// Pipeline creation is only allowed after the safemode prechecks have
// passed, e.g. sufficient nodes have registered.
this.pipelineCreationAllowed = new AtomicBoolean(!this.isInSafeMode.get());
// controls freezing/resuming pipeline creation regardless of SafeMode
// status.
this.freezePipelineCreation = new AtomicBoolean(false);
}
public PipelineStateManager getStateManager() {
return stateManager;
}
@VisibleForTesting
public void setPipelineProvider(ReplicationType replicationType,
PipelineProvider provider) {
pipelineFactory.setProvider(replicationType, provider);
}
@VisibleForTesting
public void allowPipelineCreation() {
this.pipelineCreationAllowed.set(true);
}
@VisibleForTesting
public boolean isPipelineCreationAllowed() {
return pipelineCreationAllowed.get();
}
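/**
 * Loads all pipelines from the pipeline store into the in-memory state
 * manager and registers them with the node manager. Invoked once during
 * SCM startup.
 *
 * @throws IOException if the pipeline store cannot be read
 */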
protected void initializePipelineState() throws IOException {
if (pipelineStore.isEmpty()) {
LOG.info("No pipeline exists in current db");
return;
}
TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>>
iterator = pipelineStore.iterator();
while (iterator.hasNext()) {
Pipeline pipeline = nextPipelineFromIterator(iterator);
stateManager.addPipeline(pipeline);
nodeManager.addPipeline(pipeline);
}
}
private Pipeline nextPipelineFromIterator(
TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>> it
) throws IOException {
KeyValue<PipelineID, Pipeline> actual = it.next();
Pipeline pipeline = actual.getValue();
PipelineID pipelineID = actual.getKey();
checkKeyAndReplaceIfObsolete(it, pipeline, pipelineID);
return pipeline;
}
/**
 * This method is part of the change introduced in HDDS-3925, and we can
 * and should remove it later on.
 * The purpose of the change is to get rid of protobuf serialization in the
 * SCM database Pipeline table keys. The keys are not used anywhere, and the
 * PipelineID that is used as a key is present in the value as well, so we
 * can detect a change in the key translation to byte[], and if we find the
 * old format we refresh the table contents during SCM startup.
 *
 * If the remove fails, there is an IOException coming from RocksDB itself;
 * in this case the in-memory structures will still be fine and SCM should
 * be operational, and we will attempt to replace the old key again at the
 * next startup. In this case removing the pipeline will leave the pipeline
 * in RocksDB, and during the next startup we will attempt to delete it
 * again. This does not affect any runtime operations.
 * If a Pipeline should have been deleted but remained in RocksDB, then at
 * the next startup it will be replaced and added with the new key, then SCM
 * will detect that it is an invalid Pipeline and successfully delete it
 * with the new key.
 * For further info check the JIRA.
 *
 * @param it the iterator used to iterate the Pipeline table
 * @param pipeline the pipeline already read from the iterator
 * @param pipelineID the pipeline ID read from the raw data via the iterator
 */
private void checkKeyAndReplaceIfObsolete(
TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>> it,
Pipeline pipeline,
PipelineID pipelineID
) {
if (!pipelineID.equals(pipeline.getId())) {
try {
LOG.info("Found pipeline in old format key : {}", pipeline.getId());
it.removeFromDB();
pipelineStore.put(pipeline.getId(), pipeline);
} catch (IOException e) {
LOG.info("Pipeline table in RocksDB has an old key format, and "
+ "removing the pipeline with the old key was unsuccessful."
+ "Pipeline: {}", pipeline);
}
}
}
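/**
 * Updates pipeline metrics for a newly allocated pipeline and, for RATIS
 * pipelines, logs any existing pipelines that consist of the same set of
 * datanodes.
 *
 * @param pipeline the pipeline that was just allocated
 */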
private void recordMetricsForPipeline(Pipeline pipeline) {
metrics.incNumPipelineAllocated();
if (pipeline.isOpen()) {
metrics.incNumPipelineCreated();
metrics.createPerPipelineMetrics(pipeline);
}
switch (pipeline.getType()) {
case STAND_ALONE:
return;
case RATIS:
List<Pipeline> overlapPipelines = RatisPipelineUtils
.checkPipelineContainSameDatanodes(stateManager, pipeline);
if (!overlapPipelines.isEmpty()) {
// Count 1 overlap at a time.
metrics.incNumPipelineContainSameDatanodes();
// TODO: remove this logging once pipeline allocation is proven to be
// equally distributed.
for (Pipeline overlapPipeline : overlapPipelines) {
LOG.info("Pipeline: " + pipeline.getId().toString() +
" contains same datanodes as previous pipelines: " +
overlapPipeline.getId().toString() + " nodeIds: " +
pipeline.getNodes().get(0).getUuid().toString() +
", " + pipeline.getNodes().get(1).getUuid().toString() +
", " + pipeline.getNodes().get(2).getUuid().toString());
}
}
return;
case CHAINED:
// Not supported.
default:
// Not supported.
return;
}
}
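/**
 * Creates a new pipeline of the given type and factor, persists it to the
 * pipeline store (when one is configured) and registers it with the state
 * manager and node manager. Fails if the safemode prechecks have not yet
 * passed (unless the factor is ONE) or if pipeline creation is currently
 * frozen.
 *
 * @param type replication type of the new pipeline
 * @param factor replication factor of the new pipeline
 * @return the newly created pipeline
 * @throws IOException if the pipeline could not be created or persisted
 */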
@Override
public Pipeline createPipeline(ReplicationType type,
ReplicationFactor factor) throws IOException {
if (!isPipelineCreationAllowed() && factor != ReplicationFactor.ONE) {
LOG.debug("Pipeline creation is not allowed until safe mode prechecks " +
"complete");
throw new IOException("Pipeline creation is not allowed as safe mode " +
"prechecks have not yet passed");
}
if (freezePipelineCreation.get()) {
LOG.debug("Pipeline creation is frozen while an upgrade is in " +
"progress");
throw new IOException("Pipeline creation is frozen while an upgrade " +
"is in progress");
}
lock.writeLock().lock();
try {
Pipeline pipeline = pipelineFactory.create(type, factor);
if (pipelineStore != null) {
pipelineStore.put(pipeline.getId(), pipeline);
}
stateManager.addPipeline(pipeline);
nodeManager.addPipeline(pipeline);
recordMetricsForPipeline(pipeline);
return pipeline;
} catch (IOException ex) {
if (ex instanceof SCMException &&
((SCMException) ex).getResult() == FAILED_TO_FIND_SUITABLE_NODE) {
// Avoid spamming the SCM log with errors when SCM has enough open pipelines
LOG.debug("Can't create more pipelines of type {} and factor {}. " +
"Reason: {}", type, factor, ex.getMessage());
} else {
LOG.error("Failed to create pipeline of type {} and factor {}. " +
"Exception: {}", type, factor, ex.getMessage());
}
metrics.incNumPipelineCreationFailed();
throw ex;
} finally {
lock.writeLock().unlock();
}
}
@Override
public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor,
List<DatanodeDetails> nodes) {
// This will mostly be used to create dummy pipelines for SimplePipelines.
// We don't update the metrics for SimplePipelines.
lock.writeLock().lock();
try {
return pipelineFactory.create(type, factor, nodes);
} finally {
lock.writeLock().unlock();
}
}
@Override
public Pipeline getPipeline(PipelineID pipelineID)
throws PipelineNotFoundException {
lock.readLock().lock();
try {
return stateManager.getPipeline(pipelineID);
} finally {
lock.readLock().unlock();
}
}
@Override
public boolean containsPipeline(PipelineID pipelineID) {
lock.readLock().lock();
try {
getPipeline(pipelineID);
return true;
} catch (PipelineNotFoundException e) {
return false;
} finally {
lock.readLock().unlock();
}
}
@Override
public List<Pipeline> getPipelines() {
lock.readLock().lock();
try {
return stateManager.getPipelines();
} finally {
lock.readLock().unlock();
}
}
@Override
public List<Pipeline> getPipelines(ReplicationType type) {
lock.readLock().lock();
try {
return stateManager.getPipelines(type);
} finally {
lock.readLock().unlock();
}
}
@Override
public List<Pipeline> getPipelines(ReplicationType type,
ReplicationFactor factor) {
lock.readLock().lock();
try {
return stateManager.getPipelines(type, factor);
} finally {
lock.readLock().unlock();
}
}
public List<Pipeline> getPipelines(ReplicationType type,
Pipeline.PipelineState state) {
lock.readLock().lock();
try {
return stateManager.getPipelines(type, state);
} finally {
lock.readLock().unlock();
}
}
@Override
public List<Pipeline> getPipelines(ReplicationType type,
ReplicationFactor factor, Pipeline.PipelineState state) {
lock.readLock().lock();
try {
return stateManager.getPipelines(type, factor, state);
} finally {
lock.readLock().unlock();
}
}
@Override
public List<Pipeline> getPipelines(ReplicationType type,
ReplicationFactor factor, Pipeline.PipelineState state,
Collection<DatanodeDetails> excludeDns,
Collection<PipelineID> excludePipelines) {
lock.readLock().lock();
try {
return stateManager
.getPipelines(type, factor, state, excludeDns, excludePipelines);
} finally {
lock.readLock().unlock();
}
}
@Override
public void addContainerToPipeline(PipelineID pipelineID,
ContainerID containerID) throws IOException {
lock.writeLock().lock();
try {
stateManager.addContainerToPipeline(pipelineID, containerID);
} finally {
lock.writeLock().unlock();
}
}
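/**
 * Persists the current state of the given pipeline to the pipeline store.
 * If the write fails, the in-memory state is reverted to the previous
 * state that is passed in.
 *
 * @param pipelineId ID of the pipeline to persist
 * @param state the previous state to revert to if the DB update fails
 * @throws IOException in case of any Exception
 */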
private void updatePipelineStateInDb(PipelineID pipelineId,
Pipeline.PipelineState state)
throws IOException {
// null check is here to prevent the case where the SCM store
// is closed but the staleNode handlers/pipeline creations
// still try to access it.
if (pipelineStore != null) {
try {
pipelineStore.put(pipelineId, getPipeline(pipelineId));
} catch (IOException ex) {
LOG.info("Pipeline {} state update failed", pipelineId);
// revert back to old state in memory
stateManager.updatePipelineState(pipelineId, state);
}
}
}
@Override
public void removeContainerFromPipeline(PipelineID pipelineID,
ContainerID containerID) throws IOException {
lock.writeLock().lock();
try {
stateManager.removeContainerFromPipeline(pipelineID, containerID);
} finally {
lock.writeLock().unlock();
}
}
@Override
public NavigableSet<ContainerID> getContainersInPipeline(
PipelineID pipelineID) throws IOException {
lock.readLock().lock();
try {
return stateManager.getContainers(pipelineID);
} finally {
lock.readLock().unlock();
}
}
@Override
public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
return stateManager.getNumberOfContainers(pipelineID);
}
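/**
 * Moves the pipeline to OPEN state and updates the pipeline metrics and
 * the persisted pipeline state.
 *
 * @param pipelineId ID of the pipeline to open
 * @throws IOException in case of any Exception
 */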
@Override
public void openPipeline(PipelineID pipelineId) throws IOException {
lock.writeLock().lock();
try {
Pipeline.PipelineState state = stateManager.
getPipeline(pipelineId).getPipelineState();
Pipeline pipeline = stateManager.openPipeline(pipelineId);
updatePipelineStateInDb(pipelineId, state);
metrics.incNumPipelineCreated();
metrics.createPerPipelineMetrics(pipeline);
} finally {
lock.writeLock().unlock();
}
}
/**
 * Finalizes the pipeline in the SCM. Removes the pipeline and makes an RPC
 * call to destroy the pipeline on the datanodes, either immediately or
 * after a timeout, based on the value of the onTimeout parameter.
 *
 * @param pipeline - Pipeline to be destroyed
 * @param onTimeout - if true, the pipeline is removed and destroyed on the
 *                  datanodes after a timeout
 * @throws IOException
 */
@Override
public void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
throws IOException {
LOG.info("Destroying pipeline:{}", pipeline);
finalizePipeline(pipeline.getId());
if (onTimeout) {
long pipelineDestroyTimeoutInMillis =
conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
scheduler.schedule(() -> destroyPipeline(pipeline),
pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS, LOG,
String.format("Destroy pipeline failed for pipeline:%s", pipeline));
} else {
destroyPipeline(pipeline);
}
}
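/**
 * Destroys RATIS/THREE pipelines that have stayed in the ALLOCATED state
 * for longer than the configured timeout. Pipelines of any other type or
 * factor are ignored.
 *
 * @param type replication type of the pipelines to scrub
 * @param factor replication factor of the pipelines to scrub
 * @throws IOException in case of any Exception
 */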
@Override
public void scrubPipeline(ReplicationType type, ReplicationFactor factor)
throws IOException {
if (type != ReplicationType.RATIS || factor != ReplicationFactor.THREE) {
// Only scrub RATIS THREE pipelines
return;
}
Instant currentTime = Instant.now();
long pipelineScrubTimeoutInMillis = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
List<Pipeline> needToScrubPipelines = stateManager.getPipelines(type, factor,
Pipeline.PipelineState.ALLOCATED).stream()
.filter(p -> currentTime.toEpochMilli() - p.getCreationTimestamp()
.toEpochMilli() >= pipelineScrubTimeoutInMillis)
.collect(Collectors.toList());
for (Pipeline p : needToScrubPipelines) {
LOG.info("Scrubbing pipeline: id: {} since it has stayed at ALLOCATED " +
"stage for {} mins.", p.getId(),
Duration.between(p.getCreationTimestamp(), currentTime).toMinutes());
finalizeAndDestroyPipeline(p, false);
}
}
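/**
 * Returns a map from pipeline state name to the number of pipelines
 * currently in that state; used by the PipelineManager MXBean.
 *
 * @return pipeline count per state
 */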
@Override
public Map<String, Integer> getPipelineInfo() {
final Map<String, Integer> pipelineInfo = new HashMap<>();
for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) {
pipelineInfo.put(state.toString(), 0);
}
stateManager.getPipelines().forEach(pipeline ->
pipelineInfo.computeIfPresent(
pipeline.getPipelineState().toString(), (k, v) -> v + 1));
return pipelineInfo;
}
/**
* Schedules a fixed interval job to create pipelines.
*/
@Override
public void startPipelineCreator() {
backgroundPipelineCreator.startFixedIntervalPipelineCreator();
}
/**
* Triggers pipeline creation after the specified time.
*/
@Override
public void triggerPipelineCreation() {
backgroundPipelineCreator.triggerPipelineCreation();
}
/**
* Activates a dormant pipeline.
*
* @param pipelineID ID of the pipeline to activate.
* @throws IOException in case of any Exception
*/
@Override
public void activatePipeline(PipelineID pipelineID)
throws IOException {
Pipeline.PipelineState state = stateManager.
getPipeline(pipelineID).getPipelineState();
stateManager.activatePipeline(pipelineID);
updatePipelineStateInDb(pipelineID, state);
}
/**
* Deactivates an active pipeline.
*
* @param pipelineID ID of the pipeline to deactivate.
* @throws IOException in case of any Exception
*/
@Override
public void deactivatePipeline(PipelineID pipelineID)
throws IOException {
Pipeline.PipelineState state = stateManager.
getPipeline(pipelineID).getPipelineState();
stateManager.deactivatePipeline(pipelineID);
updatePipelineStateInDb(pipelineID, state);
}
/**
 * Waits for a pipeline to become OPEN.
 *
 * @param pipelineID ID of the pipeline to wait for.
 * @param timeout wait timeout in milliseconds; 0 to use the default value
 * @throws IOException in case of any Exception, such as a timeout
 */
@Override
public void waitPipelineReady(PipelineID pipelineID, long timeout)
throws IOException {
long st = Time.monotonicNow();
if (timeout == 0) {
timeout = pipelineWaitDefaultTimeout;
}
boolean ready;
Pipeline pipeline;
do {
try {
pipeline = stateManager.getPipeline(pipelineID);
} catch (PipelineNotFoundException e) {
throw new PipelineNotFoundException(String.format(
"Pipeline %s cannot be found", pipelineID));
}
ready = pipeline.isOpen();
if (!ready) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} while (!ready && Time.monotonicNow() - st < timeout);
if (!ready) {
throw new IOException(String.format("Pipeline %s is not ready in %d ms",
pipelineID, timeout));
}
}
/**
* Moves the pipeline to CLOSED state and sends close container command for
* all the containers in the pipeline.
*
* @param pipelineId - ID of the pipeline to be moved to CLOSED state.
* @throws IOException
*/
private void finalizePipeline(PipelineID pipelineId) throws IOException {
lock.writeLock().lock();
try {
Pipeline.PipelineState state = stateManager.
getPipeline(pipelineId).getPipelineState();
stateManager.finalizePipeline(pipelineId);
updatePipelineStateInDb(pipelineId, state);
Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
for (ContainerID containerID : containerIDs) {
eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
}
metrics.removePipelineMetrics(pipelineId);
} finally {
lock.writeLock().unlock();
}
}
/**
* Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
* the datanodes for ratis pipelines.
*
* @param pipeline - Pipeline to be destroyed
* @throws IOException
*/
protected void destroyPipeline(Pipeline pipeline) throws IOException {
pipelineFactory.close(pipeline.getType(), pipeline);
// remove the pipeline from the pipeline manager
removePipeline(pipeline.getId());
triggerPipelineCreation();
}
/**
* Removes the pipeline from the db and pipeline state map.
*
* @param pipelineId - ID of the pipeline to be removed
* @throws IOException
*/
protected void removePipeline(PipelineID pipelineId) throws IOException {
lock.writeLock().lock();
try {
if (pipelineStore != null) {
pipelineStore.delete(pipelineId);
Pipeline pipeline = stateManager.removePipeline(pipelineId);
nodeManager.removePipeline(pipeline);
metrics.incNumPipelineDestroyed();
}
} catch (IOException ex) {
metrics.incNumPipelineDestroyFailed();
throw ex;
} finally {
lock.writeLock().unlock();
}
}
@Override
public void incNumBlocksAllocatedMetric(PipelineID id) {
metrics.incNumBlocksAllocated(id);
}
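/**
 * Shuts down the background scheduler, unregisters the metrics and the
 * MXBean, shuts down the pipeline providers and closes the pipeline store.
 *
 * @throws IOException in case of any Exception
 */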
@Override
public void close() throws IOException {
if (scheduler != null) {
scheduler.close();
scheduler = null;
}
if (pmInfoBean != null) {
MBeans.unregister(this.pmInfoBean);
pmInfoBean = null;
}
SCMPipelineMetrics.unRegister();
// shutdown pipeline provider.
pipelineFactory.shutdown();
lock.writeLock().lock();
try {
pipelineStore.close();
pipelineStore = null;
} catch (Exception ex) {
LOG.error("Pipeline store close failed", ex);
} finally {
lock.writeLock().unlock();
}
}
/**
 * Returns the minimum number of healthy volumes among the set of
 * datanodes constituting the pipeline.
 *
 * @param pipeline the pipeline whose datanodes are inspected
 * @return minimum healthy volume count
 */
public int minHealthyVolumeNum(Pipeline pipeline) {
return nodeManager.minHealthyVolumeNum(pipeline.getNodes());
}
/**
 * Returns the minimum pipeline limit, based on the raft log volume count,
 * among the set of datanodes constituting the pipeline.
 *
 * @param pipeline the pipeline whose datanodes are inspected
 * @return minimum pipeline limit
 */
public int minPipelineLimit(Pipeline pipeline) {
return nodeManager.minPipelineLimit(pipeline.getNodes());
}
protected ReadWriteLock getLock() {
return lock;
}
@VisibleForTesting
public PipelineFactory getPipelineFactory() {
return pipelineFactory;
}
protected NodeManager getNodeManager() {
return nodeManager;
}
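/**
 * Returns true while the SCM is in safe mode.
 */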
@Override
public boolean getSafeModeStatus() {
return this.isInSafeMode.get();
}
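/**
 * Freezes new pipeline creation and pauses the background pipeline
 * creator, e.g. while an upgrade is in progress, even when the SCM is
 * already out of safe mode.
 */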
@Override
public void freezePipelineCreation() {
freezePipelineCreation.set(true);
backgroundPipelineCreator.pause();
}
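/**
 * Resumes new pipeline creation and the background pipeline creator
 * after a freeze.
 */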
@Override
public void resumePipelineCreation() {
freezePipelineCreation.set(false);
backgroundPipelineCreator.resume();
}
public Table<PipelineID, Pipeline> getPipelineStore() {
return pipelineStore;
}
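/**
 * Handles safe mode status events. Triggers pipeline creation once the
 * safemode precheck completes, and starts the background pipeline creator
 * when the SCM leaves safe mode.
 *
 * @param status the new safe mode status
 * @param publisher the event publisher (unused)
 */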
@Override
public void onMessage(SafeModeStatus status,
EventPublisher publisher) {
// TODO: #CLUTIL - handle safemode getting re-enabled
boolean currentAllowPipelines =
pipelineCreationAllowed.getAndSet(status.isPreCheckComplete());
boolean currentlyInSafeMode =
isInSafeMode.getAndSet(status.isInSafeMode());
// Trigger pipeline creation only if the preCheck status has changed to
// complete.
if (isPipelineCreationAllowed() && !currentAllowPipelines) {
triggerPipelineCreation();
}
// Start the pipeline creation thread only when safemode switches off
if (!getSafeModeStatus() && currentlyInSafeMode) {
startPipelineCreator();
}
}
@VisibleForTesting
protected static Logger getLog() {
return LOG;
}
}