/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.apache.hadoop.hdds.utils.MetadataStore;
import org.apache.hadoop.hdds.utils.MetadataStoreBuilder;
import org.apache.hadoop.hdds.utils.Scheduler;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import static org.apache.hadoop.ozone.OzoneConsts.SCM_PIPELINE_DB;
/**
 * Implements the API needed for management of pipelines. All write
 * operations on pipelines must come via PipelineManager. It synchronizes all
 * write and read operations via a ReadWriteLock.
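 * <p>
 * A minimal usage sketch (assuming an SCM-side conf, nodeManager and
 * eventPublisher are already available):
 * <pre>{@code
 * PipelineManager manager =
 *     new SCMPipelineManager(conf, nodeManager, eventPublisher);
 * Pipeline pipeline = manager.createPipeline(
 *     ReplicationType.RATIS, ReplicationFactor.THREE);
 * manager.waitPipelineReady(pipeline.getId(), 0);
 * }</pre>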
*/
public class SCMPipelineManager implements PipelineManager {
private static final Logger LOG =
LoggerFactory.getLogger(SCMPipelineManager.class);
private final ReadWriteLock lock;
private PipelineFactory pipelineFactory;
private PipelineStateManager stateManager;
private final BackgroundPipelineCreator backgroundPipelineCreator;
private Scheduler scheduler;
private MetadataStore pipelineStore;
private final EventPublisher eventPublisher;
private final NodeManager nodeManager;
private final SCMPipelineMetrics metrics;
private final Configuration conf;
private long pipelineWaitDefaultTimeout;
// Pipeline Manager MXBean
private ObjectName pmInfoBean;
private final AtomicBoolean isInSafeMode;
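  /**
   * Creates an SCMPipelineManager with the default PipelineStateManager and
   * PipelineFactory, and reloads any pipelines persisted in the pipeline db.
   */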
public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
EventPublisher eventPublisher)
throws IOException {
this(conf, nodeManager, eventPublisher, null, null);
this.stateManager = new PipelineStateManager();
this.pipelineFactory = new PipelineFactory(nodeManager,
stateManager, conf, eventPublisher);
initializePipelineState();
}
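  /**
   * Base constructor for subclasses. The state manager and pipeline factory
   * may be supplied directly, or assigned afterwards as the public
   * constructor above does.
   */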
protected SCMPipelineManager(Configuration conf, NodeManager nodeManager,
EventPublisher eventPublisher,
PipelineStateManager pipelineStateManager,
PipelineFactory pipelineFactory)
throws IOException {
this.lock = new ReentrantReadWriteLock();
this.conf = conf;
this.pipelineFactory = pipelineFactory;
this.stateManager = pipelineStateManager;
// TODO: See if thread priority needs to be set for these threads
scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
this.backgroundPipelineCreator =
new BackgroundPipelineCreator(this, scheduler, conf);
int cacheSize = conf.getInt(ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB,
ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
final File pipelineDBPath = getPipelineDBPath(conf);
this.pipelineStore =
MetadataStoreBuilder.newBuilder()
.setCreateIfMissing(true)
.setConf(conf)
.setDbFile(pipelineDBPath)
.setCacheSize(cacheSize * OzoneConsts.MB)
.build();
this.eventPublisher = eventPublisher;
this.nodeManager = nodeManager;
this.metrics = SCMPipelineMetrics.create();
this.pmInfoBean = MBeans.register("SCMPipelineManager",
"SCMPipelineManagerInfo", this);
this.pipelineWaitDefaultTimeout = conf.getTimeDuration(
HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL,
HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
this.isInSafeMode = new AtomicBoolean(conf.getBoolean(
HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED,
HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED_DEFAULT));
}
public PipelineStateManager getStateManager() {
return stateManager;
}
@VisibleForTesting
public void setPipelineProvider(ReplicationType replicationType,
PipelineProvider provider) {
pipelineFactory.setProvider(replicationType, provider);
}
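  /**
   * Reloads persisted pipelines from the pipeline db on startup. Each
   * pipeline is re-added in ALLOCATED state with a fresh creation timestamp,
   * since its actual state on the datanodes is not known yet after a restart.
   */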
protected void initializePipelineState() throws IOException {
if (pipelineStore.isEmpty()) {
LOG.info("No pipeline exists in current db");
return;
}
List<Map.Entry<byte[], byte[]>> pipelines =
pipelineStore.getSequentialRangeKVs(null, Integer.MAX_VALUE,
(MetadataKeyFilters.MetadataKeyFilter[])null);
for (Map.Entry<byte[], byte[]> entry : pipelines) {
HddsProtos.Pipeline.Builder pipelineBuilder = HddsProtos.Pipeline
.newBuilder(HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue()));
Pipeline pipeline = Pipeline.getFromProtobuf(pipelineBuilder.setState(
HddsProtos.PipelineState.PIPELINE_ALLOCATED).build());
      Preconditions.checkNotNull(pipeline);
      // When SCM is restarted, reset the creation time to the current time.
      pipeline.setCreationTimestamp(Instant.now());
      stateManager.addPipeline(pipeline);
nodeManager.addPipeline(pipeline);
}
}
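  // Records allocation/creation metrics for a new pipeline. For RATIS
  // pipelines it also counts pipelines that landed on the same set of
  // datanodes as an existing pipeline, to monitor allocation spread.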
private void recordMetricsForPipeline(Pipeline pipeline) {
metrics.incNumPipelineAllocated();
if (pipeline.isOpen()) {
metrics.incNumPipelineCreated();
metrics.createPerPipelineMetrics(pipeline);
}
switch (pipeline.getType()) {
case STAND_ALONE:
return;
case RATIS:
List<Pipeline> overlapPipelines = RatisPipelineUtils
.checkPipelineContainSameDatanodes(stateManager, pipeline);
if (!overlapPipelines.isEmpty()) {
        // Count one overlap at a time.
        metrics.incNumPipelineContainSameDatanodes();
        // TODO: remove once pipeline allocation is proved equally distributed.
        for (Pipeline overlapPipeline : overlapPipelines) {
          LOG.info("Pipeline: {} contains the same datanodes as previous " +
                  "pipeline: {}, nodeIds: {}",
              pipeline.getId(), overlapPipeline.getId(),
              pipeline.getNodes().stream()
                  .map(node -> node.getUuid().toString())
                  .collect(Collectors.joining(", ")));
        }
}
return;
case CHAINED:
// Not supported.
default:
// Not supported.
return;
}
}
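  // Note: a new pipeline is persisted to the db before it is added to the
  // in-memory state, so any pipeline visible in memory is recoverable after
  // an SCM restart.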
@Override
public synchronized Pipeline createPipeline(ReplicationType type,
ReplicationFactor factor) throws IOException {
lock.writeLock().lock();
try {
Pipeline pipeline = pipelineFactory.create(type, factor);
pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
pipeline.getProtobufMessage().toByteArray());
stateManager.addPipeline(pipeline);
nodeManager.addPipeline(pipeline);
recordMetricsForPipeline(pipeline);
return pipeline;
} catch (IOException ex) {
metrics.incNumPipelineCreationFailed();
throw ex;
} finally {
lock.writeLock().unlock();
}
}
@Override
public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor,
List<DatanodeDetails> nodes) {
    // This will mostly be used to create dummy pipelines for SimplePipelines.
// We don't update the metrics for SimplePipelines.
lock.writeLock().lock();
try {
return pipelineFactory.create(type, factor, nodes);
} finally {
lock.writeLock().unlock();
}
}
@Override
public Pipeline getPipeline(PipelineID pipelineID)
throws PipelineNotFoundException {
lock.readLock().lock();
try {
return stateManager.getPipeline(pipelineID);
} finally {
lock.readLock().unlock();
}
}
@Override
public boolean containsPipeline(PipelineID pipelineID) {
lock.readLock().lock();
try {
getPipeline(pipelineID);
return true;
} catch (PipelineNotFoundException e) {
return false;
} finally {
lock.readLock().unlock();
}
}
@Override
public List<Pipeline> getPipelines() {
lock.readLock().lock();
try {
return stateManager.getPipelines();
} finally {
lock.readLock().unlock();
}
}
@Override
public List<Pipeline> getPipelines(ReplicationType type) {
lock.readLock().lock();
try {
return stateManager.getPipelines(type);
} finally {
lock.readLock().unlock();
}
}
@Override
public List<Pipeline> getPipelines(ReplicationType type,
ReplicationFactor factor) {
lock.readLock().lock();
try {
return stateManager.getPipelines(type, factor);
} finally {
lock.readLock().unlock();
}
}
public List<Pipeline> getPipelines(ReplicationType type,
Pipeline.PipelineState state) {
lock.readLock().lock();
try {
return stateManager.getPipelines(type, state);
} finally {
lock.readLock().unlock();
}
}
@Override
public List<Pipeline> getPipelines(ReplicationType type,
ReplicationFactor factor, Pipeline.PipelineState state) {
lock.readLock().lock();
try {
return stateManager.getPipelines(type, factor, state);
} finally {
lock.readLock().unlock();
}
}
@Override
public List<Pipeline> getPipelines(ReplicationType type,
ReplicationFactor factor, Pipeline.PipelineState state,
Collection<DatanodeDetails> excludeDns,
Collection<PipelineID> excludePipelines) {
lock.readLock().lock();
try {
return stateManager
.getPipelines(type, factor, state, excludeDns, excludePipelines);
} finally {
lock.readLock().unlock();
}
}
@Override
public void addContainerToPipeline(PipelineID pipelineID,
ContainerID containerID) throws IOException {
lock.writeLock().lock();
try {
stateManager.addContainerToPipeline(pipelineID, containerID);
} finally {
lock.writeLock().unlock();
}
}
@Override
public void removeContainerFromPipeline(PipelineID pipelineID,
ContainerID containerID) throws IOException {
lock.writeLock().lock();
try {
stateManager.removeContainerFromPipeline(pipelineID, containerID);
} finally {
lock.writeLock().unlock();
}
}
@Override
public NavigableSet<ContainerID> getContainersInPipeline(
PipelineID pipelineID) throws IOException {
lock.readLock().lock();
try {
return stateManager.getContainers(pipelineID);
} finally {
lock.readLock().unlock();
}
}
@Override
public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
return stateManager.getNumberOfContainers(pipelineID);
}
@Override
public void openPipeline(PipelineID pipelineId) throws IOException {
lock.writeLock().lock();
try {
Pipeline pipeline = stateManager.openPipeline(pipelineId);
metrics.incNumPipelineCreated();
metrics.createPerPipelineMetrics(pipeline);
} finally {
lock.writeLock().unlock();
}
}
/**
   * Finalizes a pipeline in the SCM. Removes the pipeline and makes an RPC
   * call to destroy the pipeline on the datanodes, either immediately or
   * after a timeout, based on the value of the onTimeout parameter.
   *
   * @param pipeline - Pipeline to be destroyed
   * @param onTimeout - if true, the pipeline is removed and destroyed on the
   *                    datanodes after a timeout
* @throws IOException
*/
@Override
public void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
throws IOException {
LOG.info("Destroying pipeline:{}", pipeline);
finalizePipeline(pipeline.getId());
if (onTimeout) {
long pipelineDestroyTimeoutInMillis =
conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
scheduler.schedule(() -> destroyPipeline(pipeline),
pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS, LOG,
String.format("Destroy pipeline failed for pipeline:%s", pipeline));
} else {
destroyPipeline(pipeline);
}
}
@Override
  public void scrubPipeline(ReplicationType type, ReplicationFactor factor)
      throws IOException {
    if (type != ReplicationType.RATIS || factor != ReplicationFactor.THREE) {
      // Only scrub RATIS THREE pipelines.
      return;
    }
    Instant currentTime = Instant.now();
    long pipelineScrubTimeoutInMillis = conf.getTimeDuration(
        ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
        ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
        TimeUnit.MILLISECONDS);
    List<Pipeline> needToScrubPipelines = stateManager.getPipelines(type,
        factor, Pipeline.PipelineState.ALLOCATED).stream()
        .filter(p -> currentTime.toEpochMilli() - p.getCreationTimestamp()
            .toEpochMilli() >= pipelineScrubTimeoutInMillis)
        .collect(Collectors.toList());
    for (Pipeline p : needToScrubPipelines) {
      LOG.info("Scrubbing pipeline: id: {} since it stays at ALLOCATED " +
              "stage for {} mins.",
          p.getId(),
          Duration.between(p.getCreationTimestamp(), currentTime).toMinutes());
      finalizeAndDestroyPipeline(p, false);
    }
}
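  /**
   * Returns a count of pipelines per state; exposed through the
   * SCMPipelineManagerInfo MXBean registered in the constructor.
   */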
@Override
public Map<String, Integer> getPipelineInfo() {
final Map<String, Integer> pipelineInfo = new HashMap<>();
for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) {
pipelineInfo.put(state.toString(), 0);
}
stateManager.getPipelines().forEach(pipeline ->
pipelineInfo.computeIfPresent(
pipeline.getPipelineState().toString(), (k, v) -> v + 1));
return pipelineInfo;
}
/**
* Schedules a fixed interval job to create pipelines.
*/
@Override
public void startPipelineCreator() {
backgroundPipelineCreator.startFixedIntervalPipelineCreator();
}
/**
* Triggers pipeline creation after the specified time.
*/
@Override
public void triggerPipelineCreation() {
backgroundPipelineCreator.triggerPipelineCreation();
}
/**
* Activates a dormant pipeline.
*
* @param pipelineID ID of the pipeline to activate.
* @throws IOException in case of any Exception
*/
@Override
public void activatePipeline(PipelineID pipelineID)
throws IOException {
stateManager.activatePipeline(pipelineID);
}
/**
* Deactivates an active pipeline.
*
* @param pipelineID ID of the pipeline to deactivate.
* @throws IOException in case of any Exception
*/
@Override
public void deactivatePipeline(PipelineID pipelineID)
throws IOException {
stateManager.deactivatePipeline(pipelineID);
}
/**
   * Waits for a pipeline to become OPEN.
   *
   * @param pipelineID ID of the pipeline to wait for.
   * @param timeout wait timeout in milliseconds; 0 uses the default value
   * @throws IOException in case of any Exception, such as timeout
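   *
   * A minimal usage sketch, assuming the pipeline was just created:
   * <pre>{@code
   * // 0 falls back to the configured default timeout
   * waitPipelineReady(pipeline.getId(), 0);
   * }</pre>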
*/
@Override
public void waitPipelineReady(PipelineID pipelineID, long timeout)
throws IOException {
long st = Time.monotonicNow();
if (timeout == 0) {
timeout = pipelineWaitDefaultTimeout;
}
boolean ready;
Pipeline pipeline;
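    // Poll the pipeline state every 100 ms until it is OPEN or the timeout
    // expires.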
do {
try {
pipeline = stateManager.getPipeline(pipelineID);
} catch (PipelineNotFoundException e) {
throw new PipelineNotFoundException(String.format(
"Pipeline %s cannot be found", pipelineID));
}
ready = pipeline.isOpen();
if (!ready) {
try {
          Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} while (!ready && Time.monotonicNow() - st < timeout);
if (!ready) {
throw new IOException(String.format("Pipeline %s is not ready in %d ms",
pipelineID, timeout));
}
}
/**
* Moves the pipeline to CLOSED state and sends close container command for
* all the containers in the pipeline.
*
* @param pipelineId - ID of the pipeline to be moved to CLOSED state.
* @throws IOException
*/
private void finalizePipeline(PipelineID pipelineId) throws IOException {
lock.writeLock().lock();
try {
stateManager.finalizePipeline(pipelineId);
Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
for (ContainerID containerID : containerIDs) {
eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
}
metrics.removePipelineMetrics(pipelineId);
} finally {
lock.writeLock().unlock();
}
}
/**
* Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
* the datanodes for ratis pipelines.
*
* @param pipeline - Pipeline to be destroyed
* @throws IOException
*/
protected void destroyPipeline(Pipeline pipeline) throws IOException {
pipelineFactory.close(pipeline.getType(), pipeline);
// remove the pipeline from the pipeline manager
removePipeline(pipeline.getId());
triggerPipelineCreation();
}
/**
* Removes the pipeline from the db and pipeline state map.
*
* @param pipelineId - ID of the pipeline to be removed
* @throws IOException
*/
protected void removePipeline(PipelineID pipelineId) throws IOException {
lock.writeLock().lock();
try {
pipelineStore.delete(pipelineId.getProtobuf().toByteArray());
Pipeline pipeline = stateManager.removePipeline(pipelineId);
nodeManager.removePipeline(pipeline);
metrics.incNumPipelineDestroyed();
} catch (IOException ex) {
metrics.incNumPipelineDestroyFailed();
throw ex;
} finally {
lock.writeLock().unlock();
}
}
@Override
public void incNumBlocksAllocatedMetric(PipelineID id) {
metrics.incNumBlocksAllocated(id);
}
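  // Shuts down the scheduler, pipeline store, MXBean, metrics and pipeline
  // providers. Released fields are nulled out so a repeated close() skips
  // them.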
@Override
public void close() throws IOException {
if (scheduler != null) {
scheduler.close();
scheduler = null;
}
if (pipelineStore != null) {
pipelineStore.close();
pipelineStore = null;
}
    if (pmInfoBean != null) {
      MBeans.unregister(this.pmInfoBean);
      pmInfoBean = null;
    }
    if (metrics != null) {
      metrics.unRegister();
    }
// shutdown pipeline provider.
pipelineFactory.shutdown();
}
protected File getPipelineDBPath(Configuration configuration) {
File metaDir = ServerUtils.getScmDbDir(configuration);
return new File(metaDir, SCM_PIPELINE_DB);
}
protected ReadWriteLock getLock() {
return lock;
}
@VisibleForTesting
public PipelineFactory getPipelineFactory() {
return pipelineFactory;
}
protected MetadataStore getPipelineStore() {
return pipelineStore;
}
protected NodeManager getNodeManager() {
return nodeManager;
}
@Override
public void setSafeModeStatus(boolean safeModeStatus) {
this.isInSafeMode.set(safeModeStatus);
}
@Override
public boolean getSafeModeStatus() {
return this.isInSafeMode.get();
}
}