blob: b9e131380b50e218c5a8bddbbf351aa2a6c28dd8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.recon.scm;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.node.DeadNodeHandler;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeReportHandler;
import org.apache.hadoop.hdds.scm.node.StaleNodeHandler;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.safemode.SafeModeManager;
import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.recon.fsck.ContainerHealthTask;
import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig;
import com.google.inject.Inject;
import static org.apache.hadoop.hdds.recon.ReconConfigKeys.RECON_SCM_CONFIG_PREFIX;
import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.buildRpcServerStartMessage;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Recon's 'lite' version of SCM.
 * <p>
 * The facade owns an event queue, a DB store, node / pipeline / container
 * managers, a datanode protocol server and a set of background
 * {@link ReconScmTask}s. It wires the SCM event handlers to the event queue
 * in the constructor and drives the lifecycle of the owned components through
 * {@link #start()}, {@link #join()} and {@link #stop()}.
 */
public class ReconStorageContainerManagerFacade
    implements OzoneStorageContainerManager {

  private static final Logger LOG = LoggerFactory
      .getLogger(ReconStorageContainerManagerFacade.class);

  private final OzoneConfiguration ozoneConfiguration;
  private final ReconDatanodeProtocolServer datanodeProtocolServer;
  private final EventQueue eventQueue;
  private final SCMStorageConfig scmStorageConfig;
  private final DBStore dbStore;
  private final ReconNodeManager nodeManager;
  private final ReconPipelineManager pipelineManager;
  private final ReconContainerManager containerManager;
  private final NetworkTopology clusterMap;
  private final StorageContainerServiceProvider scmServiceProvider;
  private final Set<ReconScmTask> reconScmTasks = new HashSet<>();
  private final SCMContainerPlacementMetrics placementMetrics;
  private final PlacementPolicy containerPlacementPolicy;
  private final HDDSLayoutVersionManager scmLayoutVersionManager;

  /**
   * Builds the facade and all the components it owns, registers the event
   * handlers on the event queue and prepares (but does not start) the
   * background tasks.
   *
   * @param conf configuration used to build the managers and handlers.
   * @param scmServiceProvider provider used to talk to the real SCM.
   * @param reconTaskStatusDao DAO handed to the background tasks.
   * @param containerHealthSchemaManager schema manager handed to the
   *        container manager and to the container health task.
   * @param containerDBServiceProvider container DB provider handed to the
   *        container manager.
   * @throws IOException if one of the owned components fails to initialize.
   */
  @Inject
  public ReconStorageContainerManagerFacade(OzoneConfiguration conf,
      StorageContainerServiceProvider scmServiceProvider,
      ReconTaskStatusDao reconTaskStatusDao,
      ContainerHealthSchemaManager containerHealthSchemaManager,
      ContainerDBServiceProvider containerDBServiceProvider)
      throws IOException {
    this.eventQueue = new EventQueue();
    eventQueue.setSilent(true);
    this.ozoneConfiguration = getReconScmConfiguration(conf);
    this.scmStorageConfig = new ReconStorageConfig(conf);
    this.clusterMap = new NetworkTopologyImpl(conf);
    // Only the DB store is built from the prefix-resolved configuration;
    // every other component below receives the caller's conf as is.
    this.dbStore = DBStoreBuilder
        .createDBStore(ozoneConfiguration, new ReconSCMDBDefinition());

    this.scmLayoutVersionManager =
        new HDDSLayoutVersionManager(scmStorageConfig);
    this.nodeManager =
        new ReconNodeManager(conf, scmStorageConfig, eventQueue, clusterMap,
            ReconSCMDBDefinition.NODES.getTable(dbStore),
            this.scmLayoutVersionManager);
    this.placementMetrics = SCMContainerPlacementMetrics.create();
    this.containerPlacementPolicy =
        ContainerPlacementPolicyFactory.getPolicy(conf, nodeManager,
            clusterMap, true, placementMetrics);
    // NOTE(review): 'this' is handed out before the pipeline and container
    // managers are assigned - confirm the server does not read them during
    // its own construction.
    this.datanodeProtocolServer = new ReconDatanodeProtocolServer(
        conf, this, eventQueue);
    this.pipelineManager =
        new ReconPipelineManager(conf,
            nodeManager,
            ReconSCMDBDefinition.PIPELINES.getTable(dbStore),
            eventQueue);
    this.containerManager = new ReconContainerManager(conf,
        ReconSCMDBDefinition.CONTAINERS.getTable(dbStore),
        dbStore, pipelineManager, scmServiceProvider,
        containerHealthSchemaManager, containerDBServiceProvider);
    this.scmServiceProvider = scmServiceProvider;

    // Event handlers.
    NodeReportHandler nodeReportHandler =
        new NodeReportHandler(nodeManager);
    SafeModeManager safeModeManager = new ReconSafeModeManager();
    ReconPipelineReportHandler pipelineReportHandler =
        new ReconPipelineReportHandler(
            safeModeManager, pipelineManager, conf, scmServiceProvider);
    PipelineActionHandler pipelineActionHandler =
        new PipelineActionHandler(pipelineManager, conf);
    StaleNodeHandler staleNodeHandler =
        new StaleNodeHandler(nodeManager, pipelineManager, conf);
    DeadNodeHandler deadNodeHandler = new DeadNodeHandler(nodeManager,
        pipelineManager, containerManager);
    ContainerReportHandler containerReportHandler =
        new ReconContainerReportHandler(nodeManager, containerManager);
    IncrementalContainerReportHandler icrHandler =
        new ReconIncrementalContainerReportHandler(nodeManager,
            containerManager);
    CloseContainerEventHandler closeContainerHandler =
        new CloseContainerEventHandler(pipelineManager, containerManager);
    ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
    ReconNewNodeHandler newNodeHandler = new ReconNewNodeHandler(nodeManager);

    eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, nodeManager);
    eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
    eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
    eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler);
    eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler);
    eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
    eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler);
    eventQueue.addHandler(SCMEvents.INCREMENTAL_CONTAINER_REPORT, icrHandler);
    eventQueue.addHandler(SCMEvents.CONTAINER_ACTIONS, actionsHandler);
    eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
    eventQueue.addHandler(SCMEvents.NEW_NODE, newNodeHandler);

    // Background tasks; they are started in start() and stopped in stop().
    ReconTaskConfig reconTaskConfig = conf.getObject(ReconTaskConfig.class);
    reconScmTasks.add(new PipelineSyncTask(
        pipelineManager,
        scmServiceProvider,
        reconTaskStatusDao,
        reconTaskConfig));
    reconScmTasks.add(new ContainerHealthTask(
        containerManager,
        reconTaskStatusDao, containerHealthSchemaManager,
        containerPlacementPolicy,
        reconTaskConfig));
  }

  /**
   * For every config key which is prefixed by 'recon.scmconfig', create a new
   * config key without the prefix keeping the same value.
   * For example, if recon.scmconfig.a.b.c = xyz, we add a new config like
   * a.b.c = xyz. This is done to override Recon's passive SCM configs if
   * needed.
   * <p>
   * The input object is not modified; the additions are made on a copy.
   *
   * @param configuration configuration object.
   * @return a copy of the configuration with possible added elements.
   */
  private OzoneConfiguration getReconScmConfiguration(
      OzoneConfiguration configuration) {
    OzoneConfiguration reconScmConfiguration =
        new OzoneConfiguration(configuration);
    Map<String, String> reconScmConfigs =
        configuration.getPropsWithPrefix(RECON_SCM_CONFIG_PREFIX);
    for (Map.Entry<String, String> entry : reconScmConfigs.entrySet()) {
      reconScmConfiguration.set(entry.getKey(), entry.getValue());
    }
    return reconScmConfiguration;
  }

  /**
   * Start the Recon SCM subsystems: load the pipelines known to SCM, start
   * the datanode protocol server, then start every background task.
   */
  public void start() {
    if (LOG.isInfoEnabled()) {
      LOG.info(buildRpcServerStartMessage(
          "Recon ScmDatanodeProtocol RPC server",
          getDatanodeProtocolServer().getDatanodeRpcAddress()));
    }
    initializePipelinesFromScm();
    getDatanodeProtocolServer().start();
    this.reconScmTasks.forEach(ReconScmTask::start);
  }

  /**
   * Wait until service has completed shutdown. If the waiting thread is
   * interrupted, its interrupt flag is restored and the method returns.
   */
  public void join() {
    try {
      getDatanodeProtocolServer().join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      LOG.info("Interrupted during StorageContainerManager join.");
    }
  }

  /**
   * Stop the Recon SCM subsystems.
   * <p>
   * The DB store is closed in a {@code finally} block so that it is released
   * even when one of the earlier shutdown steps throws; such an exception is
   * still propagated to the caller.
   */
  public void stop() {
    try {
      getDatanodeProtocolServer().stop();
      reconScmTasks.forEach(ReconScmTask::stop);
      try {
        LOG.info("Stopping SCM Event Queue.");
        eventQueue.close();
      } catch (Exception ex) {
        LOG.error("SCM Event Queue stop failed", ex);
      }
      IOUtils.cleanupWithLogger(LOG, nodeManager);
      IOUtils.cleanupWithLogger(LOG, containerManager);
      IOUtils.cleanupWithLogger(LOG, pipelineManager);
      // NOTE(review): the flush runs after containerManager has been handed
      // to cleanupWithLogger - confirm the flush does not rely on anything
      // that close releases.
      LOG.info("Flushing container replica history to DB.");
      containerManager.flushReplicaHistoryMapToDB(true);
    } finally {
      closeDbStore();
    }
  }

  /**
   * Closes the DB store, logging (not rethrowing) any failure.
   */
  private void closeDbStore() {
    try {
      dbStore.close();
    } catch (Exception e) {
      LOG.error("Can't close dbStore ", e);
    }
  }

  /**
   * @return the datanode protocol server owned by this facade.
   */
  public ReconDatanodeProtocolServer getDatanodeProtocolServer() {
    return datanodeProtocolServer;
  }

  /**
   * Fetches the pipelines from SCM and hands them to the pipeline manager.
   * An {@link IOException} is logged and otherwise ignored, so start-up
   * continues without the initial pipeline list.
   */
  private void initializePipelinesFromScm() {
    try {
      List<Pipeline> pipelinesFromScm = scmServiceProvider.getPipelines();
      LOG.info("Obtained {} pipelines from SCM.", pipelinesFromScm.size());
      pipelineManager.initializePipelines(pipelinesFromScm);
    } catch (IOException ioEx) {
      LOG.error("Exception encountered while getting pipelines from SCM.",
          ioEx);
    }
  }

  /**
   * @return the node manager owned by this facade.
   */
  @Override
  public NodeManager getScmNodeManager() {
    return nodeManager;
  }

  /**
   * @return always {@code null}; this facade does not create a block manager.
   */
  @Override
  public BlockManager getScmBlockManager() {
    return null;
  }

  /**
   * @return the pipeline manager owned by this facade.
   */
  @Override
  public PipelineManager getPipelineManager() {
    return pipelineManager;
  }

  /**
   * @return the container manager owned by this facade.
   */
  @Override
  public ContainerManager getContainerManager() {
    return containerManager;
  }

  /**
   * @return always {@code null}; this facade does not create a replication
   *         manager.
   */
  @Override
  public ReplicationManager getReplicationManager() {
    return null;
  }

  /**
   * @return the RPC address reported by the datanode protocol server.
   */
  @Override
  public InetSocketAddress getDatanodeRpcAddress() {
    return getDatanodeProtocolServer().getDatanodeRpcAddress();
  }

  /**
   * @return the event queue owned by this facade.
   */
  public EventQueue getEventQueue() {
    return eventQueue;
  }
}