Merge remote-tracking branch 'origin/master' into HDDS-4440-s3-performance
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index 55b518d..d45f68f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -228,6 +228,26 @@
}
/**
+ * Retrieve a number, trying the supplied config keys in order.
+ * Each config value may be absent
+ *
+ * @param conf Conf
+ * @param keys a list of configuration key names.
+ *
+ * @return first number found from the given keys, or absent.
+ */
+ public static OptionalInt getNumberFromConfigKeys(
+ ConfigurationSource conf, String... keys) {
+ for (final String key : keys) {
+ final String value = conf.getTrimmed(key);
+ if (value != null) {
+ return OptionalInt.of(Integer.parseInt(value));
+ }
+ }
+ return OptionalInt.empty();
+ }
+
+ /**
* Retrieve the port number, trying the supplied config keys in order.
* Each config value may be absent, or if present in the format
* host:port (the :port part is optional).
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index d673586..709cf45 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3087,7 +3087,6 @@
<description>The S3Gateway service principal.
Ex: s3g/_HOST@REALM.COM</description>
</property>
-
<property>
<name>hdds.container.checksum.verification.enabled</name>
<value>true</value>
@@ -3105,7 +3104,14 @@
OM/SCM/DN/S3GATEWAY Server connection timeout in milliseconds.
</description>
</property>
-
+ <property>
+ <name>ozone.om.grpc.maximum.response.length</name>
+ <value>134217728</value>
+ <tag>OZONE, OM, S3GATEWAY</tag>
+ <description>
+ OM/S3GATEWAY OMRequest, OMResponse over grpc max message length (bytes).
+ </description>
+ </property>
<property>
<name>ozone.default.bucket.layout</name>
<value/>
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java
index fd8aa28..6700101 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java
@@ -36,6 +36,8 @@
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_ADDRESS_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT;
+
import static org.hamcrest.core.Is.is;
import org.junit.Assert;
import static org.junit.Assert.assertThat;
@@ -216,4 +218,39 @@
}
-}
\ No newline at end of file
+ @Test
+ public void testGetNumberFromConfigKeys() {
+ final String testnum1 = "8";
+ final String testnum2 = "7";
+ final String serviceId = "id1";
+ final String nodeId = "scm1";
+
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
+ testnum1);
+ Assert.assertTrue(Integer.parseInt(testnum1) ==
+ HddsUtils.getNumberFromConfigKeys(
+ conf,
+ OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT).orElse(0));
+
+ /* Test to return first unempty key number from list */
+ /* first key is absent */
+ Assert.assertTrue(Integer.parseInt(testnum1) ==
+ HddsUtils.getNumberFromConfigKeys(
+ conf,
+ ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY,
+ serviceId, nodeId),
+ OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT).orElse(0));
+
+ /* now set the empty key and ensure returned value from this key */
+ conf.set(ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY,
+ serviceId, nodeId),
+ testnum2);
+ Assert.assertTrue(Integer.parseInt(testnum2) ==
+ HddsUtils.getNumberFromConfigKeys(
+ conf,
+ ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY,
+ serviceId, nodeId),
+ OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT).orElse(0));
+ }
+}
diff --git a/hadoop-hdds/interface-client/pom.xml b/hadoop-hdds/interface-client/pom.xml
index eed1807..8086643 100644
--- a/hadoop-hdds/interface-client/pom.xml
+++ b/hadoop-hdds/interface-client/pom.xml
@@ -86,7 +86,7 @@
<clearOutputDirectory>false</clearOutputDirectory>
<pluginId>grpc-java</pluginId>
<pluginArtifact>
- io.grpc:protoc-gen-grpc-java:${grpc-compile.version}:exe:${os.detected.classifier}
+ io.grpc:protoc-gen-grpc-java:${io.grpc.version}:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
</execution>
diff --git a/hadoop-hdds/interface-server/pom.xml b/hadoop-hdds/interface-server/pom.xml
index e8c96b8..e79e8c7 100644
--- a/hadoop-hdds/interface-server/pom.xml
+++ b/hadoop-hdds/interface-server/pom.xml
@@ -74,7 +74,7 @@
<clearOutputDirectory>false</clearOutputDirectory>
<pluginId>grpc-java</pluginId>
<pluginArtifact>
- io.grpc:protoc-gen-grpc-java:${grpc-compile.version}:exe:${os.detected.classifier}
+ io.grpc:protoc-gen-grpc-java:${io.grpc.version}:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
</execution>
diff --git a/hadoop-ozone/common/pom.xml b/hadoop-ozone/common/pom.xml
index 9108137..701e6d5 100644
--- a/hadoop-ozone/common/pom.xml
+++ b/hadoop-ozone/common/pom.xml
@@ -31,6 +31,56 @@
<dependencies>
<dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty</artifactId>
+ <version>${io.grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-testing</artifactId>
+ <version>${io.grpc.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http2</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler-proxy</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-tcnative-boringssl-static</artifactId>
+ <version>${tcnative.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-tcnative</artifactId>
+ <version>${tcnative.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index c8af3e6..bc1ed8d 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -59,7 +59,8 @@
public static final String OZONE_OM_BIND_HOST_DEFAULT =
"0.0.0.0";
public static final int OZONE_OM_PORT_DEFAULT = 9862;
-
+ public static final String OZONE_OM_GRPC_PORT_KEY =
+ "ozone.om.grpc.port";
public static final String OZONE_OM_HTTP_ENABLED_KEY =
"ozone.om.http.enabled";
public static final String OZONE_OM_HTTP_BIND_HOST_KEY =
@@ -310,6 +311,16 @@
"ozone.path.deleting.limit.per.task";
public static final int OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT = 10000;
+ public static final String OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH =
+ "ozone.om.grpc.maximum.response.length";
+ /** Default value for GRPC_MAXIMUM_RESPONSE_LENGTH. */
+ public static final int OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT =
+ 128 * 1024 * 1024;
+
+ public static final String OZONE_OM_S3_GPRC_SERVER_ENABLED =
+ "ozone.om.s3.grpc.server_enabled";
+ public static final boolean OZONE_OM_S3_GRPC_SERVER_ENABLED_DEFAULT =
+ false;
/**
* Configuration properties for OMAdminProtcol service.
*/
@@ -320,7 +331,11 @@
"ozone.om.admin.protocol.wait.between.retries";
public static final long OZONE_OM_ADMIN_PROTOCOL_WAIT_BETWEEN_RETRIES_DEFAULT
= 1000;
-
+ public static final String OZONE_OM_TRANSPORT_CLASS =
+ "ozone.om.transport.class";
+ public static final String OZONE_OM_TRANSPORT_CLASS_DEFAULT =
+ "org.apache.hadoop.ozone.om.protocolPB"
+ + ".Hadoop3OmTransportFactory";
public static final String OZONE_OM_UNFLUSHED_TRANSACTION_MAX_COUNT =
"ozone.om.unflushed.transaction.max.count";
public static final int OZONE_OM_UNFLUSHED_TRANSACTION_MAX_COUNT_DEFAULT
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
new file mode 100644
index 0000000..498f935
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.ha;
+
+import org.apache.hadoop.hdds.conf.ConfigurationException;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.ha.ConfUtils;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransport;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+
+/**
+ * The Grpc s3gateway om transport failover proxy provider implementation
+ * extending the ozone client OM failover proxy provider. This implmentation
+ * allows the Grpc OMTransport reuse OM failover retry policies and
+ * getRetryAction methods. In case of OM failover, client can try
+ * connecting to another OM node from the list of proxies.
+ */
+public class GrpcOMFailoverProxyProvider<T> extends
+ OMFailoverProxyProvider<T> {
+
+ private Map<String, String> omAddresses;
+
+ public GrpcOMFailoverProxyProvider(ConfigurationSource configuration,
+ UserGroupInformation ugi,
+ String omServiceId,
+ Class<T> protocol) throws IOException {
+ super(configuration, ugi, omServiceId, protocol);
+ }
+
+ @Override
+ protected void loadOMClientConfigs(ConfigurationSource config, String omSvcId)
+ throws IOException {
+ // to be used for base class omProxies,
+ // ProxyInfo not applicable for gRPC, just need key set
+ Map<String, ProxyInfo<T>> omProxiesNodeIdKeyset = new HashMap<>();
+ // to be used for base class omProxyInfos
+ // OMProxyInfo not applicable for gRPC, just need key set
+ Map<String, OMProxyInfo> omProxyInfosNodeIdKeyset = new HashMap<>();
+ List<String> omNodeIDList = new ArrayList<>();
+ omAddresses = new HashMap<>();
+
+ Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config, omSvcId);
+
+ for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
+
+ String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
+ omSvcId, nodeId);
+
+ Optional<String> hostaddr = getHostNameFromConfigKeys(config,
+ rpcAddrKey);
+
+ OptionalInt hostport = HddsUtils.getNumberFromConfigKeys(config,
+ ConfUtils.addKeySuffixes(OMConfigKeys.OZONE_OM_GRPC_PORT_KEY,
+ omSvcId, nodeId),
+ OMConfigKeys.OZONE_OM_GRPC_PORT_KEY);
+ if (nodeId == null) {
+ nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
+ }
+ omProxiesNodeIdKeyset.put(nodeId, null);
+ omProxyInfosNodeIdKeyset.put(nodeId, null);
+ if (hostaddr.isPresent()) {
+ omAddresses.put(nodeId,
+ hostaddr.get() + ":"
+ + hostport.orElse(config
+ .getObject(GrpcOmTransport
+ .GrpcOmTransportConfig.class)
+ .getPort()));
+ } else {
+ LOG.error("expected host address not defined for: {}", rpcAddrKey);
+ throw new ConfigurationException(rpcAddrKey + "is not defined");
+ }
+ omNodeIDList.add(nodeId);
+ }
+
+ if (omProxiesNodeIdKeyset.isEmpty()) {
+ throw new IllegalArgumentException("Could not find any configured " +
+ "addresses for OM. Please configure the system with "
+ + OZONE_OM_ADDRESS_KEY);
+ }
+
+ // set base class omProxies, omProxyInfos, omNodeIDList
+
+ // omProxies needed in base class
+ // omProxies.size == number of om nodes
+ // omProxies key needs to be valid nodeid
+ // omProxyInfos keyset needed in base class
+ setProxies(omProxiesNodeIdKeyset, omProxyInfosNodeIdKeyset, omNodeIDList);
+ }
+
+ @Override
+ protected Text computeDelegationTokenService() {
+ return new Text();
+ }
+
+ // need to throw if nodeID not in omAddresses
+ public String getGrpcProxyAddress(String nodeId) throws IOException {
+ if (omAddresses.containsKey(nodeId)) {
+ return omAddresses.get(nodeId);
+ } else {
+ LOG.error("expected nodeId not found in omAddresses for proxyhost {}",
+ nodeId);
+ throw new IOException(
+ "expected nodeId not found in omAddresses for proxyhost");
+ }
+
+ }
+
+ public List<String> getGrpcOmNodeIDList() {
+ return getOmNodeIDList();
+ }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
index 001322e..985b3e7 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
@@ -149,8 +149,6 @@
rpcAddrStr);
if (omProxyInfo.getAddress() != null) {
-
-
// For a non-HA OM setup, nodeId might be null. If so, we assign it
// the default value
if (nodeId == null) {
@@ -551,14 +549,18 @@
return null;
}
- @VisibleForTesting
- protected void setProxiesForTesting(
- Map<String, ProxyInfo<T>> testOMProxies,
- Map<String, OMProxyInfo> testOMProxyInfos,
- List<String> testOMNodeIDList) {
- this.omProxies = testOMProxies;
- this.omProxyInfos = testOMProxyInfos;
- this.omNodeIDList = testOMNodeIDList;
+ protected void setProxies(
+ Map<String, ProxyInfo<T>> setOMProxies,
+ Map<String, OMProxyInfo> setOMProxyInfos,
+ List<String> setOMNodeIDList) {
+ this.omProxies = setOMProxies;
+ this.omProxyInfos = setOMProxyInfos;
+ this.omNodeIDList = setOMNodeIDList;
}
+
+ protected List<String> getOmNodeIDList() {
+ return omNodeIDList;
+ }
+
}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
new file mode 100644
index 0000000..d32926a
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.protocolPB;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.security.cert.X509Certificate;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.net.HostAndPort;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import org.apache.hadoop.ipc.RemoteException;
+
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigTag;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.hadoop.ozone.om.ha.GrpcOMFailoverProxyProvider;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc;
+import io.grpc.ManagedChannel;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NettyChannelBuilder;
+import io.netty.handler.ssl.SslContextBuilder;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys
+ .OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH;
+import static org.apache.hadoop.ozone.om.OMConfigKeys
+ .OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT;
+
+/**
+ * Grpc transport for grpc between s3g and om.
+ */
+public class GrpcOmTransport implements OmTransport {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(GrpcOmTransport.class);
+
+ private static final String CLIENT_NAME = "GrpcOmTransport";
+ private final AtomicBoolean isRunning = new AtomicBoolean(false);
+
+ // gRPC specific
+ private static List<X509Certificate> caCerts = null;
+
+ private OzoneManagerServiceGrpc.OzoneManagerServiceBlockingStub client;
+ private Map<String,
+ OzoneManagerServiceGrpc.OzoneManagerServiceBlockingStub> clients;
+ private Map<String, ManagedChannel> channels;
+ private int lastVisited = -1;
+ private ConfigurationSource conf;
+
+ private AtomicReference<String> host;
+ private final int maxSize;
+ private SecurityConfig secConfig;
+
+ public static void setCaCerts(List<X509Certificate> x509Certificates) {
+ caCerts = x509Certificates;
+ }
+
+ private List<String> oms;
+ private RetryPolicy retryPolicy;
+ private int failoverCount = 0;
+ private GrpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
+ omFailoverProxyProvider;
+
+ public GrpcOmTransport(ConfigurationSource conf,
+ UserGroupInformation ugi, String omServiceId)
+ throws IOException {
+
+ this.channels = new HashMap<>();
+ this.clients = new HashMap<>();
+ this.conf = conf;
+ this.host = new AtomicReference();
+
+ secConfig = new SecurityConfig(conf);
+ maxSize = conf.getInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH,
+ OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
+
+ omFailoverProxyProvider = new GrpcOMFailoverProxyProvider(
+ conf,
+ ugi,
+ omServiceId,
+ OzoneManagerProtocolPB.class);
+
+ start();
+ }
+
+ public void start() throws IOException {
+ host.set(omFailoverProxyProvider
+ .getGrpcProxyAddress(
+ omFailoverProxyProvider.getCurrentProxyOMNodeId()));
+
+ if (!isRunning.compareAndSet(false, true)) {
+ LOG.info("Ignore. already started.");
+ return;
+ }
+
+ List<String> nodes = omFailoverProxyProvider.getGrpcOmNodeIDList();
+ for (String nodeId : nodes) {
+ String hostaddr = omFailoverProxyProvider.getGrpcProxyAddress(nodeId);
+ HostAndPort hp = HostAndPort.fromString(hostaddr);
+
+ NettyChannelBuilder channelBuilder =
+ NettyChannelBuilder.forAddress(hp.getHost(), hp.getPort())
+ .usePlaintext()
+ .maxInboundMessageSize(maxSize);
+
+ if (secConfig.isGrpcTlsEnabled()) {
+ try {
+ SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
+ if (secConfig.isSecurityEnabled()) {
+ if (caCerts != null) {
+ sslContextBuilder.trustManager(caCerts);
+ } else {
+ LOG.error("x509Certicates empty");
+ }
+ channelBuilder.useTransportSecurity().
+ sslContext(sslContextBuilder.build());
+ } else {
+ LOG.error("ozone.security not enabled when TLS specified," +
+ " using plaintext");
+ }
+ } catch (Exception ex) {
+ LOG.error("cannot establish TLS for grpc om transport client");
+ }
+ } else {
+ channelBuilder.usePlaintext();
+ }
+
+ channels.put(hostaddr, channelBuilder.build());
+ clients.put(hostaddr,
+ OzoneManagerServiceGrpc
+ .newBlockingStub(channels.get(hostaddr)));
+ }
+ int maxFailovers = conf.getInt(
+ OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
+ OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
+
+ retryPolicy = omFailoverProxyProvider.getRetryPolicy(maxFailovers);
+ LOG.info("{}: started", CLIENT_NAME);
+ }
+
+ @Override
+ public OMResponse submitRequest(OMRequest payload) throws IOException {
+ OMResponse resp = null;
+ boolean tryOtherHost = true;
+ ResultCodes resultCode = ResultCodes.INTERNAL_ERROR;
+ while (tryOtherHost) {
+ tryOtherHost = false;
+ try {
+ resp = clients.get(host.get()).submitRequest(payload);
+ } catch (StatusRuntimeException e) {
+ if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
+ resultCode = ResultCodes.TIMEOUT;
+ }
+ Exception exp = new Exception(e);
+ tryOtherHost = shouldRetry(unwrapException(exp));
+ if (!tryOtherHost) {
+ throw new OMException(resultCode);
+ }
+ }
+ }
+ return resp;
+ }
+
+ private Exception unwrapException(Exception ex) {
+ Exception grpcException = null;
+ try {
+ StatusRuntimeException srexp =
+ (StatusRuntimeException)ex.getCause();
+ Status status = srexp.getStatus();
+ LOG.debug("GRPC exception wrapped: {}", status.getDescription());
+ if (status.getCode() == Status.Code.INTERNAL) {
+ // exception potentially generated by OzoneManagerServiceGrpc
+ Class<?> realClass = Class.forName(status.getDescription()
+ .substring(0, status.getDescription()
+ .indexOf(":")));
+ Class<? extends Exception> cls = realClass
+ .asSubclass(Exception.class);
+ Constructor<? extends Exception> cn = cls.getConstructor(String.class);
+ cn.setAccessible(true);
+ grpcException = cn.newInstance(status.getDescription());
+ IOException remote = null;
+ try {
+ String cause = status.getDescription();
+ cause = cause.substring(cause.indexOf(":") + 2);
+ remote = new RemoteException(cause.substring(0, cause.indexOf(":")),
+ cause.substring(cause.indexOf(":") + 1));
+ grpcException.initCause(remote);
+ } catch (Exception e) {
+ LOG.error("cannot get cause for remote exception");
+ }
+ } else {
+ // exception generated by connection failure, gRPC
+ grpcException = ex;
+ }
+ } catch (Exception e) {
+ grpcException = new IOException(e);
+ LOG.error("error unwrapping exception from OMResponse {}");
+ }
+ return grpcException;
+ }
+
+ private boolean shouldRetry(Exception ex) {
+ boolean retry = false;
+ RetryPolicy.RetryAction action = null;
+ try {
+ action = retryPolicy.shouldRetry((Exception)ex, 0, failoverCount++, true);
+ LOG.debug("grpc failover retry action {}", action.action);
+ if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
+ retry = false;
+ LOG.error("Retry request failed. " + action.reason, ex);
+ } else {
+ if (action.action == RetryPolicy.RetryAction.RetryDecision.RETRY ||
+ (action.action == RetryPolicy.RetryAction.RetryDecision
+ .FAILOVER_AND_RETRY)) {
+ if (action.delayMillis > 0) {
+ try {
+ Thread.sleep(action.delayMillis);
+ } catch (Exception e) {
+ LOG.error("Error trying sleep thread for {}", action.delayMillis);
+ }
+ }
+ // switch om host to current proxy OMNodeId
+ omFailoverProxyProvider.performFailover(null);
+ host.set(omFailoverProxyProvider
+ .getGrpcProxyAddress(
+ omFailoverProxyProvider.getCurrentProxyOMNodeId()));
+ retry = true;
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Failed failover exception {}", e);
+ }
+ return retry;
+ }
+
+ // stub implementation for interface
+ @Override
+ public Text getDelegationTokenService() {
+ return new Text();
+ }
+
+ public void shutdown() {
+ for (Map.Entry<String, ManagedChannel> entry : channels.entrySet()) {
+ ManagedChannel channel = entry.getValue();
+ channel.shutdown();
+ try {
+ channel.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("failed to shutdown OzoneManagerServiceGrpc channel {} : {}",
+ entry.getKey(), e);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ shutdown();
+ }
+
+ /**
+ * GrpcOmTransport configuration in Java style configuration class.
+ */
+ @ConfigGroup(prefix = "ozone.om.grpc")
+ public static final class GrpcOmTransportConfig {
+ @Config(key = "port", defaultValue = "8981",
+ description = "Port used for"
+ + " the GrpcOmTransport OzoneManagerServiceGrpc server",
+ tags = {ConfigTag.MANAGEMENT})
+ private int port;
+
+ public int getPort() {
+ return port;
+ }
+
+ public GrpcOmTransportConfig setPort(int portParam) {
+ this.port = portParam;
+ return this;
+ }
+ }
+
+ @VisibleForTesting
+ public void startClient(ManagedChannel testChannel) throws IOException {
+ List<String> nodes = omFailoverProxyProvider.getGrpcOmNodeIDList();
+ for (String nodeId : nodes) {
+ String hostaddr = omFailoverProxyProvider.getGrpcProxyAddress(nodeId);
+
+ clients.put(hostaddr,
+ OzoneManagerServiceGrpc
+ .newBlockingStub(testChannel));
+ }
+ LOG.info("{}: started", CLIENT_NAME);
+ }
+
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransportFactory.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransportFactory.java
new file mode 100644
index 0000000..5f34a3c
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransportFactory.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.protocolPB;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Factory to create the default GrpcOm transport.
+ */
+public class GrpcOmTransportFactory implements OmTransportFactory {
+ @Override
+ public OmTransport createOmTransport(ConfigurationSource source,
+ UserGroupInformation ugi,
+ String omServiceId) throws IOException {
+ return new GrpcOmTransport(source, ugi, omServiceId);
+ }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OmTransportFactory.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OmTransportFactory.java
index 1ffc861..2ba8536 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OmTransportFactory.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OmTransportFactory.java
@@ -24,6 +24,9 @@
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.security.UserGroupInformation;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_TRANSPORT_CLASS;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_TRANSPORT_CLASS_DEFAULT;
+
/**
* Factory pattern to create object for RPC communication with OM.
*/
@@ -34,24 +37,31 @@
static OmTransport create(ConfigurationSource conf,
UserGroupInformation ugi, String omServiceId) throws IOException {
- OmTransportFactory factory = createFactory();
+ OmTransportFactory factory = createFactory(conf);
return factory.createOmTransport(conf, ugi, omServiceId);
}
- static OmTransportFactory createFactory() throws IOException {
- ServiceLoader<OmTransportFactory> transportFactoryServiceLoader =
- ServiceLoader.load(OmTransportFactory.class);
- Iterator<OmTransportFactory> iterator =
- transportFactoryServiceLoader.iterator();
- if (iterator.hasNext()) {
- return iterator.next();
- }
+ static OmTransportFactory createFactory(ConfigurationSource conf)
+ throws IOException {
try {
+ // if configured transport class is different than the default
+ // OmTransportFactory (Hadoop3OmTransportFactory), then
+ // check service loader for transport class and instantiate it
+ if (conf
+ .get(OZONE_OM_TRANSPORT_CLASS,
+ OZONE_OM_TRANSPORT_CLASS_DEFAULT) !=
+ OZONE_OM_TRANSPORT_CLASS_DEFAULT) {
+ ServiceLoader<OmTransportFactory> transportFactoryServiceLoader =
+ ServiceLoader.load(OmTransportFactory.class);
+ Iterator<OmTransportFactory> iterator =
+ transportFactoryServiceLoader.iterator();
+ if (iterator.hasNext()) {
+ return iterator.next();
+ }
+ }
return OmTransportFactory.class.getClassLoader()
- .loadClass(
- "org.apache.hadoop.ozone.om.protocolPB"
- + ".Hadoop3OmTransportFactory")
+ .loadClass(OZONE_OM_TRANSPORT_CLASS_DEFAULT)
.asSubclass(OmTransportFactory.class)
.newInstance();
} catch (Exception ex) {
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 10ca155..21b31e1 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -188,7 +188,7 @@
private OmTransport transport;
private ThreadLocal<S3Auth> threadLocalS3Auth
= new ThreadLocal<>();
-
+
private boolean s3AuthCheck;
public OzoneManagerProtocolClientSideTranslatorPB(OmTransport omTransport,
String clientId) {
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
new file mode 100644
index 0000000..1ca5b64
--- /dev/null
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.protocolPB;
+
+import static org.apache.hadoop.ozone.ClientVersion.CURRENT_VERSION;
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Mockito.mock;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.testing.GrpcCleanupRule;
+import io.grpc.ManagedChannel;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+import com.google.protobuf.ServiceException;
+import org.apache.ratis.protocol.RaftPeerId;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for GrpcOmTransport client.
+ */
+public class TestS3GrpcOmTransport {
+ @Rule
+ public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestS3GrpcOmTransport.class);
+
+ private final String leaderOMNodeId = "TestOM";
+
+ private final OMResponse omResponse = OMResponse.newBuilder()
+ .setSuccess(true)
+ .setStatus(org.apache.hadoop.ozone.protocol
+ .proto.OzoneManagerProtocolProtos.Status.OK)
+ .setLeaderOMNodeId(leaderOMNodeId)
+ .setCmdType(Type.AllocateBlock)
+ .build();
+
+ private boolean doFailover = false;
+
+ private OzoneConfiguration conf;
+
+ private String omServiceId;
+ private UserGroupInformation ugi;
+ private ManagedChannel channel;
+
+
+ private ServiceException createNotLeaderException() {
+ RaftPeerId raftPeerId = RaftPeerId.getRaftPeerId("testNodeId");
+
+ // TODO: Set suggest leaderID. Right now, client is not using suggest
+ // leaderID. Need to fix this.
+ OMNotLeaderException notLeaderException =
+ new OMNotLeaderException(raftPeerId);
+ LOG.debug(notLeaderException.getMessage());
+ return new ServiceException(notLeaderException);
+ }
+
+ private final OzoneManagerServiceGrpc.OzoneManagerServiceImplBase
+ serviceImpl =
+ mock(OzoneManagerServiceGrpc.OzoneManagerServiceImplBase.class,
+ delegatesTo(
+ new OzoneManagerServiceGrpc.OzoneManagerServiceImplBase() {
+ @Override
+ public void submitRequest(org.apache.hadoop.ozone.protocol.proto
+ .OzoneManagerProtocolProtos
+ .OMRequest request,
+ io.grpc.stub.StreamObserver<org.apache
+ .hadoop.ozone.protocol.proto
+ .OzoneManagerProtocolProtos
+ .OMResponse>
+ responseObserver) {
+ try {
+ if (doFailover) {
+ doFailover = false;
+ throw createNotLeaderException();
+ } else {
+ responseObserver.onNext(omResponse);
+ responseObserver.onCompleted();
+ }
+ } catch (Throwable e) {
+ IOException ex = new IOException(e.getCause());
+ responseObserver.onError(io.grpc.Status
+ .INTERNAL
+ .withDescription(ex.getMessage())
+ .asRuntimeException());
+ }
+ }
+ }));
+
+ private GrpcOmTransport client;
+
+ @Before
+ public void setUp() throws Exception {
+ // Generate a unique in-process server name.
+ String serverName = InProcessServerBuilder.generateName();
+
+ // Create a server, add service, start,
+ // and register for automatic graceful shutdown.
+ grpcCleanup.register(InProcessServerBuilder
+ .forName(serverName)
+ .directExecutor()
+ .addService(serviceImpl)
+ .build()
+ .start());
+
+ // Create a client channel and register for automatic graceful shutdown.
+ channel = grpcCleanup.register(
+ InProcessChannelBuilder.forName(serverName).directExecutor().build());
+
+ omServiceId = "";
+ conf = new OzoneConfiguration();
+ ugi = UserGroupInformation.getCurrentUser();
+ doFailover = false;
+ }
+
+ @Test
+ public void testSubmitRequestToServer() throws Exception {
+ ServiceListRequest req = ServiceListRequest.newBuilder().build();
+
+ final OMRequest omRequest = OMRequest.newBuilder()
+ .setCmdType(Type.ServiceList)
+ .setVersion(CURRENT_VERSION)
+ .setClientId("test")
+ .setServiceListRequest(req)
+ .build();
+
+ client = new GrpcOmTransport(conf, ugi, omServiceId);
+ client.startClient(channel);
+
+ final OMResponse resp = client.submitRequest(omRequest);
+ Assert.assertEquals(resp.getStatus(), org.apache.hadoop.ozone.protocol
+ .proto.OzoneManagerProtocolProtos.Status.OK);
+ Assert.assertEquals(resp.getLeaderOMNodeId(), leaderOMNodeId);
+ }
+
+ @Test
+ public void testGrpcFailoverProxy() throws Exception {
+ ServiceListRequest req = ServiceListRequest.newBuilder().build();
+
+ final OMRequest omRequest = OMRequest.newBuilder()
+ .setCmdType(Type.ServiceList)
+ .setVersion(CURRENT_VERSION)
+ .setClientId("test")
+ .setServiceListRequest(req)
+ .build();
+
+ client = new GrpcOmTransport(conf, ugi, omServiceId);
+ client.startClient(channel);
+
+ doFailover = true;
+ // first invocation generates a NotALeaderException
+ // failover is performed and request is internally retried
+ // second invocation request to server succeeds
+ final OMResponse resp = client.submitRequest(omRequest);
+ Assert.assertEquals(resp.getStatus(), org.apache.hadoop.ozone.protocol
+ .proto.OzoneManagerProtocolProtos.Status.OK);
+ Assert.assertEquals(resp.getLeaderOMNodeId(), leaderOMNodeId);
+ }
+
+ @Test
+ public void testGrpcFailoverProxyExhaustRetry() throws Exception {
+ ServiceListRequest req = ServiceListRequest.newBuilder().build();
+
+ final OMRequest omRequest = OMRequest.newBuilder()
+ .setCmdType(Type.ServiceList)
+ .setVersion(CURRENT_VERSION)
+ .setClientId("test")
+ .setServiceListRequest(req)
+ .build();
+
+ conf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 0);
+ client = new GrpcOmTransport(conf, ugi, omServiceId);
+ client.startClient(channel);
+
+ doFailover = true;
+ // first invocation generates a NotALeaderException
+ // failover is performed and request is internally retried
+ // OMFailoverProvider returns Fail retry due to #attempts >
+ // max failovers
+
+ try {
+ final OMResponse resp = client.submitRequest(omRequest);
+ fail();
+ } catch (Exception e) {
+ Assert.assertTrue(true);
+ }
+ }
+}
diff --git a/hadoop-ozone/common/src/test/resources/log4j.properties b/hadoop-ozone/common/src/test/resources/log4j.properties
new file mode 100644
index 0000000..b8ad21d
--- /dev/null
+++ b/hadoop-ozone/common/src/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n
+
+log4j.logger.org.apache.hadoop.security.ShellBasedUnixGroupsMapping=ERROR
+log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
diff --git a/hadoop-ozone/csi/pom.xml b/hadoop-ozone/csi/pom.xml
index 67dd38f..bb8fa56 100644
--- a/hadoop-ozone/csi/pom.xml
+++ b/hadoop-ozone/csi/pom.xml
@@ -71,7 +71,7 @@
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
- <version>${grpc-compile.version}</version>
+ <version>${io.grpc.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
@@ -79,6 +79,14 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
+ <artifactId>netty-handler-proxy</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http2</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
<artifactId>netty-transport-native-unix-common</artifactId>
</dependency>
<dependency>
@@ -92,7 +100,7 @@
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
- <version>${grpc-compile.version}</version>
+ <version>${io.grpc.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.protobuf</groupId>
@@ -103,7 +111,7 @@
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
- <version>${grpc-compile.version}</version>
+ <version>${io.grpc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.ozone</groupId>
@@ -171,7 +179,7 @@
<configuration>
<pluginId>grpc-java</pluginId>
<pluginArtifact>
- io.grpc:protoc-gen-grpc-java:${grpc-compile.version}:exe:${os.detected.classifier}
+ io.grpc:protoc-gen-grpc-java:${io.grpc.version}:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
</execution>
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-ha/docker-config b/hadoop-ozone/dist/src/main/compose/ozone-ha/docker-config
index fa38aad..c22505f 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone-ha/docker-config
@@ -38,6 +38,7 @@
OZONE-SITE.XML_hdds.scmclient.max.retry.timeout=30s
OZONE-SITE.XML_ozone.scm.primordial.node.id=scm1
OZONE-SITE.XML_hdds.container.report.interval=60s
+OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true
OZONE-SITE.XML_ozone.recon.db.dir=/data/metadata/recon
OZONE-SITE.XML_ozone.recon.address=recon:9891
OZONE-SITE.XML_ozone.recon.http-address=0.0.0.0:9888
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config b/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config
index 69f4e52..4642680 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config
@@ -36,6 +36,7 @@
OZONE-SITE.XML_hdds.profiler.endpoint.enabled=true
OZONE-SITE.XML_hdds.scmclient.max.retry.timeout=30s
OZONE-SITE.XML_hdds.container.report.interval=60s
+OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true
HDFS-SITE.XML_rpc.metrics.quantile.enable=true
HDFS-SITE.XML_rpc.metrics.percentiles.intervals=60,300
ASYNC_PROFILER_HOME=/opt/profiler
diff --git a/hadoop-ozone/dist/src/main/compose/ozone/docker-config b/hadoop-ozone/dist/src/main/compose/ozone/docker-config
index 2733e07..f63a784 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone/docker-config
@@ -39,6 +39,7 @@
OZONE-SITE.XML_ozone.datanode.pipeline.limit=1
OZONE-SITE.XML_hdds.scmclient.max.retry.timeout=30s
OZONE-SITE.XML_hdds.container.report.interval=60s
+OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true
OZONE-SITE.XML_ozone.scm.stale.node.interval=30s
OZONE-SITE.XML_ozone.scm.dead.node.interval=45s
OZONE-SITE.XML_hdds.heartbeat.interval=5s
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
index 4baaca5..014583c 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
@@ -51,6 +51,7 @@
OZONE-SITE.XML_ozone.replication=3
OZONE-SITE.XML_hdds.scmclient.max.retry.timeout=30s
OZONE-SITE.XML_hdds.container.report.interval=60s
+OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true
OZONE-SITE.XML_ozone.recon.om.snapshot.task.interval.delay=1m
OZONE-SITE.XML_ozone.recon.db.dir=/data/metadata/recon
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
index ad445a0..f3c70af 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
@@ -72,6 +72,7 @@
OZONE-SITE.XML_ozone.scm.stale.node.interval=30s
OZONE-SITE.XML_ozone.scm.dead.node.interval=45s
OZONE-SITE.XML_hdds.container.report.interval=60s
+OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true
HDFS-SITE.XML_dfs.datanode.kerberos.principal=dn/dn@EXAMPLE.COM
HDFS-SITE.XML_dfs.datanode.kerberos.keytab.file=/etc/security/keytabs/dn.keytab
diff --git a/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt b/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
index 3439317..20281ea 100644
--- a/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
+++ b/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
@@ -309,6 +309,8 @@
io.netty:netty-handler
io.netty:netty-handler-proxy
io.netty:netty-resolver
+ io.netty:netty-tcnative-boringssl-static
+ io.netty:netty-tcnative
io.netty:netty-transport
io.netty:netty-transport-native-epoll
io.netty:netty-transport-native-unix-common
diff --git a/hadoop-ozone/dist/src/main/license/jar-report.txt b/hadoop-ozone/dist/src/main/license/jar-report.txt
index 3911c4d..3cb582b 100644
--- a/hadoop-ozone/dist/src/main/license/jar-report.txt
+++ b/hadoop-ozone/dist/src/main/license/jar-report.txt
@@ -173,6 +173,8 @@
share/ozone/lib/netty-handler.Final.jar
share/ozone/lib/netty-handler-proxy.Final.jar
share/ozone/lib/netty-resolver.Final.jar
+share/ozone/lib/netty-tcnative-boringssl-static.Final.jar
+share/ozone/lib/netty-tcnative.Final.jar
share/ozone/lib/netty-transport.Final.jar
share/ozone/lib/netty-transport-native-epoll.Final.jar
share/ozone/lib/netty-transport-native-unix-common.Final.jar
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index b239621..3b9d3df 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -95,6 +95,7 @@
ScmConfigKeys.OZONE_SCM_ADDRESS_KEY,
OMConfigKeys.OZONE_FS_TRASH_INTERVAL_KEY,
OMConfigKeys.OZONE_FS_TRASH_CHECKPOINT_INTERVAL_KEY,
+ OMConfigKeys.OZONE_OM_S3_GPRC_SERVER_ENABLED,
OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE,
OzoneConfigKeys.OZONE_S3_AUTHINFO_MAX_LIFETIME_KEY,
OzoneConfigKeys.OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_KEY,
@@ -114,7 +115,9 @@
ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY,
ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM,
OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
- OMConfigKeys.OZONE_OM_HA_PREFIX
+ OMConfigKeys.OZONE_OM_HA_PREFIX,
+ OMConfigKeys.OZONE_OM_TRANSPORT_CLASS,
+ OMConfigKeys.OZONE_OM_GRPC_PORT_KEY
// TODO HDDS-2856
));
}
diff --git a/hadoop-ozone/interface-client/pom.xml b/hadoop-ozone/interface-client/pom.xml
index 292571b..276ffdc 100644
--- a/hadoop-ozone/interface-client/pom.xml
+++ b/hadoop-ozone/interface-client/pom.xml
@@ -41,6 +41,24 @@
<groupId>org.apache.ozone</groupId>
<artifactId>hdds-interface-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${io.grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${io.grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http2</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler-proxy</artifactId>
+ </dependency>
</dependencies>
<build>
<plugins>
@@ -55,16 +73,24 @@
<extensions>true</extensions>
<executions>
<execution>
- <id>compile-protoc</id>
+ <id>compile-protoc-OmGrpc</id>
<goals>
<goal>compile</goal>
<goal>test-compile</goal>
+ <goal>compile-custom</goal>
+ <goal>test-compile-custom</goal>
</goals>
<configuration>
- <protoSourceRoot>${basedir}/src/main/proto/</protoSourceRoot>
<protocArtifact>
com.google.protobuf:protoc:${proto2.hadooprpc.protobuf.version}:exe:${os.detected.classifier}
</protocArtifact>
+ <protoSourceRoot>${basedir}/src/main/proto/</protoSourceRoot>
+ <outputDirectory>target/generated-sources/protobuf/java</outputDirectory>
+ <clearOutputDirectory>false</clearOutputDirectory>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>
+ io.grpc:protoc-gen-grpc-java:${io.grpc.version}:exe:${os.detected.classifier}
+ </pluginArtifact>
</configuration>
</execution>
<execution>
diff --git a/hadoop-ozone/ozone-manager/pom.xml b/hadoop-ozone/ozone-manager/pom.xml
index 5e42414..6291fb8 100644
--- a/hadoop-ozone/ozone-manager/pom.xml
+++ b/hadoop-ozone/ozone-manager/pom.xml
@@ -87,6 +87,17 @@
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-tcnative</artifactId>
+ <version>${tcnative.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-tcnative-boringssl-static</artifactId>
+ <version>${tcnative.version}</version>
+ <scope>runtime</scope>
+ </dependency>
<dependency>
<groupId>org.mockito</groupId>
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
new file mode 100644
index 0000000..2231c28
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import java.io.IOException;
+import java.util.OptionalInt;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.ha.ConfUtils;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransport;
+import org.apache.hadoop.ozone.security.OzoneDelegationTokenSecretManager;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NettyServerBuilder;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import io.grpc.Server;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_PROVIDER;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+ .HDDS_GRPC_TLS_PROVIDER_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT;
+
+/**
+ * Separated network server for gRPC transport OzoneManagerService s3g->OM.
+ */
+public class GrpcOzoneManagerServer {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(GrpcOzoneManagerServer.class);
+
+ private Server server;
+ private int port = 8981;
+ private final int maxSize;
+
+ public GrpcOzoneManagerServer(OzoneConfiguration config,
+ OzoneManagerProtocolServerSideTranslatorPB
+ omTranslator,
+ OzoneDelegationTokenSecretManager
+ delegationTokenMgr,
+ CertificateClient caClient) {
+ maxSize = config.getInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH,
+ OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
+ OptionalInt haPort = HddsUtils.getNumberFromConfigKeys(config,
+ ConfUtils.addKeySuffixes(
+ OMConfigKeys.OZONE_OM_GRPC_PORT_KEY,
+ config.get(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY),
+ config.get(OMConfigKeys.OZONE_OM_NODE_ID_KEY)),
+ OMConfigKeys.OZONE_OM_GRPC_PORT_KEY);
+ if (haPort.isPresent()) {
+ this.port = haPort.getAsInt();
+ } else {
+ this.port = config.getObject(
+ GrpcOmTransport.GrpcOmTransportConfig.class).
+ getPort();
+ }
+
+ init(omTranslator,
+ delegationTokenMgr,
+ config,
+ caClient);
+ }
+
+ public void init(OzoneManagerProtocolServerSideTranslatorPB omTranslator,
+ OzoneDelegationTokenSecretManager delegationTokenMgr,
+ OzoneConfiguration omServerConfig,
+ CertificateClient caClient) {
+ NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
+ .maxInboundMessageSize(maxSize)
+ .addService(new OzoneManagerServiceGrpc(omTranslator,
+ delegationTokenMgr,
+ omServerConfig));
+
+ SecurityConfig secConf = new SecurityConfig(omServerConfig);
+ if (secConf.isGrpcTlsEnabled()) {
+ try {
+ if (secConf.isSecurityEnabled()) {
+ SslContextBuilder sslClientContextBuilder =
+ SslContextBuilder.forServer(caClient.getPrivateKey(),
+ caClient.getCertificate());
+ SslContextBuilder sslContextBuilder = GrpcSslContexts.configure(
+ sslClientContextBuilder,
+ SslProvider.valueOf(omServerConfig.get(HDDS_GRPC_TLS_PROVIDER,
+ HDDS_GRPC_TLS_PROVIDER_DEFAULT)));
+ nettyServerBuilder.sslContext(sslContextBuilder.build());
+ } else {
+ LOG.error("ozone.security not enabled when TLS specified," +
+ " creating Om S3g GRPC channel using plaintext");
+ }
+ } catch (Exception ex) {
+ LOG.error("Unable to setup TLS for secure Om S3g GRPC channel.", ex);
+ }
+ }
+
+ server = nettyServerBuilder.build();
+ }
+
+ public void start() throws IOException {
+ server.start();
+ LOG.info("{} is started using port {}", getClass().getSimpleName(),
+ server.getPort());
+ port = server.getPort();
+ }
+
+ public void stop() {
+ try {
+ server.shutdown().awaitTermination(10L, TimeUnit.SECONDS);
+ LOG.info("Server {} is shutdown", getClass().getSimpleName());
+ } catch (InterruptedException ex) {
+ LOG.warn("{} couldn't be stopped gracefully", getClass().getSimpleName());
+ }
+ }
+ public int getPort() {
+ return port;
+ }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index b733d89..c0bd8cc 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -236,6 +236,8 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METRICS_SAVE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_S3_GPRC_SERVER_ENABLED;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_S3_GRPC_SERVER_ENABLED_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED;
@@ -296,6 +298,7 @@
private final Text omRpcAddressTxt;
private OzoneConfiguration configuration;
private RPC.Server omRpcServer;
+ private GrpcOzoneManagerServer omS3gGrpcServer;
private InetSocketAddress omRpcAddress;
private String omId;
@@ -334,7 +337,9 @@
private JvmPauseMonitor jvmPauseMonitor;
private final SecurityConfig secConfig;
private S3SecretManager s3SecretManager;
+ private final boolean isOmGrpcServerEnabled;
private volatile boolean isOmRpcServerRunning = false;
+ private volatile boolean isOmGrpcServerRunning = false;
private String omComponent;
private OzoneManagerProtocolServerSideTranslatorPB omServerProtocol;
@@ -456,6 +461,9 @@
this.isSpnegoEnabled = conf.get(OZONE_OM_HTTP_AUTH_TYPE, "simple")
.equals("kerberos");
+ this.isOmGrpcServerEnabled = conf.getBoolean(
+ OZONE_OM_S3_GPRC_SERVER_ENABLED,
+ OZONE_OM_S3_GRPC_SERVER_ENABLED_DEFAULT);
this.scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE,
OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
this.preallocateBlocksMax = conf.getInt(
@@ -559,6 +567,10 @@
omRpcAddress = updateRPCListenAddress(configuration,
OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
+ // Start S3g Om gRPC Server.
+ if (isOmGrpcServerEnabled) {
+ omS3gGrpcServer = getOmS3gGrpcServer(configuration);
+ }
shutdownHook = () -> {
saveOmMetrics();
};
@@ -1086,6 +1098,21 @@
return rpcServer;
}
+ /**
+ * Starts an s3g OmGrpc server.
+ *
+ * @param conf configuration
+ * @return gRPC server
+ * @throws IOException if there is an I/O error while creating RPC server
+ */
+ private GrpcOzoneManagerServer startGrpcServer(OzoneConfiguration conf)
+ throws IOException {
+ return new GrpcOzoneManagerServer(conf,
+ this.omServerProtocol,
+ this.delegationTokenMgr,
+ this.certClient);
+ }
+
private static boolean isOzoneSecurityEnabled() {
return securityEnabled;
}
@@ -1431,7 +1458,10 @@
isOmRpcServerRunning = true;
startTrashEmptier(configuration);
-
+ if (isOmGrpcServerEnabled) {
+ omS3gGrpcServer.start();
+ isOmGrpcServerRunning = true;
+ }
registerMXBean();
startJVMPauseMonitor();
@@ -1492,7 +1522,9 @@
}
omRpcServer = getRpcServer(configuration);
-
+ if (isOmGrpcServerEnabled) {
+ omS3gGrpcServer = getOmS3gGrpcServer(configuration);
+ }
try {
httpServer = new OzoneManagerHttpServer(configuration, this);
httpServer.start();
@@ -1506,6 +1538,10 @@
startTrashEmptier(configuration);
registerMXBean();
+ if (isOmGrpcServerEnabled) {
+ omS3gGrpcServer.start();
+ isOmGrpcServerRunning = true;
+ }
startJVMPauseMonitor();
setStartTime();
omState = State.RUNNING;
@@ -1816,6 +1852,19 @@
}
/**
+ * Creates a new instance of gRPC OzoneManagerServiceGrpc transport server
+ * for serving s3g OmRequests. If an earlier instance is already running
+ * then returns the same.
+ */
+ private GrpcOzoneManagerServer getOmS3gGrpcServer(OzoneConfiguration conf)
+ throws IOException {
+ if (isOmGrpcServerRunning) {
+ return omS3gGrpcServer;
+ }
+ return startGrpcServer(configuration);
+ }
+
+ /**
* Creates an instance of ratis server.
*/
/**
@@ -1908,6 +1957,9 @@
scheduleOMMetricsWriteTask = null;
}
omRpcServer.stop();
+ if (isOmGrpcServerEnabled) {
+ omS3gGrpcServer.stop();
+ }
// When ratis is not enabled, we need to call stop() to stop
// OzoneManageDoubleBuffer in OM server protocol.
if (!isRatisEnabled) {
@@ -1918,6 +1970,9 @@
omRatisServer = null;
}
isOmRpcServerRunning = false;
+ if (isOmGrpcServerEnabled) {
+ isOmGrpcServerRunning = false;
+ }
keyManager.stop();
stopSecretManager();
if (httpServer != null) {
@@ -3621,7 +3676,7 @@
if (isAclEnabled) {
InetAddress remoteIp = Server.getRemoteIp();
resolved = resolveBucketLink(requested, new HashSet<>(),
- Server.getRemoteUser(),
+ getRemoteUser(),
remoteIp,
remoteIp != null ? remoteIp.getHostName() :
omRpcAddress.getHostName());
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java
new file mode 100644
index 0000000..a88e259
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import io.grpc.Status;
+import com.google.protobuf.RpcController;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.ipc.ClientId;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc.OzoneManagerServiceImplBase;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ozone.protocol.proto
+ .OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto
+ .OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.security.OzoneDelegationTokenSecretManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Grpc Service for handling S3 gateway OzoneManagerProtocol client requests.
+ */
+public class OzoneManagerServiceGrpc extends OzoneManagerServiceImplBase {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(OzoneManagerServiceGrpc.class);
+ /**
+ * RpcController is not used and hence is set to null.
+ */
+ private static final RpcController NULL_RPC_CONTROLLER = null;
+ private OzoneManagerProtocolServerSideTranslatorPB omTranslator;
+ private OzoneDelegationTokenSecretManager delegationTokenMgr;
+ private final SecurityConfig secConfig;
+
+ OzoneManagerServiceGrpc(
+ OzoneManagerProtocolServerSideTranslatorPB omTranslator,
+ OzoneDelegationTokenSecretManager delegationTokenMgr,
+ OzoneConfiguration configuration) {
+ this.omTranslator = omTranslator;
+ this.delegationTokenMgr = delegationTokenMgr;
+ this.secConfig = new SecurityConfig(configuration);
+ }
+
+ @Override
+ public void submitRequest(OMRequest request,
+ io.grpc.stub.StreamObserver<OMResponse>
+ responseObserver) {
+ LOG.debug("OzoneManagerServiceGrpc: OzoneManagerServiceImplBase " +
+ "processing s3g client submit request - for command {}",
+ request.getCmdType().name());
+ AtomicInteger callCount = new AtomicInteger(0);
+
+ org.apache.hadoop.ipc.Server.getCurCall().set(new Server.Call(1,
+ callCount.incrementAndGet(),
+ null,
+ null,
+ RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+ ClientId.getClientId()));
+ // TODO: currently require setting the Server class for each request
+ // with thread context (Server.Call()) that includes retries
+ // and importantly random ClientId. This is currently necessary for
+ // Om Ratis Server to create createWriteRaftClientRequest.
+ // Look to remove Server class requirement for issuing ratis transactions
+ // for OMRequests. Test through successful ratis-enabled OMRequest
+ // handling without dependency on hadoop IPC based Server.
+ try {
+ OMResponse omResponse = this.omTranslator.
+ submitRequest(NULL_RPC_CONTROLLER, request);
+ responseObserver.onNext(omResponse);
+ } catch (Throwable e) {
+ IOException ex = new IOException(e.getCause());
+ responseObserver.onError(Status
+ .INTERNAL
+ .withDescription(ex.getMessage())
+ .asRuntimeException());
+ }
+ responseObserver.onCompleted();
+ }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
index 266024a..1eec75e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
@@ -141,6 +141,11 @@
userInfo.setUserName(user.getUserName());
}
+ // for gRPC s3g omRequests that contain user name
+ if (user == null && omRequest.hasUserInfo()) {
+ userInfo.setUserName(omRequest.getUserInfo().getUserName());
+ }
+
if (remoteAddress != null) {
userInfo.setHostName(remoteAddress.getHostName());
userInfo.setRemoteAddress(remoteAddress.getHostAddress()).build();
@@ -356,7 +361,6 @@
if (userGroupInformation != null) {
return userGroupInformation;
}
-
if (omRequest.hasUserInfo() &&
!StringUtils.isBlank(omRequest.getUserInfo().getUserName())) {
userGroupInformation = UserGroupInformation.createRemoteUser(
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestGrpcOzoneManagerServer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestGrpcOzoneManagerServer.java
new file mode 100644
index 0000000..687ed21
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestGrpcOzoneManagerServer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.mockito.Mockito;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
+
+/**
+ * Tests for GrpcOzoneManagerServer.
+ */
+public class TestGrpcOzoneManagerServer {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestGrpcOzoneManagerServer.class);
+ private OzoneManager ozoneManager;
+ private OzoneManagerProtocolServerSideTranslatorPB omServerProtocol;
+ private GrpcOzoneManagerServer server;
+
+ @Rule
+ public Timeout timeout = Timeout.seconds(30);
+
+ @Test
+ public void testStartStop() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ ozoneManager = Mockito.mock(OzoneManager.class);
+ omServerProtocol = ozoneManager.getOmServerProtocol();
+
+ server = new GrpcOzoneManagerServer(conf,
+ omServerProtocol,
+ ozoneManager.getDelegationTokenMgr(),
+ ozoneManager.getCertificateClient());
+
+ try {
+ server.start();
+ } finally {
+ server.stop();
+ }
+ }
+
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
index 0160166..fe7f6f4 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
@@ -143,7 +143,7 @@
omProxyInfos.put(nodeId, null);
omNodeIDList.add(nodeId);
}
- setProxiesForTesting(omProxies, omProxyInfos, omNodeIDList);
+ setProxies(omProxies, omProxyInfos, omNodeIDList);
}
@Override
diff --git a/hadoop-ozone/s3gateway/pom.xml b/hadoop-ozone/s3gateway/pom.xml
index ec751ed..ce1a4cf 100644
--- a/hadoop-ozone/s3gateway/pom.xml
+++ b/hadoop-ozone/s3gateway/pom.xml
@@ -98,7 +98,29 @@
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty</artifactId>
+ <version>${io.grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${io.grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${io.grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http2</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>hdds-hadoop-dependency-server</artifactId>
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientCache.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientCache.java
new file mode 100644
index 0000000..4a7c4a2
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientCache.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.s3;
+
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.protocol.S3Auth;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
+import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransport;
+import org.apache.hadoop.ozone.OzoneSecurityUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.PreDestroy;
+import javax.enterprise.context.ApplicationScoped;
+import java.io.IOException;
+import java.security.cert.CertificateException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_TRANSPORT_CLASS;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_TRANSPORT_CLASS_DEFAULT;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_KEY;
+
+/**
+ * Cached ozone client for s3 requests.
+ */
+@ApplicationScoped
+public final class OzoneClientCache {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(OzoneClientCache.class);
+ // single, cached OzoneClient established on first connection
+ // for s3g gRPC OmTransport, OmRequest - OmResponse channel
+ private static OzoneClientCache instance;
+ private OzoneClient client;
+ private SecurityConfig secConfig;
+
+ private OzoneClientCache(OzoneConfiguration ozoneConfiguration)
+ throws IOException {
+ // Set the expected OM version if not set via config.
+ ozoneConfiguration.setIfUnset(OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_KEY,
+ OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_DEFAULT);
+ String omServiceID = OmUtils.getOzoneManagerServiceId(ozoneConfiguration);
+ secConfig = new SecurityConfig(ozoneConfiguration);
+ client = null;
+ try {
+ if (secConfig.isGrpcTlsEnabled()) {
+ if (ozoneConfiguration
+ .get(OZONE_OM_TRANSPORT_CLASS,
+ OZONE_OM_TRANSPORT_CLASS_DEFAULT) !=
+ OZONE_OM_TRANSPORT_CLASS_DEFAULT) {
+ // Grpc transport selected
+ // need to get certificate for TLS through
+ // hadoop rpc first via ServiceInfo
+ setCertificate(omServiceID,
+ ozoneConfiguration);
+ }
+ }
+ if (omServiceID == null) {
+ client = OzoneClientFactory.getRpcClient(ozoneConfiguration);
+ } else {
+ // As in HA case, we need to pass om service ID.
+ client = OzoneClientFactory.getRpcClient(omServiceID,
+ ozoneConfiguration);
+ }
+ } catch (IOException e) {
+ LOG.warn("cannot create OzoneClient", e);
+ throw e;
+ }
+ // S3 Gateway should always set the S3 Auth.
+ ozoneConfiguration.setBoolean(S3Auth.S3_AUTH_CHECK, true);
+ }
+
+ public static OzoneClient getOzoneClientInstance(OzoneConfiguration
+ ozoneConfiguration)
+ throws IOException {
+ if (instance == null) {
+ instance = new OzoneClientCache(ozoneConfiguration);
+ }
+ return instance.client;
+ }
+
+ public static void closeClient() throws IOException {
+ if (instance != null) {
+ instance.client.close();
+ instance = null;
+ }
+ }
+
+ private void setCertificate(String omServiceID,
+ OzoneConfiguration conf)
+ throws IOException {
+
+ // create local copy of config incase exception occurs
+ // with certificate OmRequest
+ OzoneConfiguration config = new OzoneConfiguration(conf);
+ OzoneClient certClient;
+
+ if (secConfig.isGrpcTlsEnabled()) {
+ // set OmTransport to hadoop rpc to securely,
+ // get certificates with service list request
+ config.set(OZONE_OM_TRANSPORT_CLASS,
+ OZONE_OM_TRANSPORT_CLASS_DEFAULT);
+
+ if (omServiceID == null) {
+ certClient = OzoneClientFactory.getRpcClient(config);
+ } else {
+ // As in HA case, we need to pass om service ID.
+ certClient = OzoneClientFactory.getRpcClient(omServiceID,
+ config);
+ }
+ try {
+ ServiceInfoEx serviceInfoEx = certClient
+ .getObjectStore()
+ .getClientProxy()
+ .getOzoneManagerClient()
+ .getServiceInfo();
+
+ if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
+ String caCertPem = null;
+ List<String> caCertPems = null;
+ caCertPem = serviceInfoEx.getCaCertificate();
+ caCertPems = serviceInfoEx.getCaCertPemList();
+ if (caCertPems == null || caCertPems.isEmpty()) {
+ if (caCertPem == null) {
+ LOG.error("S3g received empty caCertPems from serviceInfo");
+ throw new CertificateException("No caCerts found; caCertPem can" +
+ " not be null when caCertPems is empty or null");
+ }
+ caCertPems = Collections.singletonList(caCertPem);
+ }
+ GrpcOmTransport.setCaCerts(OzoneSecurityUtil
+ .convertToX509(caCertPems));
+ }
+ } catch (CertificateException ce) {
+ throw new IOException(ce);
+ } catch (IOException e) {
+ throw e;
+ } finally {
+ if (certClient != null) {
+ certClient.close();
+ }
+ }
+ }
+ }
+
+
+ @PreDestroy
+ public void destroy() throws IOException {
+ OzoneClientCache.closeClient();
+ }
+}
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java
index d2118f5..e9ddc08 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java
@@ -29,9 +29,7 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.om.protocol.S3Auth;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
@@ -39,12 +37,9 @@
import org.apache.hadoop.ozone.s3.signature.SignatureInfo.Version;
import org.apache.hadoop.ozone.s3.signature.SignatureProcessor;
import org.apache.hadoop.ozone.s3.signature.StringToSignProducer;
-import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_KEY;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.ACCESS_DENIED;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INTERNAL_ERROR;
@@ -57,7 +52,7 @@
private static final Logger LOG =
LoggerFactory.getLogger(OzoneClientProducer.class);
- private static OzoneClient client;
+ private OzoneClient client;
@Inject
private SignatureProcessor signatureProcessor;
@@ -71,9 +66,7 @@
@Produces
public synchronized OzoneClient createClient() throws WebApplicationException,
IOException {
- if (client == null) {
- client = createOzoneClient();
- }
+ client = getClient(ozoneConfiguration);
return client;
}
@@ -112,25 +105,24 @@
}
}
- @NotNull
- @VisibleForTesting
- OzoneClient createOzoneClient() throws IOException {
- // S3 Gateway should always set the S3 Auth.
- ozoneConfiguration.setBoolean(S3Auth.S3_AUTH_CHECK, true);
- // Set the expected OM version if not set via config.
- ozoneConfiguration.setIfUnset(OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_KEY,
- OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_DEFAULT);
- String omServiceID = OmUtils.getOzoneManagerServiceId(ozoneConfiguration);
- if (omServiceID == null) {
- return OzoneClientFactory.getRpcClient(ozoneConfiguration);
- } else {
- // As in HA case, we need to pass om service ID.
- return OzoneClientFactory.getRpcClient(omServiceID,
- ozoneConfiguration);
+ private OzoneClient getClient(OzoneConfiguration config)
+ throws IOException {
+ OzoneClient ozoneClient = null;
+ try {
+ ozoneClient =
+ OzoneClientCache.getOzoneClientInstance(ozoneConfiguration);
+ } catch (Exception e) {
+ // For any other critical errors during object creation throw Internal
+ // error.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Error during Client Creation: ", e);
+ }
+ throw e;
}
+ return ozoneClient;
}
- public void setOzoneConfiguration(OzoneConfiguration config) {
+ public synchronized void setOzoneConfiguration(OzoneConfiguration config) {
this.ozoneConfiguration = config;
}
@@ -138,7 +130,7 @@
public void setSignatureParser(SignatureProcessor awsSignatureProcessor) {
this.signatureProcessor = awsSignatureProcessor;
}
-
+
private WebApplicationException wrapOS3Exception(OS3Exception os3Exception) {
return new WebApplicationException(os3Exception.getErrorMessage(),
os3Exception,
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
index 9a0efc7..b08bfdb 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
@@ -79,6 +79,12 @@
} catch (OMException ex) {
if (ex.getResult() == ResultCodes.KEY_NOT_FOUND) {
throw newError(S3ErrorTable.NO_SUCH_BUCKET, bucketName, ex);
+ } else if (ex.getResult() == ResultCodes.INVALID_TOKEN) {
+ throw newError(S3ErrorTable.ACCESS_DENIED,
+ s3Auth.getAccessID(), ex);
+ } else if (ex.getResult() == ResultCodes.TIMEOUT ||
+ ex.getResult() == ResultCodes.INTERNAL_ERROR) {
+ throw newError(S3ErrorTable.INTERNAL_ERROR, bucketName, ex);
} else {
throw ex;
}
@@ -110,8 +116,14 @@
if (ex.getResult() == ResultCodes.BUCKET_NOT_FOUND
|| ex.getResult() == ResultCodes.VOLUME_NOT_FOUND) {
throw newError(S3ErrorTable.NO_SUCH_BUCKET, bucketName, ex);
+ } else if (ex.getResult() == ResultCodes.INVALID_TOKEN) {
+ throw newError(S3ErrorTable.ACCESS_DENIED,
+ s3Auth.getAccessID(), ex);
} else if (ex.getResult() == ResultCodes.PERMISSION_DENIED) {
throw newError(S3ErrorTable.ACCESS_DENIED, bucketName, ex);
+ } else if (ex.getResult() == ResultCodes.TIMEOUT ||
+ ex.getResult() == ResultCodes.INTERNAL_ERROR) {
+ throw newError(S3ErrorTable.INTERNAL_ERROR, bucketName, ex);
} else {
throw ex;
}
@@ -140,6 +152,12 @@
getMetrics().incCreateBucketFailure();
if (ex.getResult() == ResultCodes.PERMISSION_DENIED) {
throw newError(S3ErrorTable.ACCESS_DENIED, bucketName, ex);
+ } else if (ex.getResult() == ResultCodes.INVALID_TOKEN) {
+ throw newError(S3ErrorTable.ACCESS_DENIED,
+ s3Auth.getAccessID(), ex);
+ } else if (ex.getResult() == ResultCodes.TIMEOUT ||
+ ex.getResult() == ResultCodes.INTERNAL_ERROR) {
+ throw newError(S3ErrorTable.INTERNAL_ERROR, bucketName, ex);
} else if (ex.getResult() != ResultCodes.BUCKET_ALREADY_EXISTS) {
// S3 does not return error for bucket already exists, it just
// returns the location.
@@ -160,9 +178,17 @@
client.getObjectStore().deleteS3Bucket(s3BucketName);
} catch (OMException ex) {
if (ex.getResult() == ResultCodes.PERMISSION_DENIED) {
- throw newError(S3ErrorTable.ACCESS_DENIED, s3BucketName, ex);
+ throw newError(S3ErrorTable.ACCESS_DENIED,
+ s3BucketName, ex);
+ } else if (ex.getResult() == ResultCodes.INVALID_TOKEN) {
+ throw newError(S3ErrorTable.ACCESS_DENIED,
+ s3Auth.getAccessID(), ex);
+ } else if (ex.getResult() == ResultCodes.TIMEOUT ||
+ ex.getResult() == ResultCodes.INTERNAL_ERROR) {
+ throw newError(S3ErrorTable.INTERNAL_ERROR, s3BucketName, ex);
+ } else {
+ throw ex;
}
- throw ex;
}
}
@@ -203,7 +229,15 @@
if (e.getResult() == ResultCodes.VOLUME_NOT_FOUND) {
return Collections.emptyIterator();
} else if (e.getResult() == ResultCodes.PERMISSION_DENIED) {
- throw newError(S3ErrorTable.ACCESS_DENIED, "listBuckets", e);
+ throw newError(S3ErrorTable.ACCESS_DENIED,
+ "listBuckets", e);
+ } else if (e.getResult() == ResultCodes.INVALID_TOKEN) {
+ throw newError(S3ErrorTable.ACCESS_DENIED,
+ s3Auth.getAccessID(), e);
+ } else if (e.getResult() == ResultCodes.TIMEOUT ||
+ e.getResult() == ResultCodes.INTERNAL_ERROR) {
+ throw newError(S3ErrorTable.INTERNAL_ERROR,
+ "listBuckets", e);
} else {
throw e;
}
diff --git a/hadoop-ozone/s3gateway/src/main/resources/META-INF/services/org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory b/hadoop-ozone/s3gateway/src/main/resources/META-INF/services/org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory
new file mode 100644
index 0000000..254933b
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/main/resources/META-INF/services/org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransportFactory
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/protocolPB/TestGrpcOmTransport.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/protocolPB/TestGrpcOmTransport.java
new file mode 100644
index 0000000..304a2bf
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/protocolPB/TestGrpcOmTransport.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.protocolPB;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransport;
+import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
+import org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_TRANSPORT_CLASS;
+
+/**
+ * Tests for GrpcOmTransport.
+ */
+public class TestGrpcOmTransport {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestGrpcOmTransport.class);
+ @Rule
+ public Timeout timeout = Timeout.seconds(30);
+
+
+ @Test
+ public void testGrpcOmTransportFactory() throws Exception {
+ String omServiceId = "";
+ String transportCls = GrpcOmTransport.class.getName();
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.set(OZONE_OM_TRANSPORT_CLASS,
+ transportCls);
+
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ OmTransport omTransport = OmTransportFactory.create(conf, ugi, omServiceId);
+ Assert.assertEquals(GrpcOmTransport.class.getSimpleName(),
+ omTransport.getClass().getSimpleName());
+
+ }
+
+ @Test
+ public void testHrpcOmTransportFactory() throws Exception {
+ String omServiceId = "";
+ OzoneConfiguration conf = new OzoneConfiguration();
+
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ OmTransport omTransport = OmTransportFactory.create(conf, ugi, omServiceId);
+ // OmTransport should be Hadoop Rpc and
+ // fail equality GrpcOmTransport equality test
+ Assert.assertNotEquals(GrpcOmTransport.class.getSimpleName(),
+ omTransport.getClass().getSimpleName());
+ }
+
+ @Test
+ public void testStartStop() throws Exception {
+ String omServiceId = "";
+ OzoneConfiguration conf = new OzoneConfiguration();
+
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ GrpcOmTransport client = new GrpcOmTransport(conf, ugi, omServiceId);
+
+ try {
+ client.start();
+ } finally {
+ client.shutdown();
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index fd0578c..e69803f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -196,10 +196,13 @@
<!-- Maven protoc compiler -->
<protobuf-maven-plugin.version>0.5.1</protobuf-maven-plugin.version>
<grpc.protobuf-compile.version>3.12.0</grpc.protobuf-compile.version>
- <grpc-compile.version>1.33.0</grpc-compile.version>
<os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>
<netty.version>4.1.63.Final</netty.version>
+ <io.grpc.version>1.38.0</io.grpc.version>
+ <tcnative.version>2.0.38.Final</tcnative.version> <!-- See table for correct version -->
+ <!-- Table for netty, grpc & tcnative version combinations -->
+ <!-- https://github.com/grpc/grpc-java/blob/master/SECURITY.md#netty -->
<!-- define the Java language version used by the compiler -->
<javac.version>1.8</javac.version>