Merge master to EC branch HDDS-3816-ec
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 0ebf088..5f8f6f5 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -26,6 +26,10 @@
"hdds.heartbeat.interval";
public static final String HDDS_HEARTBEAT_INTERVAL_DEFAULT =
"30s";
+ public static final String HDDS_RECON_HEARTBEAT_INTERVAL =
+ "hdds.recon.heartbeat.interval";
+ public static final String HDDS_RECON_HEARTBEAT_INTERVAL_DEFAULT =
+ "60s";
public static final String HDDS_NODE_REPORT_INTERVAL =
"hdds.node.report.interval";
public static final String HDDS_NODE_REPORT_INTERVAL_DEFAULT =
@@ -269,4 +273,7 @@
"hdds.container.checksum.verification.enabled";
public static final boolean
HDDS_CONTAINER_CHECKSUM_VERIFICATION_ENABLED_DEFAULT = true;
+
+ public static final String OZONE_AUDIT_LOG_DEBUG_CMD_LIST_DNAUDIT =
+ "ozone.audit.log.debug.cmd.list.dnaudit";
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 66fb593..2a962d7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -552,6 +552,8 @@
public static final boolean
OZONE_SCM_HA_RATIS_SERVER_ELECTION_PRE_VOTE_DEFAULT = false;
+ public static final String OZONE_AUDIT_LOG_DEBUG_CMD_LIST_SCMAUDIT =
+ "ozone.audit.log.debug.cmd.list.scmaudit";
/**
* Never constructed.
*/
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 4b7fda9..699d732 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -467,6 +467,8 @@
OZONE_CLIENT_BUCKET_REPLICATION_CONFIG_REFRESH_PERIOD_DEFAULT_MS =
300 * 1000;
+ public static final String OZONE_AUDIT_LOG_DEBUG_CMD_LIST_OMAUDIT =
+ "ozone.audit.log.debug.cmd.list.omaudit";
/**
* There is no need to instantiate this class.
*/
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java
index 9f1f5f0..43fb4e4 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java
@@ -18,10 +18,20 @@
package org.apache.hadoop.ozone.audit;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Marker;
import org.apache.logging.log4j.spi.ExtendedLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
/**
@@ -29,10 +39,17 @@
*/
public class AuditLogger {
+ private static final Logger LOG = LoggerFactory.getLogger(AuditLogger.class);
+
private ExtendedLogger logger;
private static final String FQCN = AuditLogger.class.getName();
private static final Marker WRITE_MARKER = AuditMarker.WRITE.getMarker();
private static final Marker READ_MARKER = AuditMarker.READ.getMarker();
+ private final AtomicReference<Set<String>> debugCmdSetRef =
+ new AtomicReference<>(new HashSet<>());
+ public static final String AUDIT_LOG_DEBUG_CMD_LIST_PREFIX =
+ "ozone.audit.log.debug.cmd.list.";
+ private AuditLoggerType type;
/**
* Parametrized Constructor to initialize logger.
@@ -48,6 +65,8 @@
*/
private void initializeLogger(AuditLoggerType loggerType) {
this.logger = LogManager.getContext(false).getLogger(loggerType.getType());
+ this.type = loggerType;
+ refreshDebugCmdSet();
}
@VisibleForTesting
@@ -56,7 +75,11 @@
}
public void logWriteSuccess(AuditMessage msg) {
- this.logger.logIfEnabled(FQCN, Level.INFO, WRITE_MARKER, msg, null);
+ if (shouldLogAtDebug(msg)) {
+ this.logger.logIfEnabled(FQCN, Level.DEBUG, WRITE_MARKER, msg, null);
+ } else {
+ this.logger.logIfEnabled(FQCN, Level.INFO, WRITE_MARKER, msg, null);
+ }
}
public void logWriteFailure(AuditMessage msg) {
@@ -65,7 +88,11 @@
}
public void logReadSuccess(AuditMessage msg) {
- this.logger.logIfEnabled(FQCN, Level.INFO, READ_MARKER, msg, null);
+ if (shouldLogAtDebug(msg)) {
+ this.logger.logIfEnabled(FQCN, Level.DEBUG, READ_MARKER, msg, null);
+ } else {
+ this.logger.logIfEnabled(FQCN, Level.INFO, READ_MARKER, msg, null);
+ }
}
public void logReadFailure(AuditMessage msg) {
@@ -75,12 +102,28 @@
public void logWrite(AuditMessage auditMessage) {
if (auditMessage.getThrowable() == null) {
- this.logger.logIfEnabled(FQCN, Level.INFO, WRITE_MARKER, auditMessage,
- auditMessage.getThrowable());
+ logWriteSuccess(auditMessage);
} else {
- this.logger.logIfEnabled(FQCN, Level.ERROR, WRITE_MARKER, auditMessage,
- auditMessage.getThrowable());
+ logWriteFailure(auditMessage);
}
}
+ public void refreshDebugCmdSet() {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ refreshDebugCmdSet(conf);
+ }
+
+ public void refreshDebugCmdSet(OzoneConfiguration conf) {
+ Collection<String> cmds = conf.getTrimmedStringCollection(
+ AUDIT_LOG_DEBUG_CMD_LIST_PREFIX +
+ type.getType().toLowerCase(Locale.ROOT));
+ LOG.info("Refresh DebugCmdSet for {} to {}.", type.getType(), cmds);
+ debugCmdSetRef.set(
+ cmds.stream().map(String::toLowerCase).collect(Collectors.toSet()));
+ }
+
+ private boolean shouldLogAtDebug(AuditMessage auditMessage) {
+ return debugCmdSetRef.get()
+ .contains(auditMessage.getOp().toLowerCase(Locale.ROOT));
+ }
}
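
For reference, a minimal sketch of how the new debug command list is meant to be wired, using only the constructor, key prefix, and `refreshDebugCmdSet(OzoneConfiguration)` shown above; the class name and op values are illustrative:

```java
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.audit.AuditLogger;
import org.apache.hadoop.ozone.audit.AuditLoggerType;

public final class AuditDebugListSketch {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Key is the prefix plus the lower-cased logger type, here
    // "ozone.audit.log.debug.cmd.list.omaudit".
    conf.set(AuditLogger.AUDIT_LOG_DEBUG_CMD_LIST_PREFIX + "omaudit",
        "ALLOCATE_BLOCK,COMMIT_KEY");

    AuditLogger audit = new AuditLogger(AuditLoggerType.OMLOGGER);
    audit.refreshDebugCmdSet(conf);
    // Subsequent READ/WRITE successes whose op is on the list are logged
    // at DEBUG instead of INFO, so they disappear from the audit file
    // unless the audit logger level is lowered to DEBUG.
  }
}
```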
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditLoggerType.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditLoggerType.java
index dbfde9f..d37d221 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditLoggerType.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditLoggerType.java
@@ -23,7 +23,8 @@
public enum AuditLoggerType {
DNLOGGER("DNAudit"),
OMLOGGER("OMAudit"),
- SCMLOGGER("SCMAudit");
+ SCMLOGGER("SCMAudit"),
+ S3GLOGGER("S3GAudit");
private String type;
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditMessage.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditMessage.java
index 9d28c9f..85fa798 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditMessage.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditMessage.java
@@ -26,10 +26,21 @@
public final class AuditMessage implements Message {
private final String message;
+ private final String user;
+ private final String ip;
+ private final String op;
+ private final Map<String, String> params;
+ private final String ret;
private final Throwable throwable;
- private AuditMessage(String message, Throwable throwable) {
- this.message = message;
+ private AuditMessage(String user, String ip, String op,
+ Map<String, String> params, String ret, Throwable throwable) {
+ this.user = user;
+ this.ip = ip;
+ this.op = op;
+ this.params = params;
+ this.ret = ret;
+ this.message = formMessage(user, ip, op, params, ret);
this.throwable = throwable;
}
@@ -53,6 +64,10 @@
return throwable;
}
+ public String getOp() {
+ return op;
+ }
+
/**
* Builder class for AuditMessage.
*/
@@ -95,9 +110,14 @@
}
public AuditMessage build() {
- String message = "user=" + this.user + " | ip=" + this.ip + " | " +
- "op=" + this.op + " " + this.params + " | " + "ret=" + this.ret;
- return new AuditMessage(message, throwable);
+ return new AuditMessage(user, ip, op, params, ret, throwable);
}
}
+
+ private String formMessage(String userStr, String ipStr, String opStr,
+ Map<String, String> paramsMap, String retStr) {
+ return "user=" + userStr + " | ip=" + ipStr + " | " + "op=" + opStr
+ + " " + paramsMap + " | " + "ret=" + retStr;
+ }
}
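
Since `formMessage` keeps the exact pre-refactor layout, the rendered audit line is unchanged while `getOp()` becomes available for the debug-list lookup. A hedged illustration; the builder setter names (`setUser`, `atIp`, `forOperation`, `withParams`, `withResult`) are assumptions, since only `build()` appears in this hunk:

```java
import java.util.Collections;
import org.apache.hadoop.ozone.audit.AuditMessage;

public final class AuditMessageFormatSketch {
  public static void main(String[] args) {
    // Assumed builder setters; only build() is visible in this hunk.
    AuditMessage msg = new AuditMessage.Builder()
        .setUser("testuser")
        .atIp("127.0.0.1")
        .forOperation("CREATE_VOLUME")
        .withParams(Collections.singletonMap("volume", "vol1"))
        .withResult("SUCCESS")
        .build();

    // formMessage keeps the pre-refactor layout:
    // user=testuser | ip=127.0.0.1 | op=CREATE_VOLUME {volume=vol1} | ret=SUCCESS
    System.out.println(msg.getFormattedMessage());
    System.out.println(msg.getOp()); // CREATE_VOLUME
  }
}
```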
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/S3GAction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/S3GAction.java
new file mode 100644
index 0000000..e0cbea9
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/S3GAction.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.audit;
+
+/**
+ * Enum to define Audit Action types for S3Gateway.
+ */
+public enum S3GAction implements AuditAction {
+
+ //BucketEndpoint
+ GET_BUCKET,
+ CREATE_BUCKET,
+ HEAD_BUCKET,
+ DELETE_BUCKET,
+ GET_ACL,
+ PUT_ACL,
+ LIST_MULTIPART_UPLOAD,
+ MULTI_DELETE,
+
+ //RootEndpoint
+ LIST_S3_BUCKETS,
+
+ //ObjectEndpoint
+ CREATE_MULTIPART_KEY,
+ COPY_OBJECT,
+ CREATE_KEY,
+ LIST_PARTS,
+ GET_KEY,
+ HEAD_KEY,
+ INIT_MULTIPART_UPLOAD,
+ COMPLETE_MULTIPART_UPLOAD,
+ ABORT_MULTIPART_UPLOAD,
+ DELETE_KEY;
+
+ @Override
+ public String getAction() {
+ return this.toString();
+ }
+
+}
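
A minimal, hedged sketch of how an S3 Gateway endpoint could emit audit entries with the new enum and the `S3GLOGGER` type added above; the endpoint wiring is illustrative, not part of this diff:

```java
import org.apache.hadoop.ozone.audit.AuditLogger;
import org.apache.hadoop.ozone.audit.AuditLoggerType;
import org.apache.hadoop.ozone.audit.S3GAction;

public final class S3GAuditSketch {
  // Logs to the "S3GAudit" logger configured by
  // s3g-audit-log4j2.properties (added later in this change).
  private static final AuditLogger AUDIT =
      new AuditLogger(AuditLoggerType.S3GLOGGER);

  public static void main(String[] args) {
    // getAction() returns the enum name, which becomes the audited op
    // matched (case-insensitively) against the debug command list.
    System.out.println(S3GAction.GET_BUCKET.getAction()); // GET_BUCKET
  }
}
```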
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 133528c..aaa7859 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -991,6 +991,14 @@
</description>
</property>
<property>
+ <name>hdds.recon.heartbeat.interval</name>
+ <value>60s</value>
+ <tag>OZONE, MANAGEMENT, RECON</tag>
+ <description>
+ The heartbeat interval from a Datanode to Recon.
+ </description>
+ </property>
+ <property>
<name>ozone.scm.heartbeat.log.warn.interval.count</name>
<value>10</value>
<tag>OZONE, MANAGEMENT</tag>
@@ -1651,6 +1659,24 @@
</property>
<property>
+ <name>ozone.om.unflushed.transaction.max.count</name>
+ <value>10000</value>
+ <tag>OZONE, OM</tag>
+    <description>Unflushed transactions are requests that have already been
+    applied to the OM state machine but not yet flushed to the OM RocksDB.
+    When the OM is under high concurrency pressure and flushing cannot keep
+    up, too many pending requests are held in memory, leading to long OM GC
+    pauses that slow flushing down further. Flushing can be slow when, for
+    example:
+    1. RocksDB is on an HDD, which has poorer IO performance than an SSD.
+    2. A big compaction is running inside RocksDB and a RocksDB write stall
+    occurs.
+    3. Long GC pauses occur, possibly caused by other factors.
+    This property caps the number of unflushed transactions, which bounds
+    the memory they can occupy.
+ </description>
+ </property>
+
+ <property>
<name>ozone.om.lock.fair</name>
<value>false</value>
<description>If this is true, the Ozone Manager lock will be used in Fair
@@ -3103,4 +3129,34 @@
default the number of retries are 10.
</description>
</property>
+
+ <property>
+ <name>ozone.audit.log.debug.cmd.list.omaudit</name>
+ <value></value>
+ <tag>OM</tag>
+ <description>
+ A comma separated list of OzoneManager commands that are written to the OzoneManager audit logs only if the audit
+ log level is debug. Ex: "ALLOCATE_BLOCK,ALLOCATE_KEY,COMMIT_KEY".
+ </description>
+ </property>
+
+ <property>
+ <name>ozone.audit.log.debug.cmd.list.scmaudit</name>
+ <value></value>
+ <tag>SCM</tag>
+ <description>
+ A comma separated list of SCM commands that are written to the SCM audit logs only if the audit
+ log level is debug. Ex: "GET_VERSION,REGISTER,SEND_HEARTBEAT".
+ </description>
+ </property>
+
+ <property>
+ <name>ozone.audit.log.debug.cmd.list.dnaudit</name>
+ <value></value>
+ <tag>DN</tag>
+ <description>
+ A comma separated list of Datanode commands that are written to the DN audit logs only if the audit
+ log level is debug. Ex: "CREATE_CONTAINER,READ_CONTAINER,UPDATE_CONTAINER".
+ </description>
+ </property>
</configuration>
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/audit/TestOzoneAuditLogger.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/audit/TestOzoneAuditLogger.java
index 01fceae..1b8e744 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/audit/TestOzoneAuditLogger.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/audit/TestOzoneAuditLogger.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.ozone.audit;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,6 +29,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
@@ -110,11 +113,16 @@
}
}
+ @Before
+ public void init() {
+ AUDIT.refreshDebugCmdSet();
+ }
+
/**
- * Test to verify default log level is INFO when logging success events.
+ * Test to verify default log level is INFO when logging WRITE success events.
*/
@Test
- public void verifyDefaultLogLevelForSuccess() throws IOException {
+ public void verifyDefaultLogLevelForWriteSuccess() throws IOException {
AUDIT.logWriteSuccess(WRITE_SUCCESS_MSG);
String expected =
"INFO | OMAudit | ? | " + WRITE_SUCCESS_MSG.getFormattedMessage();
@@ -122,16 +130,39 @@
}
/**
- * Test to verify default log level is ERROR when logging failure events.
+ * Test to verify default log level is ERROR when logging WRITE failure
+ * events.
*/
@Test
- public void verifyDefaultLogLevelForFailure() throws IOException {
+ public void verifyDefaultLogLevelForWriteFailure() throws IOException {
AUDIT.logWriteFailure(WRITE_FAIL_MSG);
String expected =
"ERROR | OMAudit | ? | " + WRITE_FAIL_MSG.getFormattedMessage();
verifyLog(expected);
}
+ /**
+ * Test to verify default log level is INFO when logging READ success events.
+ */
+ @Test
+ public void verifyDefaultLogLevelForReadSuccess() throws IOException {
+ AUDIT.logReadSuccess(READ_SUCCESS_MSG);
+ String expected =
+ "INFO | OMAudit | ? | " + READ_SUCCESS_MSG.getFormattedMessage();
+ verifyLog(expected);
+ }
+
+ /**
+ * Test to verify default log level is ERROR when logging READ failure events.
+ */
+ @Test
+ public void verifyDefaultLogLevelForFailure() throws IOException {
+ AUDIT.logReadFailure(READ_FAIL_MSG);
+ String expected =
+ "ERROR | OMAudit | ? | " + READ_FAIL_MSG.getFormattedMessage();
+ verifyLog(expected);
+ }
+
@Test
public void messageIncludesAllParts() {
String message = WRITE_FAIL_MSG.getFormattedMessage();
@@ -143,19 +174,22 @@
}
/**
- * Test to verify no READ event is logged.
+ * Test to verify no WRITE event is logged.
*/
@Test
- public void notLogReadEvents() throws IOException {
- AUDIT.logReadSuccess(READ_SUCCESS_MSG);
- AUDIT.logReadFailure(READ_FAIL_MSG);
+ public void excludedEventNotLogged() throws IOException {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.set(AuditLogger.AUDIT_LOG_DEBUG_CMD_LIST_PREFIX +
+ AuditLoggerType.OMLOGGER.getType().toLowerCase(Locale.ROOT),
+ "CREATE_VOLUME");
+ AUDIT.refreshDebugCmdSet(conf);
+ AUDIT.logWriteSuccess(WRITE_SUCCESS_MSG);
verifyNoLog();
}
-
+
/**
* Test to verify if multiline entries can be checked.
*/
-
@Test
public void messageIncludesMultilineException() throws IOException {
String exceptionMessage = "Dummy exception message";
diff --git a/hadoop-hdds/common/src/test/resources/auditlog.properties b/hadoop-hdds/common/src/test/resources/auditlog.properties
index 85c18b5..959da04 100644
--- a/hadoop-hdds/common/src/test/resources/auditlog.properties
+++ b/hadoop-hdds/common/src/test/resources/auditlog.properties
@@ -28,7 +28,7 @@
# in the configuration
filter.read.type = MarkerFilter
filter.read.marker = READ
-filter.read.onMatch = DENY
+filter.read.onMatch = NEUTRAL
filter.read.onMismatch = NEUTRAL
# filter.write.onMatch = DENY avoids logging all WRITE events
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 7738f8d..087226c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -65,6 +65,7 @@
import com.google.protobuf.GeneratedMessage;
import static java.lang.Math.min;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getLogWarnInterval;
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.getReconHeartbeatInterval;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmHeartbeatInterval;
import org.apache.commons.collections.CollectionUtils;
@@ -149,6 +150,7 @@
*/
private final AtomicLong heartbeatFrequency = new AtomicLong(2000);
+ private final AtomicLong reconHeartbeatFrequency = new AtomicLong(2000);
/**
* Constructs a StateContext.
*
@@ -906,4 +908,15 @@
public GeneratedMessage getCRLStatusReport() {
return crlStatusReport.get();
}
+
+ public void configureReconHeartbeatFrequency() {
+ reconHeartbeatFrequency.set(getReconHeartbeatInterval(conf));
+ }
+
+ /**
+ * Return current Datanode to Recon heartbeat frequency in ms.
+ */
+ public long getReconHeartbeatFrequency() {
+ return reconHeartbeatFrequency.get();
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
index 29d27f3..4e5b64c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
@@ -146,9 +146,15 @@
// the thread in executor from DatanodeStateMachine for a long time,
// so that it won't affect the communication between datanode and
// other EndpointStateMachine.
+ long heartbeatFrequency;
+ if (endpoint.isPassive()) {
+ heartbeatFrequency = context.getReconHeartbeatFrequency();
+ } else {
+ heartbeatFrequency = context.getHeartbeatFrequency();
+ }
ecs.submit(() -> endpoint.getExecutorService()
.submit(endpointTask)
- .get(context.getHeartbeatFrequency(), TimeUnit.MILLISECONDS));
+ .get(heartbeatFrequency, TimeUnit.MILLISECONDS));
} else {
// This can happen if a task is taking more time than the timeOut
// specified for the task in await, and when it is completed the task
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
index 9951a38..47c9377 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
@@ -176,7 +176,11 @@
rpcEndPoint.getState().getNextState();
rpcEndPoint.setState(nextState);
rpcEndPoint.zeroMissedCount();
- this.stateContext.configureHeartbeatFrequency();
+ if (rpcEndPoint.isPassive()) {
+ this.stateContext.configureReconHeartbeatFrequency();
+ } else {
+ this.stateContext.configureHeartbeatFrequency();
+ }
}
} catch (IOException ex) {
rpcEndPoint.logIfNeeded(ex);
diff --git a/hadoop-hdds/docs/content/feature/OM-HA.md b/hadoop-hdds/docs/content/feature/OM-HA.md
index 573ce77..cba48e1 100644
--- a/hadoop-hdds/docs/content/feature/OM-HA.md
+++ b/hadoop-hdds/docs/content/feature/OM-HA.md
@@ -110,6 +110,43 @@
The details of this approach are discussed in a separate [design doc]({{< ref "design/omha.md" >}}), but it is an integral part of the OM HA design.
+## OM Bootstrap
+
+To convert a non-HA OM to HA, or to add new OM nodes to an existing HA OM ring, the new OM node(s) need to be bootstrapped.
+
+Before bootstrapping a new OM node, the on-disk configuration file (ozone-site.xml) of every existing OM must be updated with the configuration
+details of the new OM, such as nodeId, address, port etc. Note that the existing OMs need not be restarted: they will reload the configuration from
+disk when they receive a bootstrap request from the bootstrapping node.
+
+To bootstrap an OM, the following command needs to be run:
+
+```shell
+ozone om [global options (optional)] --bootstrap
+```
+
+The bootstrap command first verifies that all the OMs have the updated configuration file and fails otherwise. This check can be skipped using the
+_force_ option, which allows the bootstrap to continue even when one of the existing OMs is down or not responding.
+
+```shell
+ozone om [global options (optional)] --bootstrap --force
+```
+
+Note that using the _force_ option during bootstrap could crash an OM process that does not have the updated configuration.
+
+## OM Decommission
+
+To decommission an OM and remove the node from the OM HA ring, the following steps need to be executed.
+1. Stop the OzoneManager process on the node which needs to be decommissioned. <p> **Note -** Do not stop the decommissioning OM if there are
+   only two OMs in the ring, as both OMs are needed to reach consensus to update the Ratis configuration.</p>
+2. Add the _OM NodeId_ of the node being decommissioned to the _ozone.om.decommissioned.nodes.<omServiceId>_ property in _ozone-site.xml_ of all
+   other OMs.
+3. Run the following command to decommission an OM node.
+```shell
+ozone admin om decommission -id=<om-service-id> -nodeid=<decommissioning-om-node-id> -hostname=<decommissioning-om-node-address> [optional --force]
+```
+The _force_ option skips checking whether the OM configurations in _ozone-site.xml_ have been updated with the decommissioned node added to the
+_ozone.om.decommissioned.nodes_ property. <p>**Note -** It is recommended to bootstrap another OM node before decommissioning one to maintain HA.</p>
+
## References
* Check [this page]({{< ref "design/omha.md" >}}) for the links to the original design docs
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
index 98821d9..f64db4d 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
@@ -70,6 +70,8 @@
import static org.apache.hadoop.hdds.DFSConfigKeysLegacy.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_RECON_HEARTBEAT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_RECON_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
import static org.apache.hadoop.hdds.HddsUtils.getPortNumberFromConfigKeys;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
@@ -253,6 +255,18 @@
}
/**
+ * Heartbeat Interval - Defines the heartbeat frequency from a datanode to
+ * Recon.
+ *
+ * @param conf - Ozone Config
+   * @return - HB interval in milliseconds.
+ */
+ public static long getReconHeartbeatInterval(ConfigurationSource conf) {
+ return conf.getTimeDuration(HDDS_RECON_HEARTBEAT_INTERVAL,
+ HDDS_RECON_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
+ }
+
+ /**
* Get the Stale Node interval, which is used by SCM to flag a datanode as
* stale, if the heartbeat from that node has been missing for this duration.
*
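
A quick hedged check of the unit handling: `getTimeDuration` resolves suffixed values such as the `60s` default into the requested unit, so callers of the new helper get milliseconds back.

```java
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.HddsServerUtil;

public final class ReconHeartbeatIntervalCheck {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // With no override, the "60s" default resolves to 60000 ms.
    System.out.println(HddsServerUtil.getReconHeartbeatInterval(conf));

    conf.set("hdds.recon.heartbeat.interval", "2m");
    // Suffixes like "2m" work too: prints 120000.
    System.out.println(HddsServerUtil.getReconHeartbeatInterval(conf));
  }
}
```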
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
index 541ceaa..bc4748e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
@@ -59,7 +59,7 @@
/**
* Stops the HA service.
*/
- void shutdown() throws IOException;
+ void stop() throws IOException;
/**
* Adds the SCM instance to the SCM HA group.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index a69f2cb..0fcc1ec 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -334,10 +334,9 @@
* {@inheritDoc}
*/
@Override
- public void shutdown() throws IOException {
+ public void stop() throws IOException {
if (ratisServer != null) {
ratisServer.stop();
- ratisServer.getSCMStateMachine().close();
grpcServer.stop();
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
index 65f3da3..6dbbea3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
@@ -78,6 +78,11 @@
ratisServer.start();
}
+ @Override
+ public void stop() throws IOException {
+ ratisServer.stop();
+ }
+
/**
* Informs RatisServerStub to behave as a leader SCM or a follower SCM.
*/
@@ -112,14 +117,6 @@
return null;
}
- /**
- * {@inheritDoc}
- */
- @Override
- public void shutdown() throws IOException {
- ratisServer.stop();
- }
-
@Override
public boolean addSCM(AddSCMRequest request) throws IOException {
return false;
@@ -217,6 +214,11 @@
}
@Override
+ public boolean isStopped() {
+ return false;
+ }
+
+ @Override
public RaftServer.Division getDivision() {
return null;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
index e23c984..7a97697 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
@@ -43,6 +43,8 @@
void stop() throws IOException;
+ boolean isStopped();
+
RaftServer.Division getDivision();
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
index 3a68ad1..f8193fa 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
@@ -76,6 +76,7 @@
private final AtomicLong callId = new AtomicLong();
private final RaftServer.Division division;
private final GrpcTlsConfig grpcTlsConfig;
+ private boolean isStopped;
// TODO: Refactor and remove ConfigurationSource and use only
// SCMHAConfiguration.
@@ -108,6 +109,7 @@
(SCMStateMachine) server.getDivision(groupId).getStateMachine();
this.division = server.getDivision(groupId);
+ this.isStopped = false;
}
public static void initialize(String clusterId, String scmId,
@@ -235,9 +237,17 @@
public void stop() throws IOException {
LOG.info("stopping ratis server {}", server.getPeer().getAddress());
server.close();
+ isStopped = true;
+ getSCMStateMachine().close();
}
@Override
+ public boolean isStopped() {
+ return isStopped;
+ }
+
+ @Override
public List<String> getRatisRoles() throws IOException {
Collection<RaftPeer> peers = division.getGroup().getPeers();
List<String> ratisRoles = new ArrayList<>();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index 9aeda10..74074bc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -390,10 +390,17 @@
if (!isInitialized) {
return;
}
- super.close();
- transactionBuffer.close();
- HadoopExecutors.
- shutdown(installSnapshotExecutor, LOG, 5, TimeUnit.SECONDS);
+    // If the ratis server is stopped, this `close` originates from
+    // `scm.stop()`; otherwise it originates from ratis.
+ if (scm.getScmHAManager().getRatisServer().isStopped()) {
+ super.close();
+ transactionBuffer.close();
+ HadoopExecutors.
+ shutdown(installSnapshotExecutor, LOG, 5, TimeUnit.SECONDS);
+ } else {
+ scm.shutDown("scm statemachine is closed by ratis, terminate SCM");
+ }
}
@VisibleForTesting
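
The `isStopped` flag disambiguates the two ways `close()` can be reached; a standalone sketch of the guard pattern, as an analogy rather than the SCM classes themselves:

```java
// Minimal analogy of the shutdown handshake above: the owner sets a
// stop flag before closing, so close() can tell an owner-initiated
// stop from an unexpected close by the embedded server.
final class GuardedServer {
  private volatile boolean stopped;

  void stop() {                 // owner-initiated (the scm.stop() path)
    stopped = true;
    closeStateMachine();        // reaches close() with stopped == true
  }

  void closeStateMachine() {
    if (stopped) {
      // release buffers, executors, etc. -- clean close
    } else {
      // close came from the embedded server itself; the process
      // cannot continue safely, so escalate to full termination
      Runtime.getRuntime().exit(1);
    }
  }
}
```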
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index a2bfa42..6fed1e2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -32,7 +32,6 @@
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.util.Time;
-import org.apache.ratis.util.ExitUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -133,12 +132,9 @@
.setDaemon(false)
.setNameFormat(THREAD_NAME + " - %d")
.setUncaughtExceptionHandler((Thread t, Throwable ex) -> {
- // gracefully shutdown SCM.
- scmContext.getScm().stop();
-
String message = "Terminate SCM, encounter uncaught exception"
+ " in RatisPipelineUtilsThread";
- ExitUtils.terminate(1, message, ex, LOG);
+ scmContext.getScm().shutDown(message);
})
.build()
.newThread(this::run);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java
index 3784f97..fca789d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java
@@ -41,6 +41,8 @@
void join();
+ void shutDown(String message);
+
NodeManager getScmNodeManager();
BlockManager getScmBlockManager();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index e869754..395b77c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -141,6 +141,7 @@
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.util.JvmPauseMonitor;
+import org.apache.ratis.util.ExitUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1511,7 +1512,7 @@
try {
LOG.info("Stopping SCM HA services.");
- scmHAManager.shutdown();
+ scmHAManager.stop();
} catch (Exception ex) {
LOG.error("SCM HA Manager stop failed", ex);
}
@@ -1533,6 +1534,12 @@
scmSafeModeManager.stop();
}
+ @Override
+ public void shutDown(String message) {
+ stop();
+ ExitUtils.terminate(1, message, LOG);
+ }
+
/**
* Wait until service has completed shutdown.
*/
diff --git a/hadoop-hdds/server-scm/src/main/resources/webapps/scm/scm-overview.html b/hadoop-hdds/server-scm/src/main/resources/webapps/scm/scm-overview.html
index e7a67ed..deaf37a 100644
--- a/hadoop-hdds/server-scm/src/main/resources/webapps/scm/scm-overview.html
+++ b/hadoop-hdds/server-scm/src/main/resources/webapps/scm/scm-overview.html
@@ -37,6 +37,7 @@
<th>HEALTHY</th>
<th>STALE</th>
<th>DEAD</th>
+ <th>HEALTHY_READONLY</th>
</tr>
</thead>
<tbody>
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
index c64615f..5326f15 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
@@ -67,6 +67,11 @@
}
@Override
+ public boolean isStopped() {
+ return false;
+ }
+
+ @Override
public RaftServer.Division getDivision() {
return null;
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestSCMHAUnfinalizedStateValidationAction.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestSCMHAUnfinalizedStateValidationAction.java
index 17c6dee..c92820e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestSCMHAUnfinalizedStateValidationAction.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestSCMHAUnfinalizedStateValidationAction.java
@@ -27,7 +27,9 @@
import org.apache.hadoop.ozone.upgrade.UpgradeException;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
import org.apache.ozone.test.LambdaTestUtils;
+import org.apache.ratis.util.ExitUtils;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -63,6 +65,12 @@
private final String dataPath;
private static final String CLUSTER_ID = UUID.randomUUID().toString();
+ @BeforeClass
+ public static void setup() {
+ ExitUtils.disableSystemExit();
+ }
+
+
@Parameterized.Parameters(name = "haEnabledBefore={0} " +
"haEnabledPreFinalized={1}")
public static Collection<Object[]> cases() {
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java
index 25ed0e5..37bea18 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java
@@ -63,6 +63,11 @@
description = "Format output as JSON")
private boolean json;
+ @CommandLine.Option(names = { "--replicas" },
+ defaultValue = "false",
+ description = "Adds replica related details")
+ private boolean addReplicaDetails;
+
@Parameters(description = "Decimal id of the container.")
private long containerID;
@@ -103,7 +108,7 @@
LOG.info("Datanodes: [{}]", machinesStr);
// Print the replica details if available
- if (replicas != null) {
+ if (addReplicaDetails && replicas != null) {
String replicaStr = replicas.stream().map(
InfoSubcommand::buildReplicaDetails)
.collect(Collectors.joining(",\n"));
diff --git a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
index 10b5758..c604019 100644
--- a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
+++ b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
@@ -97,7 +97,7 @@
.thenReturn(getReplicas(includeIndex));
cmd = new InfoSubcommand();
CommandLine c = new CommandLine(cmd);
- c.parseArgs("1");
+ c.parseArgs("1", "--replicas");
cmd.execute(scmClient);
// Ensure we have a line for Replicas:
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 01d9e91..213868c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -582,8 +582,18 @@
}
}
- String owner = bucketArgs.getOwner() == null ?
- ugi.getShortUserName() : bucketArgs.getOwner();
+ final String owner;
+ // If S3 auth exists, set owner name to the short user name derived from the
+ // accessId. Similar to RpcClient#getDEK
+ if (getThreadLocalS3Auth() != null) {
+ UserGroupInformation s3gUGI = UserGroupInformation.createRemoteUser(
+ getThreadLocalS3Auth().getAccessID());
+ owner = s3gUGI.getShortUserName();
+ } else {
+ owner = bucketArgs.getOwner() == null ?
+ ugi.getShortUserName() : bucketArgs.getOwner();
+ }
+
Boolean isVersionEnabled = bucketArgs.getVersioning() == null ?
Boolean.FALSE : bucketArgs.getVersioning();
StorageType storageType = bucketArgs.getStorageType() == null ?
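
The owner now follows the `auth_to_local` short-name mapping of the S3 access ID (the bucketlist.robot change below verifies this end to end). A hedged sketch of the mapping step in isolation; the access ID value is illustrative, and whether it shortens depends on the configured rules:

```java
import org.apache.hadoop.security.UserGroupInformation;

public final class S3OwnerSketch {
  public static void main(String[] args) {
    // Mirrors the RpcClient logic above: derive the short user name
    // from the S3 accessId. Under the docker-config rule
    // RULE:[2:$1](testuser.*) this prints "testuser"; with different
    // auth_to_local rules it may fall back to the full name.
    UserGroupInformation s3gUgi = UserGroupInformation
        .createRemoteUser("testuser/scm@EXAMPLE.COM");
    System.out.println(s3gUgi.getShortUserName());
  }
}
```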
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index c000ad5..5717ca6 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -319,4 +319,9 @@
public static final long OZONE_OM_ADMIN_PROTOCOL_WAIT_BETWEEN_RETRIES_DEFAULT
= 1000;
+ public static final String OZONE_OM_UNFLUSHED_TRANSACTION_MAX_COUNT =
+ "ozone.om.unflushed.transaction.max.count";
+ public static final int OZONE_OM_UNFLUSHED_TRANSACTION_MAX_COUNT_DEFAULT
+ = 10000;
+
}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
index d9bb975..001322e 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
@@ -133,40 +133,36 @@
this.omProxyInfos = new HashMap<>();
this.omNodeIDList = new ArrayList<>();
- Collection<String> omServiceIds = Collections.singletonList(omSvcId);
+ Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config,
+ omSvcId);
- for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) {
- Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config,
- serviceId);
+ for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
- for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
+ String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
+ omSvcId, nodeId);
+ String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
+ if (rpcAddrStr == null) {
+ continue;
+ }
- String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
- serviceId, nodeId);
- String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
- if (rpcAddrStr == null) {
- continue;
+ OMProxyInfo omProxyInfo = new OMProxyInfo(omSvcId, nodeId,
+ rpcAddrStr);
+
+ if (omProxyInfo.getAddress() != null) {
+
+
+ // For a non-HA OM setup, nodeId might be null. If so, we assign it
+ // the default value
+ if (nodeId == null) {
+ nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
}
-
- OMProxyInfo omProxyInfo = new OMProxyInfo(serviceId, nodeId,
- rpcAddrStr);
-
- if (omProxyInfo.getAddress() != null) {
-
-
- // For a non-HA OM setup, nodeId might be null. If so, we assign it
- // the default value
- if (nodeId == null) {
- nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
- }
- // ProxyInfo will be set during first time call to server.
- omProxies.put(nodeId, null);
- omProxyInfos.put(nodeId, omProxyInfo);
- omNodeIDList.add(nodeId);
- } else {
- LOG.error("Failed to create OM proxy for {} at address {}",
- nodeId, rpcAddrStr);
- }
+ // ProxyInfo will be set during first time call to server.
+ omProxies.put(nodeId, null);
+ omProxyInfos.put(nodeId, omProxyInfo);
+ omNodeIDList.add(nodeId);
+ } else {
+ LOG.error("Failed to create OM proxy for {} at address {}",
+ nodeId, rpcAddrStr);
}
}
diff --git a/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching b/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching
index 2491a1a..078824c 100755
--- a/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching
+++ b/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching
@@ -82,6 +82,7 @@
run cp "${ROOT}/hadoop-ozone/dist/src/shell/conf/om-audit-log4j2.properties" "etc/hadoop"
run cp "${ROOT}/hadoop-ozone/dist/src/shell/conf/dn-audit-log4j2.properties" "etc/hadoop"
run cp "${ROOT}/hadoop-ozone/dist/src/shell/conf/scm-audit-log4j2.properties" "etc/hadoop"
+run cp "${ROOT}/hadoop-ozone/dist/src/shell/conf/s3g-audit-log4j2.properties" "etc/hadoop"
run cp "${ROOT}/hadoop-ozone/dist/src/shell/conf/ozone-shell-log4j.properties" "etc/hadoop"
run cp "${ROOT}/hadoop-ozone/dist/src/shell/conf/ozone-site.xml" "etc/hadoop"
run cp -f "${ROOT}/hadoop-ozone/dist/src/shell/conf/log4j.properties" "etc/hadoop"
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
index 498d02e..4baaca5 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
@@ -67,7 +67,7 @@
HDFS-SITE.XML_dfs.datanode.http.address=0.0.0.0:1012
CORE-SITE.XML_dfs.data.transfer.protection=authentication
CORE-SITE.XML_hadoop.security.authentication=kerberos
-CORE-SITE.XML_hadoop.security.auth_to_local=RULE:[2:$1](testuser2.*) RULE:[2:$1@$0](.*)s/.*/root/
+CORE-SITE.XML_hadoop.security.auth_to_local="RULE:[2:$1](testuser2.*) RULE:[2:$1](testuser.*) RULE:[2:$1@$0](.*)s/.*/root/"
CORE-SITE.XML_hadoop.security.key.provider.path=kms://http@kms:9600/kms
diff --git a/hadoop-ozone/dist/src/main/smoketest/s3/bucketlist.robot b/hadoop-ozone/dist/src/main/smoketest/s3/bucketlist.robot
index b30d260..cdea52f 100644
--- a/hadoop-ozone/dist/src/main/smoketest/s3/bucketlist.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/s3/bucketlist.robot
@@ -32,6 +32,14 @@
${result} = Execute AWSS3APICli list-buckets | jq -r '.Buckets[].Name'
Should contain ${result} ${BUCKET}
+Get bucket info with Ozone Shell to check the owner field
+ Pass Execution If '${SECURITY_ENABLED}' == 'false' Skipping this check as security is not enabled
+ ${result} = Execute ozone sh bucket info /s3v/${BUCKET} | jq -r '.owner'
+ Should Be Equal ${result} testuser
+ # In ozonesecure(-ha) docker-config, hadoop.security.auth_to_local is set
+ # in the way that getShortUserName() converts the accessId to "testuser".
+ # Also see "Setup dummy credentials for S3" in commonawslib.robot
+
List buckets with empty access id
Execute aws configure set aws_access_key_id ''
${result} = Execute AWSS3APICli and checkrc list-buckets 255
diff --git a/hadoop-ozone/dist/src/shell/conf/dn-audit-log4j2.properties b/hadoop-ozone/dist/src/shell/conf/dn-audit-log4j2.properties
index 3c4d045..479b455 100644
--- a/hadoop-ozone/dist/src/shell/conf/dn-audit-log4j2.properties
+++ b/hadoop-ozone/dist/src/shell/conf/dn-audit-log4j2.properties
@@ -28,7 +28,7 @@
# in the configuration
filter.read.type=MarkerFilter
filter.read.marker=READ
-filter.read.onMatch=DENY
+filter.read.onMatch=NEUTRAL
filter.read.onMismatch=NEUTRAL
# filter.write.onMatch=DENY avoids logging all WRITE events
diff --git a/hadoop-ozone/dist/src/shell/conf/om-audit-log4j2.properties b/hadoop-ozone/dist/src/shell/conf/om-audit-log4j2.properties
index 57577e1..af707fd 100644
--- a/hadoop-ozone/dist/src/shell/conf/om-audit-log4j2.properties
+++ b/hadoop-ozone/dist/src/shell/conf/om-audit-log4j2.properties
@@ -28,7 +28,7 @@
# in the configuration
filter.read.type=MarkerFilter
filter.read.marker=READ
-filter.read.onMatch=DENY
+filter.read.onMatch=NEUTRAL
filter.read.onMismatch=NEUTRAL
# filter.write.onMatch=DENY avoids logging all WRITE events
diff --git a/hadoop-ozone/dist/src/shell/conf/s3g-audit-log4j2.properties b/hadoop-ozone/dist/src/shell/conf/s3g-audit-log4j2.properties
new file mode 100644
index 0000000..8bc374e
--- /dev/null
+++ b/hadoop-ozone/dist/src/shell/conf/s3g-audit-log4j2.properties
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with this
+# work for additional information regarding copyright ownership. The ASF
+# licenses this file to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# <p>
+# http://www.apache.org/licenses/LICENSE-2.0
+# <p>
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+#
+name=PropertiesConfig
+
+# Checks for config change periodically and reloads
+monitorInterval=30
+
+filter=read,write
+# filter.read.onMatch=DENY avoids logging all READ events
+# filter.read.onMatch=ACCEPT permits logging all READ events
+# The above two settings ignore the log levels in configuration
+# filter.read.onMatch=NEUTRAL permits logging of only those READ events
+# which are attempted at log level equal or greater than log level specified
+# in the configuration
+filter.read.type=MarkerFilter
+filter.read.marker=READ
+filter.read.onMatch=NEUTRAL
+filter.read.onMismatch=NEUTRAL
+
+# filter.write.onMatch=DENY avoids logging all WRITE events
+# filter.write.onMatch=ACCEPT permits logging all WRITE events
+# The above two settings ignore the log levels in configuration
+# filter.write.onMatch=NEUTRAL permits logging of only those WRITE events
+# which are attempted at log level equal or greater than log level specified
+# in the configuration
+filter.write.type=MarkerFilter
+filter.write.marker=WRITE
+filter.write.onMatch=NEUTRAL
+filter.write.onMismatch=NEUTRAL
+
+# Log Levels are organized from most specific to least:
+# OFF (most specific, no logging)
+# FATAL (most specific, little data)
+# ERROR
+# WARN
+# INFO
+# DEBUG
+# TRACE (least specific, a lot of data)
+# ALL (least specific, all data)
+
+# Uncomment following section to enable logging to console appender also
+#appenders=console, rolling
+#appender.console.type=Console
+#appender.console.name=STDOUT
+#appender.console.layout.type=PatternLayout
+#appender.console.layout.pattern=%d{DEFAULT} | %-5level | %c{1} | %msg | %throwable{3} %n
+
+# Comment this line when using both console and rolling appenders
+appenders=rolling
+
+#Rolling File Appender with size & time thresholds.
+#Rolling is triggered when either threshold is breached.
+#The rolled over file is compressed by default
+#Time interval is specified in seconds 86400s=1 day
+appender.rolling.type=RollingFile
+appender.rolling.name=RollingFile
+appender.rolling.fileName=${sys:hadoop.log.dir}/s3g-audit-${hostName}.log
+appender.rolling.filePattern=${sys:hadoop.log.dir}/s3g-audit-${hostName}-%d{yyyy-MM-dd-HH-mm-ss}-%i.log.gz
+appender.rolling.layout.type=PatternLayout
+appender.rolling.layout.pattern=%d{DEFAULT} | %-5level | %c{1} | %msg | %throwable{3} %n
+appender.rolling.policies.type=Policies
+appender.rolling.policies.time.type=TimeBasedTriggeringPolicy
+appender.rolling.policies.time.interval=86400
+appender.rolling.policies.size.type=SizeBasedTriggeringPolicy
+appender.rolling.policies.size.size=64MB
+
+loggers=audit
+logger.audit.type=AsyncLogger
+logger.audit.name=S3GAudit
+logger.audit.level=INFO
+logger.audit.appenderRefs=rolling
+logger.audit.appenderRef.file.ref=RollingFile
+
+rootLogger.level=INFO
+#rootLogger.appenderRefs=stdout
+#rootLogger.appenderRef.stdout.ref=STDOUT
diff --git a/hadoop-ozone/dist/src/shell/conf/scm-audit-log4j2.properties b/hadoop-ozone/dist/src/shell/conf/scm-audit-log4j2.properties
index 3f81561..e8c46d6 100644
--- a/hadoop-ozone/dist/src/shell/conf/scm-audit-log4j2.properties
+++ b/hadoop-ozone/dist/src/shell/conf/scm-audit-log4j2.properties
@@ -28,7 +28,7 @@
# in the configuration
filter.read.type=MarkerFilter
filter.read.marker=READ
-filter.read.onMatch=DENY
+filter.read.onMatch=NEUTRAL
filter.read.onMismatch=NEUTRAL
# filter.write.onMatch=DENY avoids logging all WRITE events
diff --git a/hadoop-ozone/dist/src/shell/ozone/ozone b/hadoop-ozone/dist/src/shell/ozone/ozone
index 3b5ac09..72be8cf 100755
--- a/hadoop-ozone/dist/src/shell/ozone/ozone
+++ b/hadoop-ozone/dist/src/shell/ozone/ozone
@@ -167,6 +167,7 @@
s3g)
OZONE_SUBCMD_SUPPORTDAEMONIZATION="true"
OZONE_CLASSNAME='org.apache.hadoop.ozone.s3.Gateway'
+ OZONE_S3G_OPTS="${OZONE_S3G_OPTS} -Dlog4j.configurationFile=${OZONE_CONF_DIR}/s3g-audit-log4j2.properties"
OZONE_RUN_ARTIFACT_NAME="ozone-s3gateway"
;;
csi)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
index b1b6196..dc6e237 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
@@ -80,12 +80,15 @@
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
+
+import org.apache.ratis.util.ExitUtils;
import org.junit.After;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -129,6 +132,11 @@
private String omId;
private OzoneManagerProtocolClientSideTranslatorPB omClient;
+ @BeforeClass
+ public static void setup() {
+ ExitUtils.disableSystemExit();
+ }
+
@Before
public void init() {
try {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index 2a3d5cc..3eeb3e9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -113,6 +113,8 @@
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.SCM_GET_PIPELINE_EXCEPTION;
import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
+
+import org.apache.ratis.util.ExitUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -168,6 +170,7 @@
@BeforeClass
public static void setUp() throws Exception {
+ ExitUtils.disableSystemExit();
conf = new OzoneConfiguration();
dir = GenericTestUtils.getRandomizedTestDir();
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.toString());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
index 01c5f5a..fb5b437 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
@@ -226,7 +226,7 @@
conf2.set(ScmConfigKeys.OZONE_SCM_PRIMORDIAL_NODE_ID_KEY,
scm1.getSCMNodeId());
Assert.assertTrue(StorageContainerManager.scmBootstrap(conf1));
- scm1.getScmHAManager().shutdown();
+ scm1.getScmHAManager().stop();
Assert.assertTrue(
StorageContainerManager.scmInit(conf1, scm1.getClusterId()));
Assert.assertTrue(StorageContainerManager.scmBootstrap(conf2));
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index f5db636..7731b9e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -257,6 +257,7 @@
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.LifeCycle;
import org.bouncycastle.pkcs.PKCS10CertificationRequest;
@@ -566,6 +567,10 @@
}
}
+ public boolean isStopped() {
+ return omState == State.STOPPED;
+ }
+
/**
* Set the {@link S3Authentication} for the current rpc handler thread.
*/
@@ -785,20 +790,6 @@
stop();
}
- public void shutdown(Exception ex) throws IOException {
- if (omState != State.STOPPED) {
- stop();
- exitManager.exitSystem(1, ex.getLocalizedMessage(), ex, LOG);
- }
- }
-
- public void shutdown(String errorMsg) throws IOException {
- if (omState != State.STOPPED) {
- stop();
- exitManager.exitSystem(1, errorMsg, LOG);
- }
- }
-
/**
* Class which schedule saving metrics to a file.
*/
@@ -1897,6 +1888,9 @@
*/
public void stop() {
LOG.info("{}: Stopping Ozone Manager", omNodeDetails.getOMPrintInfo());
+ if (isStopped()) {
+ return;
+ }
try {
omState = State.STOPPED;
// Cancel the metrics timer and set to null.
@@ -1932,12 +1926,16 @@
if (omSnapshotProvider != null) {
omSnapshotProvider.stop();
}
- omState = State.STOPPED;
} catch (Exception e) {
LOG.error("OzoneManager stop failed.", e);
}
}
+ public void shutDown(String message) {
+ stop();
+ ExitUtils.terminate(1, message, LOG);
+ }
+
/**
* Wait until service has completed shutdown.
*/
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index eaa38ef..b3c8262 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@ -28,6 +28,7 @@
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -103,6 +104,7 @@
private final boolean isRatisEnabled;
private final boolean isTracingEnabled;
+ private final Semaphore unFlushedTransactions;
/**
* function which will get term associated with the transaction index.
@@ -120,6 +122,7 @@
private boolean isRatisEnabled = false;
private boolean isTracingEnabled = false;
private Function<Long, Long> indexToTerm = null;
+ private int maxUnFlushedTransactionCount = 0;
public Builder setOmMetadataManager(OMMetadataManager omm) {
this.mm = omm;
@@ -147,22 +150,30 @@
return this;
}
+ public Builder setmaxUnFlushedTransactionCount(int size) {
+ this.maxUnFlushedTransactionCount = size;
+ return this;
+ }
+
public OzoneManagerDoubleBuffer build() {
if (isRatisEnabled) {
Preconditions.checkNotNull(rs, "When ratis is enabled, " +
"OzoneManagerRatisSnapshot should not be null");
Preconditions.checkNotNull(indexToTerm, "When ratis is enabled " +
"indexToTerm should not be null");
+ Preconditions.checkState(maxUnFlushedTransactionCount > 0L,
+ "when ratis is enabled, maxUnFlushedTransactions " +
+ "should be greater than 0");
}
return new OzoneManagerDoubleBuffer(mm, rs, isRatisEnabled,
- isTracingEnabled, indexToTerm);
+ isTracingEnabled, indexToTerm, maxUnFlushedTransactionCount);
}
}
private OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager,
OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot,
boolean isRatisEnabled, boolean isTracingEnabled,
- Function<Long, Long> indexToTerm) {
+ Function<Long, Long> indexToTerm, int maxUnFlushedTransactions) {
this.currentBuffer = new ConcurrentLinkedQueue<>();
this.readyBuffer = new ConcurrentLinkedQueue<>();
@@ -175,7 +186,7 @@
this.currentFutureQueue = null;
this.readyFutureQueue = null;
}
-
+ this.unFlushedTransactions = new Semaphore(maxUnFlushedTransactions);
this.omMetadataManager = omMetadataManager;
this.ozoneManagerRatisSnapShot = ozoneManagerRatisSnapShot;
this.ozoneManagerDoubleBufferMetrics =
@@ -190,6 +201,22 @@
}
+ /**
+ * Acquires the given number of permits from unFlushedTransactions,
+ * blocking until all are available, or the thread is interrupted.
+ */
+ public void acquireUnFlushedTransactions(int n) throws InterruptedException {
+ unFlushedTransactions.acquire(n);
+ }
+
+ /**
+ * Releases the given number of permits,
+ * returning them to the unFlushedTransactions semaphore.
+ */
+ public void releaseUnFlushedTransactions(int n) {
+ unFlushedTransactions.release(n);
+ }
+
// TODO: pass the trace id further down and trace all methods of DBStore.
/**
@@ -289,7 +316,7 @@
long startTime = Time.monotonicNow();
flushBatchWithTrace(lastTraceId.get(), readyBuffer.size(),
- (SupplierWithIOException<Void>) () -> {
+ () -> {
omMetadataManager.getStore().commitBatchOperation(
batchOperation);
return null;
@@ -336,14 +363,16 @@
readyBuffer.clear();
+ if (isRatisEnabled) {
+ releaseUnFlushedTransactions(flushedTransactionsSize);
+ }
+
// update the last updated index in OzoneManagerStateMachine.
ozoneManagerRatisSnapShot.updateLastAppliedIndex(
flushedEpochs);
// set metrics.
updateMetrics(flushedTransactionsSize);
-
-
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
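The Semaphore added here bounds how many applied-but-unflushed transactions may accumulate: the apply path acquires one permit per transaction and the flush thread releases a batch worth of permits once the batch is committed. A self-contained sketch of that back-pressure scheme (BoundedBuffer is illustrative; in the diff the "buffer" is the double buffer itself):

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

// Sketch: producers block once maxUnFlushed permits are taken;
// the flusher returns permits after persisting a batch.
public final class BoundedBuffer<T> {
  private final Semaphore unFlushed;
  private final ConcurrentLinkedQueue<T> buffer =
      new ConcurrentLinkedQueue<>();

  public BoundedBuffer(int maxUnFlushed) {
    this.unFlushed = new Semaphore(maxUnFlushed);
  }

  // Called on the apply path: blocks when too many entries await flushing.
  public void add(T entry) throws InterruptedException {
    unFlushed.acquire(1);
    buffer.add(entry);
  }

  // Called by the flush thread after the batch is durably committed.
  public void flush() {
    int flushed = 0;
    while (buffer.poll() != null) {
      flushed++;
    }
    unFlushed.release(flushed);
  }
}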
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index 5305ce3..761c8e1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -559,7 +559,7 @@
public void stop() {
try {
server.close();
- omStateMachine.stop();
+ omStateMachine.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index e45c52c..78cbabf 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.ozone.common.ha.ratis.RatisSnapshotInfo;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.OzoneManagerPrepareState;
import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -53,7 +54,6 @@
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
@@ -118,6 +118,7 @@
this.handler = new OzoneManagerRequestHandler(ozoneManager,
ozoneManagerDoubleBuffer);
+
ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("OM StateMachine ApplyTransaction Thread - %d").build();
this.executorService = HadoopExecutors.newSingleThreadExecutor(build);
@@ -308,6 +309,10 @@
CompletableFuture<Message> ratisFuture =
new CompletableFuture<>();
applyTransactionMap.put(trxLogIndex, trx.getLogEntry().getTerm());
+
+ // If there are too many pending requests, wait for the double buffer to flush.
+ ozoneManagerDoubleBuffer.acquireUnFlushedTransactions(1);
+
CompletableFuture<OMResponse> future = CompletableFuture.supplyAsync(
() -> runCommand(request, trxLogIndex), executorService);
future.thenApply(omResponse -> {
@@ -395,9 +400,13 @@
}
public OzoneManagerDoubleBuffer buildDoubleBufferForRatis() {
+ int maxUnflushedTransactionSize = ozoneManager.getConfiguration()
+ .getInt(OMConfigKeys.OZONE_OM_UNFLUSHED_TRANSACTION_MAX_COUNT,
+ OMConfigKeys.OZONE_OM_UNFLUSHED_TRANSACTION_MAX_COUNT_DEFAULT);
return new OzoneManagerDoubleBuffer.Builder()
.setOmMetadataManager(ozoneManager.getMetadataManager())
.setOzoneManagerRatisSnapShot(this::updateLastAppliedIndex)
+ .setmaxUnFlushedTransactionCount(maxUnflushedTransactionSize)
.setIndexToTerm(this::getTermForIndex)
.enableRatis(true)
.enableTracing(isTracingEnabled)
@@ -470,8 +479,11 @@
// OM should be shutdown as the StateMachine has shutdown.
LOG.info("StateMachine has shutdown. Shutdown OzoneManager if not " +
"already shutdown.");
- ozoneManager.shutdown(new RaftException("RaftServer called shutdown on " +
- "StateMachine"));
+ if (!ozoneManager.isStopped()) {
+ ozoneManager.shutDown("OM state machine is shutdown by Ratis server");
+ } else {
+ stop();
+ }
}
/**
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
index 01fade0..3ef2661 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
@@ -114,7 +114,7 @@
+ "-" + snapshotTime;
String snapshotFilePath = Paths.get(omSnapshotDir.getAbsolutePath(),
snapshotFileName).toFile().getAbsolutePath();
- File targetFile = new File(snapshotFileName + ".tar.gz");
+ File targetFile = new File(snapshotFilePath + ".tar.gz");
String omCheckpointUrl = peerNodesMap.get(leaderOMNodeID)
.getOMDBCheckpointEnpointUrl(httpPolicy.isHttpEnabled());
@@ -135,6 +135,13 @@
try (InputStream inputStream = httpURLConnection.getInputStream()) {
FileUtils.copyInputStreamToFile(inputStream, targetFile);
+ } catch (IOException ex) {
+ LOG.error("OM snapshot {} cannot be downloaded.", targetFile, ex);
+ boolean deleted = FileUtils.deleteQuietly(targetFile);
+ if (!deleted) {
+ LOG.error("OM snapshot which failed to download {} cannot be deleted",
+ targetFile);
+ }
}
return null;
});
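The new catch block removes a partially written tarball when the checkpoint download fails. The same download-then-clean-up shape, sketched with commons-io (the URL is a placeholder, and this sketch rethrows where the snapshot provider logs and continues):

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import org.apache.commons.io.FileUtils;

public final class SnapshotDownload {
  // Streams url into target; removes the partial file on failure.
  static void download(URL url, File target) throws IOException {
    try (InputStream in = url.openStream()) {
      FileUtils.copyInputStreamToFile(in, target);
    } catch (IOException ex) {
      if (!FileUtils.deleteQuietly(target)) {
        // Leftover partial file; a real caller would log this.
      }
      throw ex;
    }
  }
}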
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
index 02718a6..4089a07 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
@@ -27,6 +27,8 @@
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +74,11 @@
private static final Logger LOG =
LoggerFactory.getLogger(TestKeyDeletingService.class);
+ @BeforeClass
+ public static void setup() {
+ ExitUtils.disableSystemExit();
+ }
+
private OzoneConfiguration createConfAndInitValues() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
File newFolder = folder.newFolder();
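Tests that can now reach the shutDown() path call ExitUtils.disableSystemExit() first, so that in Ratis ExitUtils.terminate surfaces as an exception rather than killing the test JVM. The JUnit 4 shape used across these tests:

import org.apache.ratis.util.ExitUtils;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestWithDisabledExit {
  @BeforeClass
  public static void disableExit() {
    // Makes ExitUtils.terminate(...) throw instead of calling System.exit.
    ExitUtils.disableSystemExit();
  }

  @Test
  public void testSomething() {
    // Code under test may hit a shutDown() path without terminating the JVM.
  }
}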
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
index 826571e..748519e 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
@@ -70,9 +70,11 @@
import org.apache.ozone.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
+import org.apache.ratis.util.ExitUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
@@ -99,9 +101,14 @@
private OzoneManagerProtocol writeClient;
private OzoneManager om;
+
+ @BeforeClass
+ public static void setup() {
+ ExitUtils.disableSystemExit();
+ }
@Before
- public void setup() throws Exception {
+ public void init() throws Exception {
configuration = new OzoneConfiguration();
testDir = GenericTestUtils.getRandomizedTestDir();
configuration.set(HddsConfigKeys.OZONE_METADATA_DIRS,
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestTrashService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestTrashService.java
index c62e3cc..2544f32 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestTrashService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestTrashService.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.ratis.util.ExitUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -66,6 +67,7 @@
@Before
public void setup() throws IOException, AuthenticationException {
+ ExitUtils.disableSystemExit();
OzoneConfiguration configuration = new OzoneConfiguration();
File folder = tempFolder.newFolder();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
index 16dc322..a00acdf 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
@@ -81,6 +81,7 @@
doubleBuffer = new OzoneManagerDoubleBuffer.Builder()
.setOmMetadataManager(omMetadataManager)
.setOzoneManagerRatisSnapShot(ozoneManagerRatisSnapshot)
+ .setmaxUnFlushedTransactionCount(10000)
.enableRatis(true)
.setIndexToTerm((val) -> term)
.build();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
index 92d5c62..21e67d0 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
@@ -107,9 +107,10 @@
ozoneManagerRatisSnapshot = index -> {
lastAppliedIndex = index.get(index.size() - 1);
};
- doubleBuffer = new OzoneManagerDoubleBuffer.Builder().
- setOmMetadataManager(omMetadataManager).
- setOzoneManagerRatisSnapShot(ozoneManagerRatisSnapshot)
+ doubleBuffer = new OzoneManagerDoubleBuffer.Builder()
+ .setOmMetadataManager(omMetadataManager)
+ .setOzoneManagerRatisSnapShot(ozoneManagerRatisSnapshot)
+ .setmaxUnFlushedTransactionCount(1)
.enableRatis(true)
.setIndexToTerm((i) -> term)
.build();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
index 351f524..365eb60 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
@@ -46,10 +46,12 @@
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.LifeCycle;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -82,6 +84,11 @@
private SecurityConfig secConfig;
private OMCertificateClient certClient;
+ @BeforeClass
+ public static void setup() {
+ ExitUtils.disableSystemExit();
+ }
+
@Before
public void init() throws Exception {
conf = new OzoneConfiguration();
@@ -112,6 +119,7 @@
initialTermIndex = TermIndex.valueOf(0, 0);
RatisSnapshotInfo omRatisSnapshotInfo = new RatisSnapshotInfo();
when(ozoneManager.getSnapshotInfo()).thenReturn(omRatisSnapshotInfo);
+ when(ozoneManager.getConfiguration()).thenReturn(conf);
secConfig = new SecurityConfig(conf);
certClient = new OMCertificateClient(secConfig);
omRatisServer = OzoneManagerRatisServer.newOMRatisServer(conf, ozoneManager,
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
index a0a7cd8..48dca13 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
@@ -84,6 +84,7 @@
when(ozoneManagerRatisServer.getOzoneManager()).thenReturn(ozoneManager);
when(ozoneManager.getSnapshotInfo()).thenReturn(
Mockito.mock(RatisSnapshotInfo.class));
+ when(ozoneManager.getConfiguration()).thenReturn(conf);
ozoneManagerStateMachine =
new OzoneManagerStateMachine(ozoneManagerRatisServer, false);
ozoneManagerStateMachine.notifyTermIndexUpdated(0, 0);
diff --git a/hadoop-ozone/ozonefs-hadoop3/src/main/resources/META-INF/services/org.apache.hadoop.security.token.DtFetcher b/hadoop-ozone/ozonefs-hadoop3/src/main/resources/META-INF/services/org.apache.hadoop.security.token.DtFetcher
new file mode 100644
index 0000000..6e86731
--- /dev/null
+++ b/hadoop-ozone/ozonefs-hadoop3/src/main/resources/META-INF/services/org.apache.hadoop.security.token.DtFetcher
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.hadoop.fs.ozone.O3fsDtFetcher
diff --git a/hadoop-ozone/ozonefs-hadoop3/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier b/hadoop-ozone/ozonefs-hadoop3/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
new file mode 100644
index 0000000..e0292bc
--- /dev/null
+++ b/hadoop-ozone/ozonefs-hadoop3/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.ozone.security.OzoneTokenIdentifier
+org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier
\ No newline at end of file
diff --git a/hadoop-ozone/ozonefs-hadoop3/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer b/hadoop-ozone/ozonefs-hadoop3/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
new file mode 100644
index 0000000..ac1b3cf
--- /dev/null
+++ b/hadoop-ozone/ozonefs-hadoop3/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.hadoop.fs.ozone.BasicOzoneClientAdapterImpl$Renewer
+org.apache.hadoop.fs.ozone.BasicRootedOzoneClientAdapterImpl$Renewer
diff --git a/hadoop-ozone/ozonefs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer b/hadoop-ozone/ozonefs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
index bbb8221..ac1b3cf 100644
--- a/hadoop-ozone/ozonefs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
+++ b/hadoop-ozone/ozonefs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
@@ -17,3 +17,4 @@
#
org.apache.hadoop.fs.ozone.BasicOzoneClientAdapterImpl$Renewer
+org.apache.hadoop.fs.ozone.BasicRootedOzoneClientAdapterImpl$Renewer
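These META-INF/services entries register the implementations with java.util.ServiceLoader, which is how Hadoop security discovers DtFetcher, TokenIdentifier, and TokenRenewer classes on the classpath. A generic sketch of the lookup side (Hadoop's Token class performs an equivalent scan internally):

import java.util.ServiceLoader;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.TokenRenewer;

public final class RenewerLookup {
  // Scans implementations named in
  // META-INF/services/org.apache.hadoop.security.token.TokenRenewer.
  static TokenRenewer find(Text kind) {
    for (TokenRenewer renewer : ServiceLoader.load(TokenRenewer.class)) {
      if (renewer.handleKind(kind)) {
        return renewer;
      }
    }
    return null;
  }
}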
diff --git a/hadoop-ozone/recon/pom.xml b/hadoop-ozone/recon/pom.xml
index 2cfead4..bf03b18 100644
--- a/hadoop-ozone/recon/pom.xml
+++ b/hadoop-ozone/recon/pom.xml
@@ -103,7 +103,7 @@
<goal>install-node-and-npm</goal>
</goals>
<configuration>
- <nodeVersion>v16.2.0</nodeVersion>
+ <nodeVersion>v16.14.2</nodeVersion>
</configuration>
</execution>
<execution>
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
index e57bb70..5f7af83 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
@@ -95,7 +95,7 @@
super(conf, scmStorageConfig, eventPublisher, networkTopology,
SCMContext.emptyContext(), scmLayoutVersionManager);
this.reconDatanodeOutdatedTime = reconStaleDatanodeMultiplier *
- HddsServerUtil.getScmHeartbeatInterval(conf);
+ HddsServerUtil.getReconHeartbeatInterval(conf);
this.nodeDB = nodeDB;
loadExistingNodes();
}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index c72d2e4..c693bd2 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -83,6 +83,7 @@
import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.buildRpcServerStartMessage;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import org.apache.ratis.util.ExitUtils;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -313,6 +314,12 @@
}
}
+ @Override
+ public void shutDown(String message) {
+ stop();
+ ExitUtils.terminate(1, message, LOG);
+ }
+
public ReconDatanodeProtocolServer getDatanodeProtocolServer() {
return datanodeProtocolServer;
}
diff --git a/hadoop-ozone/s3gateway/pom.xml b/hadoop-ozone/s3gateway/pom.xml
index ba9a7b5..ec751ed 100644
--- a/hadoop-ozone/s3gateway/pom.xml
+++ b/hadoop-ozone/s3gateway/pom.xml
@@ -192,6 +192,10 @@
<artifactId>spotbugs</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/ClientIpFilter.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/ClientIpFilter.java
new file mode 100644
index 0000000..921b18d
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/ClientIpFilter.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.s3;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Priority;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.container.ContainerRequestContext;
+import javax.ws.rs.container.ContainerRequestFilter;
+import javax.ws.rs.container.PreMatching;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.ext.Provider;
+import java.io.IOException;
+
+/**
+ * Filter used to get ClientIP from HttpServletRequest.
+ */
+
+@Provider
+@PreMatching
+@Priority(ClientIpFilter.PRIORITY)
+public class ClientIpFilter implements ContainerRequestFilter {
+
+ public static final int PRIORITY = 200;
+
+ public static final String CLIENT_IP_HEADER = "client_ip";
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ ClientIpFilter.class);
+
+ @Context
+ private HttpServletRequest httpServletRequest;
+
+ @Override
+ public void filter(ContainerRequestContext request) throws IOException {
+ String clientIp = httpServletRequest.getHeader("x-real-ip");
+
+ if (clientIp == null || clientIp.isEmpty()) {
+ // extract from forward ips
+ String ipForwarded = httpServletRequest.getHeader("x-forwarded-for");
+ String[] ips = ipForwarded == null ? null : ipForwarded.split(",");
+ clientIp = (ips == null || ips.length == 0) ? null : ips[0];
+
+ // extract from remote addr
+ clientIp = (clientIp == null || clientIp.isEmpty()) ?
+ httpServletRequest.getRemoteAddr() : clientIp;
+ }
+ LOG.trace("Real Ip[{}]", clientIp);
+ request.getHeaders().putSingle(CLIENT_IP_HEADER, clientIp);
+ }
+
+}
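For readability, the precedence the filter applies, restated as a small pure function (a hypothetical helper, not part of the gateway; trimming of the forwarded entry is added here for safety):

public final class ClientIp {
  // Precedence: x-real-ip, then the first entry of x-forwarded-for,
  // then the socket's remote address.
  static String resolve(String realIp, String forwardedFor,
      String remoteAddr) {
    if (realIp != null && !realIp.isEmpty()) {
      return realIp;
    }
    if (forwardedFor != null && !forwardedFor.isEmpty()) {
      String first = forwardedFor.split(",")[0].trim();
      if (!first.isEmpty()) {
        return first;
      }
    }
    return remoteAddr;
  }
}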
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/BucketEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/BucketEndpoint.java
index e02c888..d71a99b 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/BucketEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/BucketEndpoint.java
@@ -21,6 +21,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.audit.S3GAction;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneMultipartUploadList;
@@ -37,6 +38,7 @@
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
import org.apache.hadoop.ozone.s3.util.ContinueToken;
import org.apache.hadoop.ozone.s3.util.S3StorageType;
+import org.apache.hadoop.ozone.s3.util.S3Utils;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
import org.apache.http.HttpStatus;
import org.slf4j.Logger;
@@ -63,6 +65,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
@@ -87,7 +90,7 @@
*/
@GET
@SuppressFBWarnings
- @SuppressWarnings("parameternumber")
+ @SuppressWarnings({"parameternumber", "methodlength"})
public Response get(
@PathParam("bucket") String bucketName,
@QueryParam("delimiter") String delimiter,
@@ -100,33 +103,46 @@
@QueryParam("uploads") String uploads,
@QueryParam("acl") String aclMarker,
@Context HttpHeaders hh) throws OS3Exception, IOException {
-
- if (aclMarker != null) {
- S3BucketAcl result = getAcl(bucketName);
- getMetrics().incGetAclSuccess();
- return Response.ok(result, MediaType.APPLICATION_XML_TYPE).build();
- }
-
- if (uploads != null) {
- return listMultipartUploads(bucketName, prefix);
- }
-
- if (prefix == null) {
- prefix = "";
- }
-
- OzoneBucket bucket = getBucket(bucketName);
-
+ S3GAction s3GAction = S3GAction.GET_BUCKET;
+ Map<String, String> auditParams = S3Utils.genAuditParam(
+ "bucket", bucketName,
+ "delimiter", delimiter,
+ "encoding-type", encodingType,
+ "marker", marker,
+ "max-keys", String.valueOf(maxKeys),
+ "prefix", prefix,
+ "continuation-token", continueToken,
+ "start-after", startAfter
+ );
Iterator<? extends OzoneKey> ozoneKeyIterator;
-
ContinueToken decodedToken =
ContinueToken.decodeFromString(continueToken);
- // Assign marker to startAfter. for the compatibility of aws api v1
- if (startAfter == null && marker != null) {
- startAfter = marker;
- }
try {
+ if (aclMarker != null) {
+ s3GAction = S3GAction.GET_ACL;
+ S3BucketAcl result = getAcl(bucketName);
+ getMetrics().incGetAclSuccess();
+ AUDIT.logReadSuccess(
+ buildAuditMessageForSuccess(s3GAction, auditParams));
+ return Response.ok(result, MediaType.APPLICATION_XML_TYPE).build();
+ }
+
+ if (uploads != null) {
+ s3GAction = S3GAction.LIST_MULTIPART_UPLOAD;
+ return listMultipartUploads(bucketName, prefix);
+ }
+
+ if (prefix == null) {
+ prefix = "";
+ }
+
+ // Assign marker to startAfter for compatibility with AWS API v1.
+ if (startAfter == null && marker != null) {
+ startAfter = marker;
+ }
+
+ OzoneBucket bucket = getBucket(bucketName);
if (startAfter != null && continueToken != null) {
// If continuation token and start after both are provided, then we
// ignore start After
@@ -139,12 +155,19 @@
ozoneKeyIterator = bucket.listKeys(prefix);
}
} catch (OMException ex) {
+ AUDIT.logReadFailure(
+ buildAuditMessageForFailure(s3GAction, auditParams, ex));
getMetrics().incGetBucketFailure();
if (ex.getResult() == ResultCodes.PERMISSION_DENIED) {
throw newError(S3ErrorTable.ACCESS_DENIED, bucketName, ex);
} else {
throw ex;
}
+ } catch (Exception ex) {
+ getMetrics().incGetBucketFailure();
+ AUDIT.logReadFailure(
+ buildAuditMessageForFailure(s3GAction, auditParams, ex));
+ throw ex;
}
ListObjectResponse response = new ListObjectResponse();
@@ -215,6 +238,7 @@
response.setTruncated(false);
}
+ AUDIT.logReadSuccess(buildAuditMessageForSuccess(s3GAction, auditParams));
getMetrics().incGetBucketSuccess();
response.setKeyCount(
response.getCommonPrefixes().size() + response.getContents().size());
@@ -223,19 +247,30 @@
@PUT
public Response put(@PathParam("bucket") String bucketName,
- @QueryParam("acl") String aclMarker,
- @Context HttpHeaders httpHeaders,
- InputStream body) throws IOException, OS3Exception {
- if (aclMarker != null) {
- return putAcl(bucketName, httpHeaders, body);
- }
+ @QueryParam("acl") String aclMarker,
+ @Context HttpHeaders httpHeaders,
+ InputStream body) throws IOException, OS3Exception {
+ S3GAction s3GAction = S3GAction.CREATE_BUCKET;
+ Map<String, String> auditParams = S3Utils.genAuditParam(
+ "bucket", bucketName,
+ "acl", aclMarker
+ );
+
try {
+ if (aclMarker != null) {
+ s3GAction = S3GAction.PUT_ACL;
+ return putAcl(bucketName, httpHeaders, body);
+ }
String location = createS3Bucket(bucketName);
LOG.info("Location is {}", location);
+ AUDIT.logWriteSuccess(
+ buildAuditMessageForSuccess(s3GAction, auditParams));
getMetrics().incCreateBucketSuccess();
return Response.status(HttpStatus.SC_OK).header("Location", location)
.build();
} catch (OMException exception) {
+ AUDIT.logWriteFailure(
+ buildAuditMessageForFailure(s3GAction, auditParams, exception));
getMetrics().incCreateBucketFailure();
if (exception.getResult() == ResultCodes.INVALID_BUCKET_NAME) {
throw newError(S3ErrorTable.INVALID_BUCKET_NAME, bucketName, exception);
@@ -243,6 +278,10 @@
LOG.error("Error in Create Bucket Request for bucket: {}", bucketName,
exception);
throw exception;
+ } catch (Exception ex) {
+ AUDIT.logWriteFailure(
+ buildAuditMessageForFailure(s3GAction, auditParams, ex));
+ throw ex;
}
}
@@ -250,33 +289,46 @@
@PathParam("bucket") String bucketName,
@QueryParam("prefix") String prefix)
throws OS3Exception, IOException {
+ S3GAction s3GAction = S3GAction.LIST_MULTIPART_UPLOAD;
+ Map<String, String> auditParams = S3Utils.genAuditParam(
+ "bucket", bucketName,
+ "prefix", prefix
+ );
OzoneBucket bucket = getBucket(bucketName);
- OzoneMultipartUploadList ozoneMultipartUploadList;
try {
- ozoneMultipartUploadList = bucket.listMultipartUploads(prefix);
+ OzoneMultipartUploadList ozoneMultipartUploadList =
+ bucket.listMultipartUploads(prefix);
+
+ ListMultipartUploadsResult result = new ListMultipartUploadsResult();
+ result.setBucket(bucketName);
+
+ ozoneMultipartUploadList.getUploads().forEach(upload -> result.addUpload(
+ new ListMultipartUploadsResult.Upload(
+ upload.getKeyName(),
+ upload.getUploadId(),
+ upload.getCreationTime(),
+ S3StorageType.fromReplicationConfig(upload.getReplicationConfig())
+ )));
+ AUDIT.logReadSuccess(buildAuditMessageForSuccess(s3GAction, auditParams));
+ getMetrics().incListMultipartUploadsSuccess();
+ return Response.ok(result).build();
} catch (OMException exception) {
+ AUDIT.logReadFailure(
+ buildAuditMessageForFailure(s3GAction, auditParams, exception));
getMetrics().incListMultipartUploadsFailure();
if (exception.getResult() == ResultCodes.PERMISSION_DENIED) {
throw newError(S3ErrorTable.ACCESS_DENIED, prefix, exception);
}
throw exception;
+ } catch (Exception ex) {
+ AUDIT.logReadFailure(
+ buildAuditMessageForFailure(s3GAction, auditParams, ex));
+ throw ex;
}
-
- ListMultipartUploadsResult result = new ListMultipartUploadsResult();
- result.setBucket(bucketName);
-
- ozoneMultipartUploadList.getUploads().forEach(upload -> result.addUpload(
- new ListMultipartUploadsResult.Upload(
- upload.getKeyName(),
- upload.getUploadId(),
- upload.getCreationTime(),
- S3StorageType.fromReplicationConfig(upload.getReplicationConfig())
- )));
- getMetrics().incListMultipartUploadsSuccess();
- return Response.ok(result).build();
}
+
/**
* Rest endpoint to check the existence of a bucket.
* <p>
@@ -286,9 +338,20 @@
@HEAD
public Response head(@PathParam("bucket") String bucketName)
throws OS3Exception, IOException {
- getBucket(bucketName);
- getMetrics().incHeadBucketSuccess();
- return Response.ok().build();
+ S3GAction s3GAction = S3GAction.HEAD_BUCKET;
+ Map<String, String> auditParams = S3Utils.genAuditParam(
+ "bucket", bucketName);
+ try {
+ getBucket(bucketName);
+ AUDIT.logReadSuccess(
+ buildAuditMessageForSuccess(s3GAction, auditParams));
+ getMetrics().incHeadBucketSuccess();
+ return Response.ok().build();
+ } catch (Exception e) {
+ AUDIT.logReadFailure(
+ buildAuditMessageForFailure(s3GAction, auditParams, e));
+ throw e;
+ }
}
/**
@@ -300,10 +363,16 @@
@DELETE
public Response delete(@PathParam("bucket") String bucketName)
throws IOException, OS3Exception {
+ S3GAction s3GAction = S3GAction.DELETE_BUCKET;
+ Map<String, String> auditParams = S3Utils.genAuditParam(
+ "bucket", bucketName
+ );
try {
deleteS3Bucket(bucketName);
} catch (OMException ex) {
+ AUDIT.logWriteFailure(
+ buildAuditMessageForFailure(s3GAction, auditParams, ex));
getMetrics().incDeleteBucketFailure();
if (ex.getResult() == ResultCodes.BUCKET_NOT_EMPTY) {
throw newError(S3ErrorTable.BUCKET_NOT_EMPTY, bucketName, ex);
@@ -314,8 +383,13 @@
} else {
throw ex;
}
+ } catch (Exception ex) {
+ AUDIT.logWriteFailure(
+ buildAuditMessageForFailure(s3GAction, auditParams, ex));
+ throw ex;
}
+ AUDIT.logWriteSuccess(buildAuditMessageForSuccess(s3GAction, auditParams));
getMetrics().incDeleteBucketSuccess();
return Response
.status(HttpStatus.SC_NO_CONTENT)
@@ -332,8 +406,15 @@
@POST
@Produces(MediaType.APPLICATION_XML)
public MultiDeleteResponse multiDelete(@PathParam("bucket") String bucketName,
- @QueryParam("delete") String delete,
- MultiDeleteRequest request) throws OS3Exception, IOException {
+ @QueryParam("delete") String delete,
+ MultiDeleteRequest request)
+ throws OS3Exception, IOException {
+ S3GAction s3GAction = S3GAction.MULTI_DELETE;
+ Map<String, String> auditParams = S3Utils.genAuditParam(
+ "bucket", bucketName,
+ "delete", delete
+ );
+
OzoneBucket bucket = getBucket(bucketName);
MultiDeleteResponse result = new MultiDeleteResponse();
if (request.getObjects() != null) {
@@ -363,6 +444,13 @@
}
}
}
+ if (result.getErrors().size() != 0) {
+ AUDIT.logWriteFailure(buildAuditMessageForFailure(s3GAction, auditParams,
+ new Exception("MultiDelete Exception")));
+ } else {
+ AUDIT.logWriteSuccess(
+ buildAuditMessageForSuccess(s3GAction, auditParams));
+ }
return result;
}
@@ -406,6 +494,9 @@
LOG.error("Failed to get acl of Bucket " + bucketName, ex);
throw newError(S3ErrorTable.INTERNAL_ERROR, bucketName, ex);
}
+ } catch (OS3Exception ex) {
+ getMetrics().incGetAclFailure();
+ throw ex;
}
}
@@ -415,7 +506,7 @@
* see: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketAcl.html
*/
public Response putAcl(String bucketName, HttpHeaders httpHeaders,
- InputStream body) throws IOException, OS3Exception {
+ InputStream body) throws IOException, OS3Exception {
String grantReads = httpHeaders.getHeaderString(S3Acl.GRANT_READ);
String grantWrites = httpHeaders.getHeaderString(S3Acl.GRANT_WRITE);
String grantReadACP = httpHeaders.getHeaderString(S3Acl.GRANT_READ_CAP);
@@ -433,7 +524,7 @@
&& grantWriteACP == null && grantFull == null) {
S3BucketAcl putBucketAclRequest =
new PutBucketAclRequestUnmarshaller().readFrom(
- null, null, null, null, null, body);
+ null, null, null, null, null, body);
// Handle grants in body
ozoneAclListOnBucket.addAll(
S3Acl.s3AclToOzoneNativeAclOnBucket(putBucketAclRequest));
@@ -473,7 +564,6 @@
S3Acl.ACLType.FULL_CONTROL.getValue()));
}
}
-
// A put request will reset all previous ACLs on bucket
bucket.setAcl(ozoneAclListOnBucket);
// A put request will reset input user/group's permission on volume
@@ -506,24 +596,28 @@
LOG.error("Error in set ACL Request for bucket: {}", bucketName,
exception);
throw exception;
+ } catch (OS3Exception ex) {
+ getMetrics().incPutAclFailure();
+ throw ex;
}
getMetrics().incPutAclSuccess();
return Response.status(HttpStatus.SC_OK).build();
}
/**
- * Example: x-amz-grant-write: \
- * uri="http://acs.amazonaws.com/groups/s3/LogDelivery", id="111122223333", \
- * id="555566667777".
+ * Example: x-amz-grant-write: \
+ * uri="http://acs.amazonaws.com/groups/s3/LogDelivery", id="111122223333", \
+ * id="555566667777".
*/
private List<OzoneAcl> getAndConvertAclOnBucket(String value,
- String permission) throws OS3Exception {
+ String permission)
+ throws OS3Exception {
List<OzoneAcl> ozoneAclList = new ArrayList<>();
if (StringUtils.isEmpty(value)) {
return ozoneAclList;
}
String[] subValues = value.split(",");
- for (String acl: subValues) {
+ for (String acl : subValues) {
String[] part = acl.split("=");
if (part.length != 2) {
throw newError(S3ErrorTable.INVALID_ARGUMENT, acl);
@@ -550,13 +644,14 @@
}
private List<OzoneAcl> getAndConvertAclOnVolume(String value,
- String permission) throws OS3Exception {
+ String permission)
+ throws OS3Exception {
List<OzoneAcl> ozoneAclList = new ArrayList<>();
if (StringUtils.isEmpty(value)) {
return ozoneAclList;
}
String[] subValues = value.split(",");
- for (String acl: subValues) {
+ for (String acl : subValues) {
String[] part = acl.split("=");
if (part.length != 2) {
throw newError(S3ErrorTable.INVALID_ARGUMENT, acl);
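All of the endpoint methods above follow one audit discipline: build the parameter map up front, log a failure from every catch block before rethrowing, and log success exactly once on the happy path. The skeleton, assuming the EndpointBase members (AUDIT and the buildAuditMessageForSuccess/Failure helpers) shown in the next file, with doWork as a placeholder for the real handler logic:

// Sketch of the catch-and-rethrow audit pattern used by the endpoints.
public Response handle(String bucketName) throws OS3Exception, IOException {
  S3GAction action = S3GAction.GET_BUCKET;
  Map<String, String> auditParams =
      S3Utils.genAuditParam("bucket", bucketName);
  try {
    Response response = doWork(bucketName); // placeholder
    AUDIT.logReadSuccess(buildAuditMessageForSuccess(action, auditParams));
    return response;
  } catch (OMException ex) {
    AUDIT.logReadFailure(buildAuditMessageForFailure(action, auditParams, ex));
    throw ex; // mapped to an S3 error in the real endpoints
  } catch (Exception ex) {
    AUDIT.logReadFailure(buildAuditMessageForFailure(action, auditParams, ex));
    throw ex;
  }
}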
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
index a9411dd..b6a928f 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
@@ -19,12 +19,21 @@
import javax.annotation.PostConstruct;
import javax.inject.Inject;
+import javax.ws.rs.container.ContainerRequestContext;
+import javax.ws.rs.core.Context;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
+import java.util.Map;
import java.util.function.Function;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.ozone.audit.AuditAction;
+import org.apache.hadoop.ozone.audit.AuditEventStatus;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.AuditLoggerType;
+import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.ozone.audit.Auditor;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneVolume;
@@ -39,20 +48,27 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.ozone.s3.ClientIpFilter.CLIENT_IP_HEADER;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.newError;
/**
* Basic helpers for all the REST endpoints.
*/
-public abstract class EndpointBase {
+public abstract class EndpointBase implements Auditor {
@Inject
private OzoneClient client;
@Inject
private S3Auth s3Auth;
+ @Context
+ private ContainerRequestContext context;
+
private static final Logger LOG =
LoggerFactory.getLogger(EndpointBase.class);
+ protected static final AuditLogger AUDIT =
+ new AuditLogger(AuditLoggerType.S3GLOGGER);
+
protected OzoneBucket getBucket(OzoneVolume volume, String bucketName)
throws OS3Exception, IOException {
OzoneBucket bucket;
@@ -119,6 +135,7 @@
try {
client.getObjectStore().createS3Bucket(bucketName);
} catch (OMException ex) {
+ getMetrics().incCreateBucketFailure();
if (ex.getResult() == ResultCodes.PERMISSION_DENIED) {
throw newError(S3ErrorTable.ACCESS_DENIED, bucketName, ex);
} else if (ex.getResult() != ResultCodes.BUCKET_ALREADY_EXISTS) {
@@ -191,6 +208,40 @@
}
}
+ private AuditMessage.Builder auditMessageBaseBuilder(AuditAction op,
+ Map<String, String> auditMap) {
+ AuditMessage.Builder builder = new AuditMessage.Builder()
+ .forOperation(op)
+ .withParams(auditMap);
+ if (s3Auth != null &&
+ s3Auth.getAccessID() != null &&
+ !s3Auth.getAccessID().isEmpty()) {
+ builder.setUser(s3Auth.getAccessID());
+ }
+ if (context != null) {
+ builder.atIp(getClientIpAddress());
+ }
+ return builder;
+ }
+
+ @Override
+ public AuditMessage buildAuditMessageForSuccess(AuditAction op,
+ Map<String, String> auditMap) {
+ AuditMessage.Builder builder = auditMessageBaseBuilder(op, auditMap)
+ .withResult(AuditEventStatus.SUCCESS);
+ return builder.build();
+ }
+
+ @Override
+ public AuditMessage buildAuditMessageForFailure(AuditAction op,
+ Map<String, String> auditMap, Throwable throwable) {
+ AuditMessage.Builder builder = auditMessageBaseBuilder(op, auditMap)
+ .withResult(AuditEventStatus.FAILURE)
+ .withException(throwable);
+ return builder.build();
+ }
+
+
@VisibleForTesting
public void setClient(OzoneClient ozoneClient) {
this.client = ozoneClient;
@@ -204,4 +255,8 @@
public S3GatewayMetrics getMetrics() {
return S3GatewayMetrics.create();
}
+
+ public String getClientIpAddress() {
+ return context.getHeaderString(CLIENT_IP_HEADER);
+ }
}
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 5b37d1f..8824e98 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -59,6 +59,7 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.S3GAction;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
@@ -110,6 +111,7 @@
import static org.apache.hadoop.ozone.s3.util.S3Consts.RANGE_HEADER;
import static org.apache.hadoop.ozone.s3.util.S3Consts.RANGE_HEADER_SUPPORTED_UNIT;
import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
+import static org.apache.hadoop.ozone.s3.util.S3Utils.genAuditParam;
import static org.apache.hadoop.ozone.s3.util.S3Utils.urlDecode;
import org.apache.http.HttpStatus;
@@ -166,16 +168,29 @@
@QueryParam("uploadId") @DefaultValue("") String uploadID,
InputStream body) throws IOException, OS3Exception {
- OzoneOutputStream output = null;
-
- if (uploadID != null && !uploadID.equals("")) {
- // If uploadID is specified, it is a request for upload part
- return createMultipartKey(bucketName, keyPath, length,
- partNumber, uploadID, body);
+ S3GAction s3GAction = S3GAction.CREATE_KEY;
+ boolean auditSuccess = true;
+ Map<String, String> auditParams = genAuditParam(
+ "bucket", bucketName,
+ "path", keyPath,
+ "Content-Length", String.valueOf(length),
+ "partNumber", String.valueOf(partNumber)
+ );
+ if (partNumber != 0) {
+ auditParams.put("uploadId", uploadID);
}
+ OzoneOutputStream output = null;
+
String copyHeader = null, storageType = null;
try {
+ if (uploadID != null && !uploadID.equals("")) {
+ s3GAction = S3GAction.CREATE_MULTIPART_KEY;
+ // If uploadID is specified, it is a request for upload part
+ return createMultipartKey(bucketName, keyPath, length,
+ partNumber, uploadID, body);
+ }
+
copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
boolean storageTypeDefault = StringUtils.isEmpty(storageType);
@@ -187,6 +202,7 @@
if (copyHeader != null) {
//Copy object, as copy source available.
+ s3GAction = S3GAction.COPY_OBJECT;
CopyObjectResponse copyObjectResponse = copyObject(
copyHeader, bucket, keyPath, replicationConfig, storageTypeDefault);
return Response.status(Status.OK).entity(copyObjectResponse).header(
@@ -207,6 +223,9 @@
return Response.ok().status(HttpStatus.SC_OK)
.build();
} catch (OMException ex) {
+ auditSuccess = false;
+ AUDIT.logWriteFailure(
+ buildAuditMessageForFailure(s3GAction, auditParams, ex));
if (copyHeader != null) {
getMetrics().incCopyObjectFailure();
} else {
@@ -225,7 +244,16 @@
}
LOG.error("Exception occurred in PutObject", ex);
throw ex;
+ } catch (Exception ex) {
+ auditSuccess = false;
+ AUDIT.logWriteFailure(
+ buildAuditMessageForFailure(s3GAction, auditParams, ex));
+ throw ex;
} finally {
+ if (auditSuccess) {
+ AUDIT.logWriteSuccess(
+ buildAuditMessageForSuccess(s3GAction, auditParams));
+ }
if (output != null) {
output.close();
}
@@ -249,10 +277,21 @@
@QueryParam("max-parts") @DefaultValue("1000") int maxParts,
@QueryParam("part-number-marker") String partNumberMarker,
InputStream body) throws IOException, OS3Exception {
- try {
+ S3GAction s3GAction = S3GAction.GET_KEY;
+ boolean auditSuccess = true;
+ Map<String, String> auditParams = genAuditParam(
+ "bucket", bucketName,
+ "path", keyPath,
+ "uploadId", uploadId,
+ "max-parts", String.valueOf(maxParts),
+ "part-number-marker", partNumberMarker
+ );
+
+ try {
if (uploadId != null) {
// When we have uploadId, this is the request for list Parts.
+ s3GAction = S3GAction.LIST_PARTS;
int partMarker = parsePartNumberMarker(partNumberMarker);
return listParts(bucketName, keyPath, uploadId,
partMarker, maxParts);
@@ -327,6 +366,10 @@
getMetrics().incGetKeySuccess();
return responseBuilder.build();
} catch (OMException ex) {
+ auditSuccess = false;
+ AUDIT.logReadFailure(
+ buildAuditMessageForFailure(s3GAction, auditParams, ex)
+ );
if (uploadId != null) {
getMetrics().incListPartsFailure();
} else {
@@ -339,6 +382,18 @@
} else {
throw ex;
}
+ } catch (Exception ex) {
+ auditSuccess = false;
+ AUDIT.logReadFailure(
+ buildAuditMessageForFailure(s3GAction, auditParams, ex)
+ );
+ throw ex;
+ } finally {
+ if (auditSuccess) {
+ AUDIT.logReadSuccess(
+ buildAuditMessageForSuccess(s3GAction, auditParams)
+ );
+ }
}
}
@@ -364,12 +419,19 @@
@PathParam("bucket") String bucketName,
@PathParam("path") String keyPath) throws IOException, OS3Exception {
- OzoneKey key;
+ S3GAction s3GAction = S3GAction.HEAD_KEY;
+ Map<String, String> auditParams = genAuditParam(
+ "bucket", bucketName,
+ "keyPath", keyPath
+ );
+ OzoneKey key;
try {
key = getBucket(bucketName).headObject(keyPath);
// TODO: return the specified range bytes of this object.
} catch (OMException ex) {
+ AUDIT.logReadFailure(
+ buildAuditMessageForFailure(s3GAction, auditParams, ex));
getMetrics().incHeadKeyFailure();
if (ex.getResult() == ResultCodes.KEY_NOT_FOUND) {
// Just return 404 with no content
@@ -379,6 +441,10 @@
} else {
throw ex;
}
+ } catch (Exception ex) {
+ AUDIT.logReadFailure(
+ buildAuditMessageForFailure(s3GAction, auditParams, ex));
+ throw ex;
}
ResponseBuilder response = Response.ok().status(HttpStatus.SC_OK)
@@ -387,6 +453,7 @@
.header("Content-Type", "binary/octet-stream");
addLastModifiedDate(response, key);
getMetrics().incHeadKeySuccess();
+ AUDIT.logReadSuccess(buildAuditMessageForSuccess(s3GAction, auditParams));
return response.build();
}
@@ -433,14 +500,24 @@
@QueryParam("uploadId") @DefaultValue("") String uploadId) throws
IOException, OS3Exception {
+ S3GAction s3GAction = S3GAction.DELETE_KEY;
+ Map<String, String> auditParams = genAuditParam(
+ "bucket", bucketName,
+ "path", keyPath,
+ "uploadId", uploadId
+ );
+
try {
if (uploadId != null && !uploadId.equals("")) {
+ s3GAction = S3GAction.ABORT_MULTIPART_UPLOAD;
return abortMultipartUpload(bucketName, keyPath, uploadId);
}
OzoneBucket bucket = getBucket(bucketName);
bucket.getKey(keyPath);
bucket.deleteKey(keyPath);
} catch (OMException ex) {
+ AUDIT.logWriteFailure(
+ buildAuditMessageForFailure(s3GAction, auditParams, ex));
if (uploadId != null && !uploadId.equals("")) {
getMetrics().incAbortMultiPartUploadFailure();
} else {
@@ -461,9 +538,13 @@
} else {
throw ex;
}
-
+ } catch (Exception ex) {
+ AUDIT.logWriteFailure(
+ buildAuditMessageForFailure(s3GAction, auditParams, ex));
+ throw ex;
}
getMetrics().incDeleteKeySuccess();
+ AUDIT.logWriteSuccess(buildAuditMessageForSuccess(s3GAction, auditParams));
return Response
.status(Status.NO_CONTENT)
.build();
@@ -482,6 +563,12 @@
@PathParam("path") String key
)
throws IOException, OS3Exception {
+ S3GAction s3GAction = S3GAction.INIT_MULTIPART_UPLOAD;
+ Map<String, String> auditParams = genAuditParam(
+ "bucket", bucket,
+ "path", key
+ );
+
try {
OzoneBucket ozoneBucket = getBucket(bucket);
String storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
@@ -499,10 +586,14 @@
multipartUploadInitiateResponse.setKey(key);
multipartUploadInitiateResponse.setUploadID(multipartInfo.getUploadID());
+ AUDIT.logWriteSuccess(
+ buildAuditMessageForSuccess(s3GAction, auditParams));
getMetrics().incInitMultiPartUploadSuccess();
return Response.status(Status.OK).entity(
multipartUploadInitiateResponse).build();
} catch (OMException ex) {
+ AUDIT.logWriteFailure(
+ buildAuditMessageForFailure(s3GAction, auditParams, ex));
getMetrics().incInitMultiPartUploadFailure();
if (ex.getResult() == ResultCodes.PERMISSION_DENIED) {
throw newError(S3ErrorTable.ACCESS_DENIED, key, ex);
@@ -510,6 +601,10 @@
LOG.error("Error in Initiate Multipart Upload Request for bucket: {}, " +
"key: {}", bucket, key, ex);
throw ex;
+ } catch (Exception ex) {
+ AUDIT.logWriteFailure(
+ buildAuditMessageForFailure(s3GAction, auditParams, ex));
+ throw ex;
}
}
@@ -537,6 +632,12 @@
@QueryParam("uploadId") @DefaultValue("") String uploadID,
CompleteMultipartUploadRequest multipartUploadRequest)
throws IOException, OS3Exception {
+ S3GAction s3GAction = S3GAction.COMPLETE_MULTIPART_UPLOAD;
+ Map<String, String> auditParams = genAuditParam(
+ "bucket", bucket,
+ "path", key,
+ "uploadId", uploadID
+ );
OzoneBucket ozoneBucket = getBucket(bucket);
// Using LinkedHashMap to preserve ordering of parts list.
Map<Integer, String> partsMap = new LinkedHashMap<>();
@@ -562,10 +663,14 @@
.getHash());
// Location also setting as bucket name.
completeMultipartUploadResponse.setLocation(bucket);
+ AUDIT.logWriteSuccess(
+ buildAuditMessageForSuccess(s3GAction, auditParams));
getMetrics().incCompleteMultiPartUploadSuccess();
return Response.status(Status.OK).entity(completeMultipartUploadResponse)
.build();
} catch (OMException ex) {
+ AUDIT.logWriteFailure(
+ buildAuditMessageForFailure(s3GAction, auditParams, ex));
getMetrics().incCompleteMultiPartUploadFailure();
if (ex.getResult() == ResultCodes.INVALID_PART) {
throw newError(S3ErrorTable.INVALID_PART, key, ex);
@@ -593,6 +698,10 @@
LOG.error("Error in Complete Multipart Upload Request for bucket: {}, " +
", key: {}", bucket, key, ex);
throw ex;
+ } catch (Exception ex) {
+ AUDIT.logWriteFailure(
+ buildAuditMessageForFailure(s3GAction, auditParams, ex));
+ throw ex;
}
}
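get() and put() use a variant of the same discipline: an auditSuccess flag flipped in the catch blocks, with the success message emitted from finally, so that branches returning early (the multipart paths) are still audited. Reduced to a skeleton under the same EndpointBase assumptions as the sketch above:

public Response get(String bucketName, String keyPath)
    throws IOException, OS3Exception {
  S3GAction action = S3GAction.GET_KEY;
  boolean auditSuccess = true;
  Map<String, String> auditParams =
      S3Utils.genAuditParam("bucket", bucketName, "path", keyPath);
  try {
    return doRead(bucketName, keyPath); // placeholder
  } catch (Exception ex) {
    auditSuccess = false;
    AUDIT.logReadFailure(buildAuditMessageForFailure(action, auditParams, ex));
    throw ex;
  } finally {
    if (auditSuccess) {
      AUDIT.logReadSuccess(buildAuditMessageForSuccess(action, auditParams));
    }
  }
}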
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/RootEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/RootEndpoint.java
index 615fb01..57d0d12 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/RootEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/RootEndpoint.java
@@ -21,8 +21,10 @@
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;
import java.io.IOException;
+import java.util.Collections;
import java.util.Iterator;
+import org.apache.hadoop.ozone.audit.S3GAction;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.s3.commontypes.BucketMetadata;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
@@ -48,26 +50,43 @@
@GET
public Response get()
throws OS3Exception, IOException {
- ListBucketResponse response = new ListBucketResponse();
-
- Iterator<? extends OzoneBucket> bucketIterator;
+ boolean auditSuccess = true;
try {
- bucketIterator = listS3Buckets(null);
- } catch (Exception e) {
- getMetrics().incListS3BucketsFailure();
- throw e;
- }
+ ListBucketResponse response = new ListBucketResponse();
- while (bucketIterator.hasNext()) {
- OzoneBucket next = bucketIterator.next();
- BucketMetadata bucketMetadata = new BucketMetadata();
- bucketMetadata.setName(next.getName());
- bucketMetadata.setCreationDate(next.getCreationTime());
- response.addBucket(bucketMetadata);
- }
+ Iterator<? extends OzoneBucket> bucketIterator;
+ try {
+ bucketIterator = listS3Buckets(null);
+ } catch (Exception e) {
+ getMetrics().incListS3BucketsFailure();
+ throw e;
+ }
- getMetrics().incListS3BucketsSuccess();
- return Response.ok(response).build();
+ while (bucketIterator.hasNext()) {
+ OzoneBucket next = bucketIterator.next();
+ BucketMetadata bucketMetadata = new BucketMetadata();
+ bucketMetadata.setName(next.getName());
+ bucketMetadata.setCreationDate(next.getCreationTime());
+ response.addBucket(bucketMetadata);
+ }
+
+ getMetrics().incListS3BucketsSuccess();
+ return Response.ok(response).build();
+ } catch (Exception ex) {
+ auditSuccess = false;
+ AUDIT.logReadFailure(
+ buildAuditMessageForFailure(S3GAction.LIST_S3_BUCKETS,
+ Collections.emptyMap(), ex)
+ );
+ throw ex;
+ } finally {
+ if (auditSuccess) {
+ AUDIT.logReadSuccess(
+ buildAuditMessageForSuccess(S3GAction.LIST_S3_BUCKETS,
+ Collections.emptyMap())
+ );
+ }
+ }
}
@Override
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/metrics/S3GatewayMetrics.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/metrics/S3GatewayMetrics.java
index 012da2a..b8b4ce7 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/metrics/S3GatewayMetrics.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/metrics/S3GatewayMetrics.java
@@ -92,9 +92,9 @@
}
/**
- * Create and returns SCMPipelineMetrics instance.
+ * Creates and returns an S3GatewayMetrics instance.
*
- * @return SCMPipelineMetrics
+ * @return S3GatewayMetrics
*/
public static synchronized S3GatewayMetrics create() {
if (instance != null) {
@@ -317,4 +317,52 @@
public long getHeadKeySuccess() {
return headKeySuccess.value();
}
+
+ public long getGetBucketSuccess() {
+ return getBucketSuccess.value();
+ }
+
+ public long getGetBucketFailure() {
+ return getBucketFailure.value();
+ }
+
+ public long getCreateBucketSuccess() {
+ return createBucketSuccess.value();
+ }
+
+ public long getCreateBucketFailure() {
+ return createBucketFailure.value();
+ }
+
+ public long getDeleteBucketSuccess() {
+ return deleteBucketSuccess.value();
+ }
+
+ public long getDeleteBucketFailure() {
+ return deleteBucketFailure.value();
+ }
+
+ public long getGetAclSuccess() {
+ return getAclSuccess.value();
+ }
+
+ public long getGetAclFailure() {
+ return getAclFailure.value();
+ }
+
+ public long getPutAclSuccess() {
+ return putAclSuccess.value();
+ }
+
+ public long getPutAclFailure() {
+ return putAclFailure.value();
+ }
+
+ public long getListMultipartUploadsSuccess() {
+ return listMultipartUploadsSuccess.value();
+ }
+
+ public long getListMultipartUploadsFailure() {
+ return listMultipartUploadsFailure.value();
+ }
}
\ No newline at end of file
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/S3Utils.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/S3Utils.java
index d644162..5f110e8 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/S3Utils.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/S3Utils.java
@@ -22,10 +22,13 @@
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
+import org.apache.commons.lang3.StringUtils;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
+import java.util.Map;
+import java.util.TreeMap;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_ARGUMENT;
@@ -46,6 +49,22 @@
return URLEncoder.encode(str, UTF_8.name());
}
+ /**
+ * Builds an audit parameter map from alternating key/value arguments.
+ * A pair is skipped when either its key or its value is empty.
+ */
+ public static Map<String, String> genAuditParam(String... strs) {
+ if (strs.length % 2 == 1) {
+ throw new IllegalArgumentException("Unexpected number of parameters: "
+ + strs.length);
+ }
+ Map<String, String> auditParams = new TreeMap<>();
+ for (int i = 0; i < strs.length; i += 2) {
+ if (StringUtils.isEmpty(strs[i]) || StringUtils.isEmpty(strs[i + 1])) {
+ continue;
+ }
+ auditParams.put(strs[i], strs[i + 1]);
+ }
+ return auditParams;
+ }
+
private S3Utils() {
// no instances
}
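A hypothetical call site for genAuditParam, showing why empty pairs are skipped rather than rejected (the bucket and key names are made up):

    import java.util.Map;
    import org.apache.hadoop.ozone.s3.util.S3Utils;

    public final class GenAuditParamSketch {
      public static void main(String[] args) {
        Map<String, String> params = S3Utils.genAuditParam(
            "bucket", "images",
            "keyPath", "cat.png",
            "x-amz-copy-source", "");  // empty value: the pair is dropped
        // Sorted TreeMap result: {bucket=images, keyPath=cat.png}
        System.out.println(params);
      }
    }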
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestS3GatewayAuditLog.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestS3GatewayAuditLog.java
new file mode 100644
index 0000000..a598bde
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestS3GatewayAuditLog.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.hadoop.ozone.s3;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientStub;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.s3.endpoint.BucketEndpoint;
+import org.apache.hadoop.ozone.s3.endpoint.ObjectEndpoint;
+import org.apache.hadoop.ozone.s3.endpoint.RootEndpoint;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for S3Gateway Audit Log.
+ */
+public class TestS3GatewayAuditLog {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestS3GatewayAuditLog.class);
+
+ static {
+ System.setProperty("log4j.configurationFile", "auditlog.properties");
+ System.setProperty("log4j2.contextSelector",
+ "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");
+ }
+
+ private String bucketName = OzoneConsts.BUCKET;
+ private OzoneClient clientStub;
+ private BucketEndpoint bucketEndpoint;
+ private RootEndpoint rootEndpoint;
+ private ObjectEndpoint keyEndpoint;
+ private OzoneBucket bucket;
+
+ @Before
+ public void setup() throws Exception {
+
+ clientStub = new OzoneClientStub();
+ clientStub.getObjectStore().createS3Bucket(bucketName);
+ bucket = clientStub.getObjectStore().getS3Bucket(bucketName);
+
+ bucketEndpoint = new BucketEndpoint();
+ bucketEndpoint.setClient(clientStub);
+
+ rootEndpoint = new RootEndpoint();
+ rootEndpoint.setClient(clientStub);
+
+ keyEndpoint = new ObjectEndpoint();
+ keyEndpoint.setClient(clientStub);
+ keyEndpoint.setOzoneConfiguration(new OzoneConfiguration());
+
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ File file = new File("audit.log");
+ if (FileUtils.deleteQuietly(file)) {
+ LOG.info("{} has been deleted as all tests have completed.",
+ file.getName());
+ } else {
+ LOG.info("audit.log could not be deleted.");
+ }
+ }
+
+ @Test
+ public void testHeadBucket() throws Exception {
+ bucketEndpoint.head(bucketName);
+ String expected = "INFO | S3GAudit | ? | user=null | ip=null | " +
+ "op=HEAD_BUCKET {bucket=bucket} | ret=SUCCESS";
+ verifyLog(expected);
+ }
+
+ @Test
+ public void testListBucket() throws Exception {
+
+ rootEndpoint.get().getEntity();
+ String expected = "INFO | S3GAudit | ? | user=null | ip=null | " +
+ "op=LIST_S3_BUCKETS {} | ret=SUCCESS";
+ verifyLog(expected);
+ }
+
+ @Test
+ public void testHeadObject() throws Exception {
+ String value = RandomStringUtils.randomAlphanumeric(32);
+ OzoneOutputStream out = bucket.createKey("key1",
+ value.getBytes(UTF_8).length, ReplicationType.RATIS,
+ ReplicationFactor.ONE, new HashMap<>());
+ out.write(value.getBytes(UTF_8));
+ out.close();
+
+ keyEndpoint.head(bucketName, "key1");
+ String expected = "INFO | S3GAudit | ? | user=null | ip=null | " +
+ "op=HEAD_KEY {bucket=bucket, keyPath=key1} | ret=SUCCESS";
+ verifyLog(expected);
+
+ }
+
+ private void verifyLog(String expectedString) throws IOException {
+ File file = new File("audit.log");
+ List<String> lines = FileUtils.readLines(file, (String) null);
+ final int retry = 5;
+ int i = 0;
+ // The async audit logger may not have flushed yet: back off, then re-read.
+ while (lines.isEmpty() && i < retry) {
+ try {
+ Thread.sleep(500 * (i + 1));
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ lines = FileUtils.readLines(file, (String) null);
+ i++;
+ }
+ // JUnit's assertEquals takes (expected, actual).
+ assertEquals(expectedString, lines.get(0));
+
+ // Empty the file so the next assertion sees only its own event.
+ lines.clear();
+ FileUtils.writeLines(file, lines, false);
+ }
+
+}
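The hand-rolled retry loop in verifyLog could also be expressed with Hadoop's GenericTestUtils.waitFor, assuming the hadoop-common test utilities are available to this module; a sketch:

    import java.io.File;
    import java.io.IOException;
    import org.apache.commons.io.FileUtils;
    import org.apache.hadoop.test.GenericTestUtils;

    public final class WaitForAuditLineSketch {
      // Polls every 500 ms, up to 5 s, until the async logger flushes a line.
      public static void waitForFirstLine(File file) throws Exception {
        GenericTestUtils.waitFor(() -> {
          try {
            return !FileUtils.readLines(file, (String) null).isEmpty();
          } catch (IOException e) {
            return false;
          }
        }, 500, 5000);
      }
    }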
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/metrics/TestS3GatewayMetrics.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/metrics/TestS3GatewayMetrics.java
index c5d6606..ccf36a7 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/metrics/TestS3GatewayMetrics.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/metrics/TestS3GatewayMetrics.java
@@ -31,13 +31,23 @@
import org.apache.hadoop.ozone.s3.endpoint.BucketEndpoint;
import org.apache.hadoop.ozone.s3.endpoint.ObjectEndpoint;
import org.apache.hadoop.ozone.s3.endpoint.RootEndpoint;
+import org.apache.hadoop.ozone.s3.endpoint.TestBucketAcl;
+import org.apache.hadoop.ozone.s3.exception.OS3Exception;
+import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
import java.util.HashMap;
+import static java.net.HttpURLConnection.HTTP_OK;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
/**
* Tests for {@link S3GatewayMetrics}.
@@ -50,9 +60,11 @@
private RootEndpoint rootEndpoint;
private ObjectEndpoint keyEndpoint;
private OzoneBucket bucket;
-
+ private HttpHeaders headers;
+ private static final String ACL_MARKER = "acl";
private S3GatewayMetrics metrics;
+
@Before
public void setup() throws Exception {
clientStub = new OzoneClientStub();
@@ -69,6 +81,7 @@
keyEndpoint.setClient(clientStub);
keyEndpoint.setOzoneConfiguration(new OzoneConfiguration());
+ headers = Mockito.mock(HttpHeaders.class);
metrics = bucketEndpoint.getMetrics();
}
@@ -110,4 +123,160 @@
long curMetric = metrics.getHeadKeySuccess();
assertEquals(1L, curMetric - oriMetric);
}
-}
+
+ @Test
+ public void testGetBucketSuccess() throws Exception {
+ long oriMetric = metrics.getGetBucketSuccess();
+
+ clientStub = createClientWithKeys("file1");
+ bucketEndpoint.setClient(clientStub);
+ bucketEndpoint.get(bucketName, null,
+ null, null, 1000, null,
+ null, "random", null,
+ null, null).getEntity();
+
+ long curMetric = metrics.getGetBucketSuccess();
+ assertEquals(1L, curMetric - oriMetric);
+ }
+
+ @Test
+ public void testGetBucketFailure() throws Exception {
+ long oriMetric = metrics.getGetBucketFailure();
+
+ try {
+ // Searching for a bucket that does not exist
+ bucketEndpoint.get("newBucket", null,
+ null, null, 1000, null,
+ null, "random", null,
+ null, null);
+ fail("getBucket should fail for a bucket that does not exist");
+ } catch (OS3Exception e) {
+ // expected: the bucket does not exist
+ }
+
+ long curMetric = metrics.getGetBucketFailure();
+ assertEquals(1L, curMetric - oriMetric);
+ }
+
+ @Test
+ public void testCreateBucketSuccess() throws Exception {
+
+ long oriMetric = metrics.getCreateBucketSuccess();
+
+ bucketEndpoint.put(bucketName, null,
+ null, null);
+ long curMetric = metrics.getCreateBucketSuccess();
+
+ assertEquals(1L, curMetric - oriMetric);
+ }
+
+ @Test
+ public void testCreateBucketFailure() throws Exception {
+ // The bucket created in setup() already exists, so the endpoint records
+ // a create-bucket failure internally without surfacing an exception.
+ long oriMetric = metrics.getCreateBucketFailure();
+
+ bucketEndpoint.put(bucketName, null, null, null);
+
+ long curMetric = metrics.getCreateBucketFailure();
+ assertEquals(1L, curMetric - oriMetric);
+ }
+
+ @Test
+ public void testDeleteBucketSuccess() throws Exception {
+ long oriMetric = metrics.getDeleteBucketSuccess();
+
+ bucketEndpoint.delete(bucketName);
+
+ long curMetric = metrics.getDeleteBucketSuccess();
+ assertEquals(1L, curMetric - oriMetric);
+ }
+
+ @Test
+ public void testDeleteBucketFailure() throws Exception {
+ long oriMetric = metrics.getDeleteBucketFailure();
+ bucketEndpoint.delete(bucketName);
+ try {
+ // Deleting a bucket that does not exist will result in delete failure
+ bucketEndpoint.delete(bucketName);
+ fail();
+ } catch (OS3Exception ex) {
+ assertEquals(S3ErrorTable.NO_SUCH_BUCKET.getCode(), ex.getCode());
+ assertEquals(S3ErrorTable.NO_SUCH_BUCKET.getErrorMessage(),
+ ex.getErrorMessage());
+ }
+
+ long curMetric = metrics.getDeleteBucketFailure();
+ assertEquals(1L, curMetric - oriMetric);
+ }
+
+ @Test
+ public void testGetAclSuccess() throws Exception {
+ long oriMetric = metrics.getGetAclSuccess();
+
+ Response response =
+ bucketEndpoint.get(bucketName, null, null,
+ null, 0, null, null,
+ null, null, "acl", null);
+ long curMetric = metrics.getGetAclSuccess();
+ assertEquals(HTTP_OK, response.getStatus());
+ assertEquals(1L, curMetric - oriMetric);
+ }
+
+ @Test
+ public void testGetAclFailure() throws Exception {
+ long oriMetric = metrics.getGetAclFailure();
+ try {
+ // Failing the getACL endpoint by applying ACL on a non-Existent Bucket
+ bucketEndpoint.get("random_bucket", null,
+ null, null, 0, null,
+ null, null, null, "acl", null);
+ fail();
+ } catch (OS3Exception ex) {
+ assertEquals(S3ErrorTable.NO_SUCH_BUCKET.getCode(), ex.getCode());
+ assertEquals(S3ErrorTable.NO_SUCH_BUCKET.getErrorMessage(),
+ ex.getErrorMessage());
+ }
+ long curMetric = metrics.getGetAclFailure();
+ assertEquals(1L, curMetric - oriMetric);
+ }
+
+ @Test
+ public void testPutAclSuccess() throws Exception {
+ long oriMetric = metrics.getPutAclSuccess();
+
+ clientStub.getObjectStore().createS3Bucket("b1");
+ InputStream inputBody = TestBucketAcl.class.getClassLoader()
+ .getResourceAsStream("userAccessControlList.xml");
+
+ bucketEndpoint.put("b1", ACL_MARKER, headers, inputBody);
+ inputBody.close();
+ long curMetric = metrics.getPutAclSuccess();
+ assertEquals(1L, curMetric - oriMetric);
+ }
+
+ @Test
+ public void testPutAclFailure() throws Exception {
+ // Failing the putACL endpoint by applying ACL on a non-Existent Bucket
+ long oriMetric = metrics.getPutAclFailure();
+
+ InputStream inputBody = TestBucketAcl.class.getClassLoader()
+ .getResourceAsStream("userAccessControlList.xml");
+
+ try {
+ bucketEndpoint.put("unknown_bucket", ACL_MARKER, headers, inputBody);
+ fail("putAcl should fail for a bucket that does not exist");
+ } catch (OS3Exception ex) {
+ // expected: the bucket does not exist
+ } finally {
+ inputBody.close();
+ }
+ long curMetric = metrics.getPutAclFailure();
+ assertEquals(1L, curMetric - oriMetric);
+ }
+
+ private OzoneClient createClientWithKeys(String... keys) throws IOException {
+ OzoneBucket bkt = clientStub.getObjectStore().getS3Bucket(bucketName);
+ for (String key : keys) {
+ bkt.createKey(key, 0).close();
+ }
+ return clientStub;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/s3gateway/src/test/resources/auditlog.properties b/hadoop-ozone/s3gateway/src/test/resources/auditlog.properties
new file mode 100644
index 0000000..18a6b47
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/test/resources/auditlog.properties
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with this
+# work for additional information regarding copyright ownership. The ASF
+# licenses this file to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# <p>
+# http://www.apache.org/licenses/LICENSE-2.0
+# <p>
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+#
+name=PropertiesConfig
+
+# Checks for config change periodically and reloads
+monitorInterval=5
+
+filter=read, write
+# filter.read.onMatch = DENY avoids logging all READ events
+# filter.read.onMatch = ACCEPT permits logging all READ events
+# The above two settings ignore the log levels in configuration
+# filter.read.onMatch = NEUTRAL permits logging of only those READ events
+# which are attempted at log level equal or greater than log level specified
+# in the configuration
+filter.read.type = MarkerFilter
+filter.read.marker = READ
+filter.read.onMatch = ACCEPT
+filter.read.onMismatch = NEUTRAL
+
+# filter.write.onMatch = DENY avoids logging all WRITE events
+# filter.write.onMatch = ACCEPT permits logging all WRITE events
+# The above two settings ignore the log levels in configuration
+# filter.write.onMatch = NEUTRAL permits logging of only those WRITE events
+# which are attempted at log level equal or greater than log level specified
+# in the configuration
+filter.write.type = MarkerFilter
+filter.write.marker = WRITE
+filter.write.onMatch = NEUTRAL
+filter.write.onMismatch = NEUTRAL
+
+# Log Levels are organized from most specific to least:
+# OFF (most specific, no logging)
+# FATAL (most specific, little data)
+# ERROR
+# WARN
+# INFO
+# DEBUG
+# TRACE (least specific, a lot of data)
+# ALL (least specific, all data)
+
+appenders = console, audit
+appender.console.type = Console
+appender.console.name = STDOUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %-5level | %c{1} | %msg%n
+
+appender.audit.type = File
+appender.audit.name = AUDITLOG
+appender.audit.fileName=audit.log
+appender.audit.layout.type=PatternLayout
+appender.audit.layout.pattern= %-5level | %c{1} | %C | %msg%n
+
+loggers=audit
+logger.audit.type=AsyncLogger
+logger.audit.name=S3GAudit
+logger.audit.level = INFO
+logger.audit.appenderRefs = audit
+logger.audit.appenderRef.file.ref = AUDITLOG
+
+rootLogger.level = INFO
+rootLogger.appenderRefs = stdout
+rootLogger.appenderRef.stdout.ref = STDOUT
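The two MarkerFilters above key off the READ/WRITE markers that the audit framework attaches to each event. A sketch of how a marker-tagged event meets this configuration, using the plain log4j2 API (the production path goes through AuditLogger and AuditMessage instead):

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Marker;
    import org.apache.logging.log4j.MarkerManager;

    public final class MarkerFilterSketch {
      public static void main(String[] args) {
        Marker read = MarkerManager.getMarker("READ");
        // filter.read.onMatch=ACCEPT: READ-marked events are always logged.
        LogManager.getLogger("S3GAudit")
            .info(read, "op=HEAD_BUCKET {bucket=bucket}");
        // WRITE-marked events are NEUTRAL/NEUTRAL, so the logger's INFO
        // level decides whether they are logged.
      }
    }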
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/bucket/InfoBucketHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/bucket/InfoBucketHandler.java
index 6c6948c..cfb9f5a 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/bucket/InfoBucketHandler.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/bucket/InfoBucketHandler.java
@@ -58,6 +58,7 @@
private String sourceBucket;
private Instant creationTime;
private Instant modificationTime;
+ private String owner;
LinkBucket(OzoneBucket ozoneBucket) {
this.volumeName = ozoneBucket.getVolumeName();
@@ -66,6 +67,7 @@
this.sourceBucket = ozoneBucket.getSourceBucket();
this.creationTime = ozoneBucket.getCreationTime();
this.modificationTime = ozoneBucket.getModificationTime();
+ this.owner = ozoneBucket.getOwner();
}
public String getVolumeName() {
@@ -91,6 +93,10 @@
public Instant getModificationTime() {
return modificationTime;
}
+
+ public String getOwner() {
+ return owner;
+ }
}
}