blob: a026f0e8757b4f404dde8aeb7f5c9feb398a16de [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.ozoneimpl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto
.ContainerProtos.ContainerType;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
import org.apache.hadoop.ozone.container.replication
.OnDemandContainerReplicationSource;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.ozone.OzoneConfigKeys.*;
/**
* Ozone main class sets up the network servers and initializes the container
* layer.
*/
public class OzoneContainer {

  private static final Logger LOG = LoggerFactory.getLogger(
      OzoneContainer.class);

  private final HddsDispatcher hddsDispatcher;
  // One handler per container type; shared by the dispatcher and controller.
  private final Map<ContainerType, Handler> handlers;
  private final OzoneConfiguration config;
  private final VolumeSet volumeSet;
  private final ContainerSet containerSet;
  private final XceiverServerSpi writeChannel;
  private final XceiverServerSpi readChannel;
  private final ContainerController controller;
  // Background integrity scanners; non-null only after startContainerScrub()
  // has run with scrubbing enabled.
  private ContainerMetadataScanner metadataScanner;
  private List<ContainerDataScanner> dataScanners;
  private final BlockDeletingService blockDeletingService;

  /**
   * Construct OzoneContainer object.
   * @param datanodeDetails identity of this datanode
   * @param conf ozone configuration
   * @param context datanode state machine context shared with the handlers
   * @param certClient certificate client used to secure the RPC channels
   * @throws DiskOutOfSpaceException
   * @throws IOException
   */
  public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration
      conf, StateContext context, CertificateClient certClient)
      throws IOException {
    this.config = conf;
    this.volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf);
    this.containerSet = new ContainerSet();
    this.metadataScanner = null;

    // Load the containers already present on disk before wiring up the
    // dispatcher and servers, so they serve a fully populated container set.
    buildContainerSet();

    final ContainerMetrics metrics = ContainerMetrics.create(conf);
    this.handlers = Maps.newHashMap();
    for (ContainerType containerType : ContainerType.values()) {
      handlers.put(containerType,
          Handler.getHandlerForContainerType(
              containerType, conf, context, containerSet, volumeSet, metrics));
    }

    this.hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet,
        handlers, context, metrics);

    /*
     * ContainerController is the control plane
     * XceiverServerRatis is the write channel
     * XceiverServerGrpc is the read channel
     */
    this.controller = new ContainerController(containerSet, handlers);
    this.writeChannel = XceiverServerRatis.newXceiverServerRatis(
        datanodeDetails, config, hddsDispatcher, controller, certClient,
        context);
    this.readChannel = new XceiverServerGrpc(
        datanodeDetails, config, hddsDispatcher, certClient,
        createReplicationService());

    long svcInterval = config
        .getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
            OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT,
            TimeUnit.MILLISECONDS);
    long serviceTimeout = config
        .getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
            OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
            TimeUnit.MILLISECONDS);
    this.blockDeletingService =
        new BlockDeletingService(this, svcInterval, serviceTimeout,
            TimeUnit.MILLISECONDS, config);
  }

  /** Creates the gRPC service that serves containers for re-replication. */
  private GrpcReplicationService createReplicationService() {
    return new GrpcReplicationService(
        new OnDemandContainerReplicationSource(controller));
  }

  /**
   * Builds the container map by reading every volume in parallel — one
   * reader thread per volume — and waiting for all of them to finish.
   */
  private void buildContainerSet() {
    Iterator<HddsVolume> volumeSetIterator = volumeSet.getVolumesList()
        .iterator();
    List<Thread> volumeThreads = new ArrayList<>();

    //TODO: diskchecker should be run before this, to see how disks are.
    // And also handle disk failure tolerance need to be added
    while (volumeSetIterator.hasNext()) {
      HddsVolume volume = volumeSetIterator.next();
      Thread thread = new Thread(new ContainerReader(volumeSet, volume,
          containerSet, config));
      thread.start();
      volumeThreads.add(thread);
    }

    try {
      for (Thread volumeThread : volumeThreads) {
        volumeThread.join();
      }
    } catch (InterruptedException ex) {
      LOG.info("Volume Threads Interrupted exception", ex);
      // Restore the interrupt status so callers up the stack can observe it.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Start background daemon thread for performing container integrity checks.
   * No-op (apart from a log line) when the scrubber is disabled in config.
   */
  private void startContainerScrub() {
    ContainerScrubberConfiguration c = config.getObject(
        ContainerScrubberConfiguration.class);
    boolean enabled = c.isEnabled();

    if (!enabled) {
      LOG.info("Background container scanner has been disabled.");
    } else {
      if (this.metadataScanner == null) {
        this.metadataScanner = new ContainerMetadataScanner(c, controller);
      }
      this.metadataScanner.start();

      // One data scanner thread per volume.
      dataScanners = new ArrayList<>();
      for (HddsVolume v : volumeSet.getVolumesList()) {
        ContainerDataScanner s = new ContainerDataScanner(c, controller, v);
        s.start();
        dataScanners.add(s);
      }
    }
  }

  /**
   * Stop the scanner thread and wait for thread to die.
   * Safe to call multiple times and when the scrubber was never started.
   */
  private void stopContainerScrub() {
    if (metadataScanner == null) {
      return;
    }
    metadataScanner.shutdown();
    metadataScanner = null;

    // Guard against the scrubber having been started without data scanners
    // (or stopped twice) rather than assuming both fields change together.
    if (dataScanners == null) {
      return;
    }
    for (ContainerDataScanner s : dataScanners) {
      s.shutdown();
    }
    dataScanners = null;
  }

  /**
   * Starts serving requests to ozone container.
   *
   * @param scmId id of the SCM this datanode is registered with
   * @throws IOException
   */
  public void start(String scmId) throws IOException {
    LOG.info("Attempting to start container services.");
    startContainerScrub();
    writeChannel.start();
    readChannel.start();
    hddsDispatcher.init();
    hddsDispatcher.setScmId(scmId);
    blockDeletingService.start();
  }

  /**
   * Stop Container Service on the datanode.
   */
  public void stop() {
    //TODO: at end of container IO integration work.
    LOG.info("Attempting to stop container services.");
    stopContainerScrub();
    writeChannel.stop();
    readChannel.stop();
    this.handlers.values().forEach(Handler::stop);
    hddsDispatcher.shutdown();
    volumeSet.shutdown();
    blockDeletingService.shutdown();
    ContainerMetrics.remove();
  }

  @VisibleForTesting
  public ContainerSet getContainerSet() {
    return containerSet;
  }

  /**
   * Returns the pipeline report collected from the write channel.
   * @return - pipeline report.
   */
  public PipelineReportsProto getPipelineReport() {
    PipelineReportsProto.Builder pipelineReportsProto =
        PipelineReportsProto.newBuilder();
    pipelineReportsProto.addAllPipelineReport(writeChannel.getPipelineReport());
    return pipelineReportsProto.build();
  }

  /** @return the Ratis server used for writes. */
  public XceiverServerSpi getWriteChannel() {
    return writeChannel;
  }

  /** @return the gRPC server used for reads. */
  public XceiverServerSpi getReadChannel() {
    return readChannel;
  }

  /** @return the container control-plane object. */
  public ContainerController getController() {
    return controller;
  }

  /**
   * Returns node report of container storage usage.
   */
  public StorageContainerDatanodeProtocolProtos.NodeReportProto getNodeReport()
      throws IOException {
    return volumeSet.getNodeReport();
  }

  @VisibleForTesting
  public ContainerDispatcher getDispatcher() {
    return this.hddsDispatcher;
  }

  /** @return the set of volumes this datanode stores containers on. */
  public VolumeSet getVolumeSet() {
    return volumeSet;
  }
}