blob: 8b8eff4797d63d841ceeb386f744554564c39c22 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server;
import com.google.protobuf.BlockingService;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.OzoneManagerDetailsProto;
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
import org.apache.hadoop.hdds.scm.protocol.SCMSecurityProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer;
import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
import org.apache.hadoop.security.KerberosInfo;
import org.bouncycastle.cert.X509CertificateHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateApprover.ApprovalType.KERBEROS_TRUSTED;
/**
* The protocol used to perform security related operations with SCM.
*/
@KerberosInfo(
serverPrincipal = ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_PRINCIPAL_KEY)
@InterfaceAudience.Private
public class SCMSecurityProtocolServer implements SCMSecurityProtocol {
private static final Logger LOGGER = LoggerFactory
.getLogger(SCMSecurityProtocolServer.class);
private final CertificateServer certificateServer;
private final RPC.Server rpcServer;
private final InetSocketAddress rpcAddress;
private final ProtocolMessageMetrics metrics;
SCMSecurityProtocolServer(OzoneConfiguration conf,
CertificateServer certificateServer) throws IOException {
this.certificateServer = certificateServer;
final int handlerCount =
conf.getInt(ScmConfigKeys.OZONE_SCM_SECURITY_HANDLER_COUNT_KEY,
ScmConfigKeys.OZONE_SCM_SECURITY_HANDLER_COUNT_DEFAULT);
rpcAddress = HddsServerUtil
.getScmSecurityInetAddress(conf);
// SCM security service RPC service.
RPC.setProtocolEngine(conf, SCMSecurityProtocolPB.class,
ProtobufRpcEngine.class);
metrics = new ProtocolMessageMetrics("ScmSecurityProtocol",
"SCM Security protocol metrics",
SCMSecurityProtocolProtos.Type.values());
BlockingService secureProtoPbService =
SCMSecurityProtocolProtos.SCMSecurityProtocolService
.newReflectiveBlockingService(
new SCMSecurityProtocolServerSideTranslatorPB(this, metrics));
this.rpcServer =
StorageContainerManager.startRpcServer(
conf,
rpcAddress,
SCMSecurityProtocolPB.class,
secureProtoPbService,
handlerCount);
if (conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
rpcServer.refreshServiceAcl(conf, SCMPolicyProvider.getInstance());
}
}
/**
* Get SCM signed certificate for DataNode.
*
* @param dnDetails - DataNode Details.
* @param certSignReq - Certificate signing request.
* @return String - SCM signed pem encoded certificate.
*/
@Override
public String getDataNodeCertificate(
DatanodeDetailsProto dnDetails,
String certSignReq) throws IOException {
LOGGER.info("Processing CSR for dn {}, UUID: {}", dnDetails.getHostName(),
dnDetails.getUuid());
Objects.requireNonNull(dnDetails);
Future<X509CertificateHolder> future =
certificateServer.requestCertificate(certSignReq,
KERBEROS_TRUSTED);
try {
return CertificateCodec.getPEMEncodedString(future.get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("getDataNodeCertificate operation failed. ", e);
} catch (ExecutionException e) {
throw new IOException("getDataNodeCertificate operation failed. ", e);
}
}
/**
* Get SCM signed certificate for OM.
*
* @param omDetails - OzoneManager Details.
* @param certSignReq - Certificate signing request.
* @return String - SCM signed pem encoded certificate.
*/
@Override
public String getOMCertificate(OzoneManagerDetailsProto omDetails,
String certSignReq) throws IOException {
LOGGER.info("Processing CSR for om {}, UUID: {}", omDetails.getHostName(),
omDetails.getUuid());
Objects.requireNonNull(omDetails);
Future<X509CertificateHolder> future =
certificateServer.requestCertificate(certSignReq,
KERBEROS_TRUSTED);
try {
return CertificateCodec.getPEMEncodedString(future.get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("getOMCertificate operation failed. ", e);
} catch (ExecutionException e) {
throw new IOException("getOMCertificate operation failed. ", e);
}
}
/**
* Get SCM signed certificate with given serial id.
*
* @param certSerialId - Certificate serial id.
* @return string - pem encoded SCM signed certificate.
*/
@Override
public String getCertificate(String certSerialId) throws IOException {
LOGGER.debug("Getting certificate with certificate serial id {}",
certSerialId);
try {
X509Certificate certificate =
certificateServer.getCertificate(certSerialId);
if (certificate != null) {
return CertificateCodec.getPEMEncodedString(certificate);
}
} catch (CertificateException e) {
throw new IOException("getCertificate operation failed. ", e);
}
LOGGER.debug("Certificate with serial id {} not found.", certSerialId);
throw new IOException("Certificate not found");
}
/**
* Get SCM signed certificate for OM.
*
* @return string - Root certificate.
*/
@Override
public String getCACertificate() throws IOException {
LOGGER.debug("Getting CA certificate.");
try {
return CertificateCodec.getPEMEncodedString(
certificateServer.getCACertificate());
} catch (CertificateException e) {
throw new IOException("getRootCertificate operation failed. ", e);
}
}
/**
*
* @param role - node role: OM/SCM/DN.
* @param startSerialId - start certificate serial id.
* @param count - max number of certificates returned in a batch.
* @param isRevoked - whether list for revoked certs only.
* @return
* @throws IOException
*/
@Override
public List<String> listCertificate(HddsProtos.NodeType role,
long startSerialId, int count, boolean isRevoked) throws IOException {
List<X509Certificate> certificates =
certificateServer.listCertificate(role, startSerialId, count,
isRevoked);
List<String> results = new ArrayList<>(certificates.size());
for (X509Certificate cert : certificates) {
try {
String certStr = CertificateCodec.getPEMEncodedString(cert);
results.add(certStr);
} catch (SCMSecurityException e) {
throw new IOException("listCertificate operation failed. ", e);
}
}
return results;
}
public RPC.Server getRpcServer() {
return rpcServer;
}
public InetSocketAddress getRpcAddress() {
return rpcAddress;
}
public void start() {
String startupMsg = StorageContainerManager.buildRpcServerStartMessage(
"Starting RPC server for SCMSecurityProtocolServer.", getRpcAddress());
LOGGER.info(startupMsg);
metrics.register();
getRpcServer().start();
}
public void stop() {
try {
LOGGER.info("Stopping the SCMSecurityProtocolServer.");
metrics.unregister();
getRpcServer().stop();
} catch (Exception ex) {
LOGGER.error("SCMSecurityProtocolServer stop failed.", ex);
}
}
public void join() throws InterruptedException {
LOGGER.trace("Join RPC server for SCMSecurityProtocolServer.");
getRpcServer().join();
}
}