Merge remote-tracking branch 'origin/master' into HDDS-2939
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 370292f..7492a3a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -449,6 +449,11 @@
public static final long OZONE_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
TimeUnit.DAYS.toMillis(10); // 10 days
+ public static final String OZONE_CLIENT_KEY_LATEST_VERSION_LOCATION =
+ "ozone.client.key.latest.version.location";
+ public static final boolean OZONE_CLIENT_KEY_LATEST_VERSION_LOCATION_DEFAULT =
+ true;
+
/**
* There is no need to instantiate this class.
*/
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 2dea94e..81d174c 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2849,7 +2849,6 @@
</description>
</property>
-
<property>
<name>ozone.scm.ca.list.retry.interval</name>
<tag>OZONE, SCM, OM, DATANODE</tag>
@@ -2864,6 +2863,14 @@
</property>
<property>
+ <name>ozone.client.key.latest.version.location</name>
+ <tag>OZONE, CLIENT</tag>
+ <value>true</value>
+      <description>If true, the Ozone client requests only the locations of the
+      latest key version instead of the locations of all versions.
+ </description>
+ </property>
+
+ <property>
<name>ozone.om.metadata.layout</name>
<tag>OZONE, OM</tag>
<value>SIMPLE</value>
@@ -2894,5 +2901,4 @@
directory deleting service per time interval.
</description>
</property>
-
</configuration>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
index e6b4106..8d4820e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
@@ -22,6 +22,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
@@ -80,7 +81,12 @@
*/
private void publishReport() {
try {
- context.addReport(getReport());
+ GeneratedMessage report = getReport();
+ if (report instanceof CommandStatusReportsProto) {
+ context.addIncrementalReport(report);
+ } else {
+ context.refreshFullReport(report);
+ }
} catch (IOException e) {
LOG.error("Exception while publishing report.", e);
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 5c3aaa9..ca73468 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -32,6 +32,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -40,7 +41,6 @@
import java.util.function.Consumer;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
import com.google.protobuf.Descriptors.Descriptor;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
@@ -63,10 +63,11 @@
import com.google.common.base.Preconditions;
import com.google.protobuf.GeneratedMessage;
import static java.lang.Math.min;
-import org.apache.commons.collections.CollectionUtils;
-
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getLogWarnInterval;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmHeartbeatInterval;
+
+import org.apache.commons.collections.CollectionUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,10 +94,6 @@
@VisibleForTesting
static final String CRL_STATUS_REPORT_PROTO_NAME =
CRLStatusReport.getDescriptor().getFullName();
- // Accepted types of reports that can be queued to incrementalReportsQueue
- private static final Set<String> ACCEPTED_INCREMENTAL_REPORT_TYPE_SET =
- Sets.newHashSet(COMMAND_STATUS_REPORTS_PROTO_NAME,
- INCREMENTAL_CONTAINER_REPORT_PROTO_NAME);
static final Logger LOG =
LoggerFactory.getLogger(StateContext.class);
@@ -121,6 +118,14 @@
private boolean shutdownOnError = false;
private boolean shutdownGracefully = false;
private final AtomicLong threadPoolNotAvailableCount;
+ // Endpoint -> ReportType -> Boolean of whether the full report should be
+ // queued in getFullReports call.
+ private final Map<InetSocketAddress,
+ Map<String, AtomicBoolean>> fullReportSendIndicator;
+ // List of supported full report types.
+ private final List<String> fullReportTypeList;
+ // ReportType -> Report.
+ private final Map<String, AtomicReference<GeneratedMessage>> type2Reports;
/**
* term of latest leader SCM, extract from SCMCommand.
@@ -167,6 +172,24 @@
lock = new ReentrantLock();
stateExecutionCount = new AtomicLong(0);
threadPoolNotAvailableCount = new AtomicLong(0);
+ fullReportSendIndicator = new HashMap<>();
+ fullReportTypeList = new ArrayList<>();
+ type2Reports = new HashMap<>();
+ initReportTypeCollection();
+ }
+
+ /**
+   * Initializes the collections that track the supported full report types.
+ */
+ private void initReportTypeCollection(){
+ fullReportTypeList.add(CONTAINER_REPORTS_PROTO_NAME);
+ type2Reports.put(CONTAINER_REPORTS_PROTO_NAME, containerReports);
+ fullReportTypeList.add(NODE_REPORT_PROTO_NAME);
+ type2Reports.put(NODE_REPORT_PROTO_NAME, nodeReport);
+ fullReportTypeList.add(PIPELINE_REPORTS_PROTO_NAME);
+ type2Reports.put(PIPELINE_REPORTS_PROTO_NAME, pipelineReports);
+ fullReportTypeList.add(CRL_STATUS_REPORT_PROTO_NAME);
+ type2Reports.put(CRL_STATUS_REPORT_PROTO_NAME, crlStatusReport);
}
/**
@@ -254,7 +277,7 @@
*
* @param report report to be added
*/
- public void addReport(GeneratedMessage report) {
+ public void addIncrementalReport(GeneratedMessage report) {
if (report == null) {
return;
}
@@ -262,23 +285,38 @@
Preconditions.checkState(descriptor != null);
final String reportType = descriptor.getFullName();
Preconditions.checkState(reportType != null);
- if (reportType.equals(CONTAINER_REPORTS_PROTO_NAME)) {
- containerReports.set(report);
- } else if (reportType.equals(NODE_REPORT_PROTO_NAME)) {
- nodeReport.set(report);
- } else if (reportType.equals(PIPELINE_REPORTS_PROTO_NAME)) {
- pipelineReports.set(report);
- } else if (ACCEPTED_INCREMENTAL_REPORT_TYPE_SET.contains(reportType)) {
- synchronized (incrementalReportsQueue) {
- for (InetSocketAddress endpoint : endpoints) {
- incrementalReportsQueue.get(endpoint).add(report);
- }
+    // In some cases a full-report-type message is deliberately queued
+    // as an incremental message; see XceiverServerRatis#sendPipelineReport.
+ synchronized (incrementalReportsQueue) {
+ for (InetSocketAddress endpoint : endpoints) {
+ incrementalReportsQueue.get(endpoint).add(report);
}
- } else if(reportType.equals(CRL_STATUS_REPORT_PROTO_NAME)) {
- crlStatusReport.set(report);
- } else {
+ }
+ }
+
+ /**
+   * Refreshes the cached full report of the given type and marks it to be
+   * resent to every known endpoint.
+ *
+ * @param report report to be refreshed
+ */
+ public void refreshFullReport(GeneratedMessage report) {
+ if (report == null) {
+ return;
+ }
+ final Descriptor descriptor = report.getDescriptorForType();
+ Preconditions.checkState(descriptor != null);
+ final String reportType = descriptor.getFullName();
+ Preconditions.checkState(reportType != null);
+ if (!fullReportTypeList.contains(reportType)) {
throw new IllegalArgumentException(
- "Unidentified report message type: " + reportType);
+ "not full report message type: " + reportType);
+ }
+ type2Reports.get(reportType).set(report);
+ if (fullReportSendIndicator != null) {
+ for (Map<String, AtomicBoolean> mp : fullReportSendIndicator.values()) {
+ mp.get(reportType).set(true);
+ }
}
}
@@ -301,10 +339,6 @@
Preconditions.checkState(descriptor != null);
final String reportType = descriptor.getFullName();
Preconditions.checkState(reportType != null);
- if (!ACCEPTED_INCREMENTAL_REPORT_TYPE_SET.contains(reportType)) {
- throw new IllegalArgumentException(
- "Unaccepted report message type: " + reportType);
- }
}
synchronized (incrementalReportsQueue) {
if (incrementalReportsQueue.containsKey(endpoint)){
@@ -340,23 +374,27 @@
return reportsToReturn;
}
- List<GeneratedMessage> getNonIncrementalReports() {
+ List<GeneratedMessage> getFullReports(
+ InetSocketAddress endpoint) {
+ Map<String, AtomicBoolean> mp = fullReportSendIndicator.get(endpoint);
List<GeneratedMessage> nonIncrementalReports = new LinkedList<>();
- GeneratedMessage report = containerReports.get();
- if (report != null) {
- nonIncrementalReports.add(report);
- }
- report = nodeReport.get();
- if (report != null) {
- nonIncrementalReports.add(report);
- }
- report = pipelineReports.get();
- if (report != null) {
- nonIncrementalReports.add(report);
- }
- report = crlStatusReport.get();
- if (report != null) {
- nonIncrementalReports.add(report);
+ if (null != mp){
+ for (Map.Entry<String, AtomicBoolean> kv : mp.entrySet()) {
+ if (kv.getValue().get()) {
+ String reportType = kv.getKey();
+ final AtomicReference<GeneratedMessage> ref =
+ type2Reports.get(reportType);
+ if (ref == null) {
+ throw new RuntimeException(reportType + " is not a valid full "
+ + "report type!");
+ }
+ final GeneratedMessage msg = ref.get();
+ if (msg != null) {
+ nonIncrementalReports.add(msg);
+ mp.get(reportType).set(false);
+ }
+ }
+ }
}
return nonIncrementalReports;
}
@@ -372,7 +410,7 @@
if (maxLimit < 0) {
throw new IllegalArgumentException("Illegal maxLimit value: " + maxLimit);
}
- List<GeneratedMessage> reports = getNonIncrementalReports();
+ List<GeneratedMessage> reports = getFullReports(endpoint);
if (maxLimit <= reports.size()) {
return reports.subList(0, maxLimit);
} else {
@@ -800,6 +838,11 @@
this.containerActions.put(endpoint, new LinkedList<>());
this.pipelineActions.put(endpoint, new LinkedList<>());
this.incrementalReportsQueue.put(endpoint, new LinkedList<>());
+ Map<String, AtomicBoolean> mp = new HashMap<>();
+ fullReportTypeList.forEach(e -> {
+ mp.putIfAbsent(e, new AtomicBoolean(true));
+ });
+ this.fullReportSendIndicator.putIfAbsent(endpoint, mp);
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 60b7978..cb65e37 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -150,7 +150,7 @@
} catch (IOException ex) {
Preconditions.checkState(requestBuilder != null);
// put back the reports which failed to be sent
- putBackReports(requestBuilder);
+ putBackIncrementalReports(requestBuilder);
rpcEndpoint.logIfNeeded(ex);
} finally {
rpcEndpoint.unlock();
@@ -159,7 +159,8 @@
}
// TODO: Make it generic.
- private void putBackReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
+ private void putBackIncrementalReports(
+ SCMHeartbeatRequestProto.Builder requestBuilder) {
List<GeneratedMessage> reports = new LinkedList<>();
// We only put back CommandStatusReports and IncrementalContainerReport
// because those are incremental. Container/Node/PipelineReport are
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 6fd2706..3a2aec9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -917,7 +917,8 @@
private void sendPipelineReport() {
if (context != null) {
// TODO: Send IncrementalPipelineReport instead of full PipelineReport
- context.addReport(context.getParent().getContainer().getPipelineReport());
+ context.addIncrementalReport(
+ context.getParent().getContainer().getPipelineReport());
context.getParent().triggerHeartbeat();
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 736e3d6..2312c0b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -130,7 +130,7 @@
.newBuilder()
.addReport(containerReplicaProto)
.build();
- context.addReport(icr);
+ context.addIncrementalReport(icr);
context.getParent().triggerHeartbeat();
};
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index c97d703..83e44d3 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -133,7 +133,7 @@
Thread.sleep(150);
executorService.shutdown();
Assert.assertEquals(1, ((DummyReportPublisher) publisher).getReportCount);
- verify(dummyContext, times(1)).addReport(null);
+ verify(dummyContext, times(1)).refreshFullReport(null);
// After executor shutdown, no new reports should be published
Thread.sleep(100);
Assert.assertEquals(1, ((DummyReportPublisher) publisher).getReportCount);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index d152f4d..6d0ad16 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -27,7 +27,6 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -37,6 +36,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
@@ -106,35 +106,7 @@
// getReports dequeues incremental reports
expectedReportCount.clear();
- // Case 2: Attempt to put back a full report
-
- try {
- ctx.putBackReports(Collections.singletonList(
- newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME)), scm1);
- fail("Should throw exception when putting back unaccepted reports!");
- } catch (IllegalArgumentException ignored) {
- }
- try {
- ctx.putBackReports(Collections.singletonList(
- newMockReport(StateContext.NODE_REPORT_PROTO_NAME)), scm2);
- fail("Should throw exception when putting back unaccepted reports!");
- } catch (IllegalArgumentException ignored) {
- }
- try {
- ctx.putBackReports(Collections.singletonList(
- newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME)), scm1);
- fail("Should throw exception when putting back unaccepted reports!");
- } catch (IllegalArgumentException ignored) {
- }
- try {
- ctx.putBackReports(Collections.singletonList(
- newMockReport(StateContext.CRL_STATUS_REPORT_PROTO_NAME)), scm1);
- fail("Should throw exception when putting back unaccepted reports!");
- } catch (IllegalArgumentException ignored) {
- }
-
- // Case 3: Put back mixed types of incremental reports
-
+ // Case 2: Put back mixed types of incremental reports
ctx.putBackReports(Arrays.asList(
newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME),
newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME),
@@ -152,31 +124,6 @@
checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
// getReports dequeues incremental reports
expectedReportCount.clear();
-
- // Case 4: Attempt to put back mixed types of full reports
-
- try {
- ctx.putBackReports(Arrays.asList(
- newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME),
- newMockReport(StateContext.NODE_REPORT_PROTO_NAME),
- newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME),
- newMockReport(StateContext.CRL_STATUS_REPORT_PROTO_NAME)
- ), scm1);
- fail("Should throw exception when putting back unaccepted reports!");
- } catch (IllegalArgumentException ignored) {
- }
-
- // Case 5: Attempt to put back mixed full and incremental reports
-
- try {
- ctx.putBackReports(Arrays.asList(
- newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME),
- newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME),
- newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME)
- ), scm2);
- fail("Should throw exception when putting back unaccepted reports!");
- } catch (IllegalArgumentException ignored) {
- }
}
@Test
@@ -197,8 +144,48 @@
Map<String, Integer> expectedReportCount = new HashMap<>();
+ // Add a bunch of ContainerReports
+ batchRefreshfullReports(ctx,
+ StateContext.CONTAINER_REPORTS_PROTO_NAME, 128);
+ // Should only keep the latest one
+ expectedReportCount.put(StateContext.CONTAINER_REPORTS_PROTO_NAME, 1);
+ checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+ checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+    // After getAllAvailableReports is called, a full report of a given type
+    // is only returned again once it is refreshed (here via
+    // "batchRefreshfullReports") for that type.
+ expectedReportCount.remove(StateContext.CONTAINER_REPORTS_PROTO_NAME);
+
+ // Add a bunch of NodeReport
+ batchRefreshfullReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128);
+ // Should only keep the latest one
+ expectedReportCount.put(StateContext.NODE_REPORT_PROTO_NAME, 1);
+ checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+ checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+ expectedReportCount.remove(StateContext.NODE_REPORT_PROTO_NAME);
+
+ // Add a bunch of PipelineReports
+ batchRefreshfullReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128);
+ // Should only keep the latest one
+ expectedReportCount.put(StateContext.PIPELINE_REPORTS_PROTO_NAME, 1);
+ checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+ checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+ expectedReportCount.remove(StateContext.PIPELINE_REPORTS_PROTO_NAME);
+
+ // Add a bunch of CommandStatusReports
+ batchAddIncrementalReport(ctx,
+ StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 128);
+ expectedReportCount.put(
+ StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 128);
+ // Should keep all of them
+ checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+ checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+ // getReports dequeues incremental reports
+ expectedReportCount.remove(
+ StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME);
+
// Add a bunch of IncrementalContainerReport
- batchAddReports(ctx,
+ batchAddIncrementalReport(ctx,
StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128);
// Should keep all of them
expectedReportCount.put(
@@ -210,9 +197,16 @@
StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME);
}
- void batchAddReports(StateContext ctx, String reportName, int count) {
+ void batchRefreshfullReports(StateContext ctx, String reportName, int count) {
for (int i = 0; i < count; i++) {
- ctx.addReport(newMockReport(reportName));
+ ctx.refreshFullReport(newMockReport(reportName));
+ }
+ }
+
+ void batchAddIncrementalReport(StateContext ctx,
+ String reportName, int count) {
+ for (int i = 0; i < count; i++) {
+ ctx.addIncrementalReport(newMockReport(reportName));
}
}
@@ -243,7 +237,7 @@
assertNull(context1.getPipelineReports());
GeneratedMessage containerReports =
newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME);
- context1.addReport(containerReports);
+ context1.refreshFullReport(containerReports);
assertNotNull(context1.getContainerReports());
assertEquals(StateContext.CONTAINER_REPORTS_PROTO_NAME,
@@ -255,7 +249,7 @@
StateContext context2 = newStateContext(conf, datanodeStateMachineMock);
GeneratedMessage nodeReport =
newMockReport(StateContext.NODE_REPORT_PROTO_NAME);
- context2.addReport(nodeReport);
+ context2.refreshFullReport(nodeReport);
assertNull(context2.getContainerReports());
assertNotNull(context2.getNodeReport());
@@ -267,7 +261,7 @@
StateContext context3 = newStateContext(conf, datanodeStateMachineMock);
GeneratedMessage pipelineReports =
newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME);
- context3.addReport(pipelineReports);
+ context3.refreshFullReport(pipelineReports);
assertNull(context3.getContainerReports());
assertNull(context3.getNodeReport());
@@ -311,7 +305,7 @@
newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME);
// Try to add report with zero endpoint. Should not be stored.
- stateContext.addReport(generatedMessage);
+ stateContext.addIncrementalReport(generatedMessage);
assertTrue(stateContext.getAllAvailableReports(scm1).isEmpty());
// Add 2 scm endpoints.
@@ -319,7 +313,7 @@
stateContext.addEndpoint(scm2);
// Add report. Should be added to all endpoints.
- stateContext.addReport(generatedMessage);
+ stateContext.addIncrementalReport(generatedMessage);
List<GeneratedMessage> allAvailableReports =
stateContext.getAllAvailableReports(scm1);
assertEquals(1, allAvailableReports.size());
@@ -462,9 +456,9 @@
// task num greater than pool size
for (int i = 0; i < threadPoolSize; i++) {
- executorService.submit(() -> futureOne.get());
+ executorService.submit((Callable<String>) futureOne::get);
}
- executorService.submit(() -> futureTwo.get());
+ executorService.submit((Callable<String>) futureTwo::get);
Assert.assertFalse(stateContext.isThreadPoolAvailable(executorService));
@@ -483,8 +477,8 @@
ExecutorService executorService = Executors.newFixedThreadPool(1);
CompletableFuture<String> future = new CompletableFuture<>();
- executorService.submit(() -> future.get());
- executorService.submit(() -> future.get());
+ executorService.submit((Callable<String>) future::get);
+ executorService.submit((Callable<String>) future::get);
StateContext subject = new StateContext(new OzoneConfiguration(),
DatanodeStates.INIT, mock(DatanodeStateMachine.class)) {
@@ -549,11 +543,13 @@
Map<String, Integer> expectedReportCount = new HashMap<>();
// Add a bunch of ContainerReports
- batchAddReports(ctx, StateContext.CONTAINER_REPORTS_PROTO_NAME, 128);
- batchAddReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128);
- batchAddReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128);
- batchAddReports(ctx, StateContext.CRL_STATUS_REPORT_PROTO_NAME, 128);
- batchAddReports(ctx,
+ batchRefreshfullReports(ctx,
+ StateContext.CONTAINER_REPORTS_PROTO_NAME, 128);
+ batchRefreshfullReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128);
+ batchRefreshfullReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128);
+ batchRefreshfullReports(ctx,
+ StateContext.CRL_STATUS_REPORT_PROTO_NAME, 128);
+ batchAddIncrementalReport(ctx,
StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128);
// Should only keep the latest one
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index 9b238a1..0f215f4 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -96,7 +96,7 @@
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
conf, context, scm);
context.addEndpoint(TEST_SCM_ENDPOINT);
- context.addReport(NodeReportProto.getDefaultInstance());
+ context.refreshFullReport(NodeReportProto.getDefaultInstance());
endpointTask.call();
SCMHeartbeatRequestProto heartbeat = argument.getValue();
Assert.assertTrue(heartbeat.hasDatanodeDetails());
@@ -128,7 +128,7 @@
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
conf, context, scm);
context.addEndpoint(TEST_SCM_ENDPOINT);
- context.addReport(ContainerReportsProto.getDefaultInstance());
+ context.refreshFullReport(ContainerReportsProto.getDefaultInstance());
endpointTask.call();
SCMHeartbeatRequestProto heartbeat = argument.getValue();
Assert.assertTrue(heartbeat.hasDatanodeDetails());
@@ -160,7 +160,8 @@
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
conf, context, scm);
context.addEndpoint(TEST_SCM_ENDPOINT);
- context.addReport(CommandStatusReportsProto.getDefaultInstance());
+ context.addIncrementalReport(
+ CommandStatusReportsProto.getDefaultInstance());
endpointTask.call();
SCMHeartbeatRequestProto heartbeat = argument.getValue();
Assert.assertTrue(heartbeat.hasDatanodeDetails());
@@ -224,9 +225,10 @@
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
conf, context, scm);
context.addEndpoint(TEST_SCM_ENDPOINT);
- context.addReport(NodeReportProto.getDefaultInstance());
- context.addReport(ContainerReportsProto.getDefaultInstance());
- context.addReport(CommandStatusReportsProto.getDefaultInstance());
+ context.refreshFullReport(NodeReportProto.getDefaultInstance());
+ context.refreshFullReport(ContainerReportsProto.getDefaultInstance());
+ context.addIncrementalReport(
+ CommandStatusReportsProto.getDefaultInstance());
context.addContainerAction(getContainerAction());
endpointTask.call();
SCMHeartbeatRequestProto heartbeat = argument.getValue();
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 1cee8d9..b49d05d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -160,6 +160,7 @@
private final boolean checkKeyNameEnabled;
private final OzoneClientConfig clientConfig;
private final Cache<URI, KeyProvider> keyProviderCache;
+ private final boolean getLatestVersionLocation;
/**
* Creates RpcClient instance with the given configuration.
@@ -230,6 +231,9 @@
checkKeyNameEnabled = conf.getBoolean(
OMConfigKeys.OZONE_OM_KEYNAME_CHARACTER_CHECK_ENABLED_KEY,
OMConfigKeys.OZONE_OM_KEYNAME_CHARACTER_CHECK_ENABLED_DEFAULT);
+ getLatestVersionLocation = conf.getBoolean(
+ OzoneConfigKeys.OZONE_CLIENT_KEY_LATEST_VERSION_LOCATION,
+ OzoneConfigKeys.OZONE_CLIENT_KEY_LATEST_VERSION_LOCATION_DEFAULT);
long keyProviderCacheExpiryMs = conf.getTimeDuration(
OZONE_CLIENT_KEY_PROVIDER_CACHE_EXPIRY,
@@ -817,6 +821,7 @@
.setKeyName(keyName)
.setRefreshPipeline(true)
.setSortDatanodesInPipeline(topologyAwareReadEnabled)
+ .setLatestVersionLocation(getLatestVersionLocation)
.build();
OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
return getInputStreamWithRetryFunction(keyInfo);
@@ -929,6 +934,7 @@
.setKeyName(keyName)
.setRefreshPipeline(true)
.setSortDatanodesInPipeline(topologyAwareReadEnabled)
+ .setLatestVersionLocation(getLatestVersionLocation)
.build();
OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
@@ -1148,6 +1154,7 @@
.setKeyName(keyName)
.setRefreshPipeline(true)
.setSortDatanodesInPipeline(topologyAwareReadEnabled)
+ .setLatestVersionLocation(getLatestVersionLocation)
.build();
return ozoneManagerClient.getFileStatus(keyArgs);
}
@@ -1171,6 +1178,7 @@
.setBucketName(bucketName)
.setKeyName(keyName)
.setSortDatanodesInPipeline(topologyAwareReadEnabled)
+ .setLatestVersionLocation(getLatestVersionLocation)
.build();
OmKeyInfo keyInfo = ozoneManagerClient.lookupFile(keyArgs);
return getInputStreamWithRetryFunction(keyInfo);
@@ -1203,6 +1211,7 @@
.setKeyName(omKeyInfo.getKeyName())
.setRefreshPipeline(true)
.setSortDatanodesInPipeline(topologyAwareReadEnabled)
+ .setLatestVersionLocation(getLatestVersionLocation)
.build();
return ozoneManagerClient.lookupKey(omKeyArgs);
} catch (IOException e) {
@@ -1240,6 +1249,7 @@
.setKeyName(keyName)
.setRefreshPipeline(true)
.setSortDatanodesInPipeline(topologyAwareReadEnabled)
+ .setLatestVersionLocation(getLatestVersionLocation)
.build();
return ozoneManagerClient
.listStatus(keyArgs, recursive, startKey, numEntries);
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
index 9a0bf6b..7b298d4 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
@@ -46,6 +46,7 @@
private boolean refreshPipeline;
private boolean sortDatanodesInPipeline;
private List<OzoneAcl> acls;
+ private boolean latestVersionLocation;
private boolean recursive;
@SuppressWarnings("parameternumber")
@@ -54,7 +55,8 @@
List<OmKeyLocationInfo> locationInfoList, boolean isMultipart,
String uploadID, int partNumber,
Map<String, String> metadataMap, boolean refreshPipeline,
- List<OzoneAcl> acls, boolean sortDatanode, boolean recursive) {
+ List<OzoneAcl> acls, boolean sortDatanode,
+ boolean latestVersionLocation, boolean recursive) {
this.volumeName = volumeName;
this.bucketName = bucketName;
this.keyName = keyName;
@@ -68,6 +70,7 @@
this.refreshPipeline = refreshPipeline;
this.acls = acls;
this.sortDatanodesInPipeline = sortDatanode;
+ this.latestVersionLocation = latestVersionLocation;
this.recursive = recursive;
}
@@ -135,6 +138,10 @@
return sortDatanodesInPipeline;
}
+ public boolean getLatestVersionLocation() {
+ return latestVersionLocation;
+ }
+
public boolean isRecursive() {
return recursive;
}
@@ -174,6 +181,7 @@
.addAllMetadata(metadata)
.setRefreshPipeline(refreshPipeline)
.setSortDatanodesInPipeline(sortDatanodesInPipeline)
+ .setLatestVersionLocation(latestVersionLocation)
.setAcls(acls);
}
@@ -193,6 +201,7 @@
private Map<String, String> metadata = new HashMap<>();
private boolean refreshPipeline;
private boolean sortDatanodesInPipeline;
+ private boolean latestVersionLocation;
private List<OzoneAcl> acls;
private boolean recursive;
@@ -266,6 +275,11 @@
return this;
}
+ public Builder setLatestVersionLocation(boolean latest) {
+ this.latestVersionLocation = latest;
+ return this;
+ }
+
public Builder setRecursive(boolean isRecursive) {
this.recursive = isRecursive;
return this;
@@ -276,7 +290,7 @@
replicationConfig, locationInfoList, isMultipartKey,
multipartUploadID,
multipartUploadPartNumber, metadata, refreshPipeline, acls,
- sortDatanodesInPipeline, recursive);
+ sortDatanodesInPipeline, latestVersionLocation, recursive);
}
}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index 92d5823..996e04f 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -37,7 +37,6 @@
import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
import org.apache.hadoop.util.Time;
-import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,17 +82,6 @@
this.bucketName = bucketName;
this.keyName = keyName;
this.dataSize = dataSize;
- // it is important that the versions are ordered from old to new.
- // Do this sanity check when versions got loaded on creating OmKeyInfo.
- // TODO : this is not necessary, here only because versioning is still a
- // work in-progress, remove this following check when versioning is
- // complete and prove correctly functioning
- long currentVersion = -1;
- for (OmKeyLocationInfoGroup version : versions) {
- Preconditions.checkArgument(
- currentVersion + 1 == version.getVersion());
- currentVersion = version.getVersion();
- }
this.keyLocationVersions = versions;
this.creationTime = creationTime;
this.modificationTime = modificationTime;
@@ -170,6 +158,11 @@
return keyLocationVersions;
}
+ public void setKeyLocationVersions(
+ List<OmKeyLocationInfoGroup> keyLocationVersions) {
+ this.keyLocationVersions = keyLocationVersions;
+ }
+
public void updateModifcationTime() {
this.modificationTime = Time.monotonicNow();
}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index ec7e097..c13ef9f 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -681,6 +681,7 @@
.setKeyName(args.getKeyName())
.setDataSize(args.getDataSize())
.setSortDatanodes(args.getSortDatanodes())
+ .setLatestVersionLocation(args.getLatestVersionLocation())
.build();
req.setKeyArgs(keyArgs);
@@ -1215,6 +1216,7 @@
.setBucketName(args.getBucketName())
.setKeyName(args.getKeyName())
.setSortDatanodes(args.getSortDatanodes())
+ .setLatestVersionLocation(args.getLatestVersionLocation())
.build();
GetFileStatusRequest req =
GetFileStatusRequest.newBuilder()
@@ -1268,6 +1270,7 @@
.setBucketName(args.getBucketName())
.setKeyName(args.getKeyName())
.setSortDatanodes(args.getSortDatanodes())
+ .setLatestVersionLocation(args.getLatestVersionLocation())
.build();
LookupFileRequest lookupFileRequest = LookupFileRequest.newBuilder()
.setKeyArgs(keyArgs)
@@ -1432,6 +1435,7 @@
.setBucketName(args.getBucketName())
.setKeyName(args.getKeyName())
.setSortDatanodes(args.getSortDatanodes())
+ .setLatestVersionLocation(args.getLatestVersionLocation())
.build();
ListStatusRequest listStatusRequest =
ListStatusRequest.newBuilder()
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index 5ff95ce..addd15c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -393,7 +393,8 @@
logCapturer.clearOutput();
cluster.getHddsDatanodes().get(0)
- .getDatanodeStateMachine().getContext().addReport(dummyReport);
+ .getDatanodeStateMachine().getContext().
+ addIncrementalReport(dummyReport);
cluster.getHddsDatanodes().get(0)
.getDatanodeStateMachine().triggerHeartbeat();
// wait for event to be handled by event handler
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index e27afa7..a06a48b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -819,6 +819,107 @@
}
@Test
+ public void testLatestLocationVersion() throws IOException {
+ String keyName = RandomStringUtils.randomAlphabetic(5);
+ OmKeyArgs keyArgs = createBuilder()
+ .setKeyName(keyName)
+ .setLatestVersionLocation(true)
+ .build();
+
+ // lookup for a non-existent key
+ try {
+ keyManager.lookupKey(keyArgs, null);
+ Assert.fail("Lookup key should fail for non existent key");
+ } catch (OMException ex) {
+ if (ex.getResult() != OMException.ResultCodes.KEY_NOT_FOUND) {
+ throw ex;
+ }
+ }
+
+ // create a key
+ OpenKeySession keySession = keyManager.createFile(keyArgs, false, false);
+ // randomly select 3 datanodes
+ List<DatanodeDetails> nodeList = new ArrayList<>();
+ nodeList.add((DatanodeDetails)scm.getClusterMap().getNode(
+ 0, null, null, null, null, 0));
+ nodeList.add((DatanodeDetails)scm.getClusterMap().getNode(
+ 1, null, null, null, null, 0));
+ nodeList.add((DatanodeDetails)scm.getClusterMap().getNode(
+ 2, null, null, null, null, 0));
+ Assume.assumeFalse(nodeList.get(0).equals(nodeList.get(1)));
+ Assume.assumeFalse(nodeList.get(0).equals(nodeList.get(2)));
+ // create a pipeline using 3 datanodes
+ Pipeline pipeline = scm.getPipelineManager().createPipeline(
+ new RatisReplicationConfig(ReplicationFactor.THREE), nodeList);
+ List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
+ List<OmKeyLocationInfo> locationList =
+ keySession.getKeyInfo().getLatestVersionLocations().getLocationList();
+ Assert.assertEquals(1, locationList.size());
+ locationInfoList.add(
+ new OmKeyLocationInfo.Builder().setPipeline(pipeline)
+ .setBlockID(new BlockID(locationList.get(0).getContainerID(),
+ locationList.get(0).getLocalID())).build());
+ keyArgs.setLocationInfoList(locationInfoList);
+
+ keyManager.commitKey(keyArgs, keySession.getId());
+ OmKeyInfo key = keyManager.lookupKey(keyArgs, null);
+ Assert.assertEquals(key.getKeyLocationVersions().size(), 1);
+
+ keySession = keyManager.createFile(keyArgs, true, true);
+ keyManager.commitKey(keyArgs, keySession.getId());
+
+ // Test lookupKey (latestLocationVersion == true)
+ key = keyManager.lookupKey(keyArgs, null);
+ Assert.assertEquals(key.getKeyLocationVersions().size(), 1);
+
+ // Test ListStatus (latestLocationVersion == true)
+ List<OzoneFileStatus> fileStatuses =
+ keyManager.listStatus(keyArgs, false, "", 1);
+ Assert.assertEquals(fileStatuses.size(), 1);
+ Assert.assertEquals(fileStatuses.get(0).getKeyInfo()
+ .getKeyLocationVersions().size(), 1);
+
+ // Test GetFileStatus (latestLocationVersion == true)
+ OzoneFileStatus ozoneFileStatus = keyManager.getFileStatus(keyArgs, null);
+ Assert.assertEquals(ozoneFileStatus.getKeyInfo()
+ .getKeyLocationVersions().size(), 1);
+
+ // Test LookupFile (latestLocationVersion == true)
+ key = keyManager.lookupFile(keyArgs, null);
+ Assert.assertEquals(key.getKeyLocationVersions().size(), 1);
+
+ keyArgs = createBuilder()
+ .setKeyName(keyName)
+ .setLatestVersionLocation(false)
+ .build();
+
+ // Test lookupKey (latestLocationVersion == false)
+ key = keyManager.lookupKey(keyArgs, null);
+ Assert.assertEquals(key.getKeyLocationVersions().size(), 2);
+
+ // Test ListStatus (latestLocationVersion == false)
+ fileStatuses = keyManager.listStatus(keyArgs, false, "", 100);
+ Assert.assertEquals(fileStatuses.size(), 1);
+ Assert.assertEquals(fileStatuses.get(0).getKeyInfo()
+ .getKeyLocationVersions().size(), 2);
+
+ // Test GetFileStatus (latestLocationVersion == false)
+ ozoneFileStatus = keyManager.getFileStatus(keyArgs, null);
+ Assert.assertEquals(ozoneFileStatus.getKeyInfo()
+ .getKeyLocationVersions().size(), 2);
+
+ // Test LookupFile (latestLocationVersion == false)
+ key = keyManager.lookupFile(keyArgs, null);
+ Assert.assertEquals(key.getKeyLocationVersions().size(), 2);
+
+ // Test ListKeys (latestLocationVersion is always true for ListKeys)
+ List<OmKeyInfo> keyInfos = keyManager.listKeys(keyArgs.getVolumeName(),
+ keyArgs.getBucketName(), "", keyArgs.getKeyName(), 100);
+ Assert.assertEquals(keyInfos.size(), 1);
+ Assert.assertEquals(keyInfos.get(0).getKeyLocationVersions().size(), 1);
+ }
+
+ @Test
public void testListStatusWithTableCache() throws Exception {
// Inspired by TestOmMetadataManager#testListKeys
String prefixKeyInDB = "key-d";
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 295d2ad..650e9bb 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -741,9 +741,10 @@
// This will be set by leader OM in HA and update the original request.
optional FileEncryptionInfoProto fileEncryptionInfo = 15;
+ optional bool latestVersionLocation = 16;
// This will be set when user performs delete directory recursively.
- optional bool recursive = 16;
+ optional bool recursive = 17;
}
message KeyLocation {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index ac65aab..bcb0ccf 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -676,6 +676,10 @@
throw new OMException("Key:" + keyName + " not found", KEY_NOT_FOUND);
}
+ if (args.getLatestVersionLocation()) {
+ slimLocationVersion(value);
+ }
+
// add block token for read.
addBlockToken4Read(value);
@@ -947,6 +951,11 @@
List<OmKeyInfo> keyList = metadataManager.listKeys(volumeName, bucketName,
startKey, keyPrefix, maxKeys);
+ // For listKeys, we return the latest Key Location by default
+ for (OmKeyInfo omKeyInfo : keyList) {
+ slimLocationVersion(omKeyInfo);
+ }
+
return keyList;
}
@@ -1903,10 +1912,12 @@
if (OzoneManagerRatisUtils.isBucketFSOptimized()) {
return getOzoneFileStatusFSO(volumeName, bucketName, keyName,
- args.getSortDatanodes(), clientAddress, false);
+ args.getSortDatanodes(), clientAddress,
+ args.getLatestVersionLocation(), false);
}
return getOzoneFileStatus(volumeName, bucketName, keyName,
- args.getRefreshPipeline(), args.getSortDatanodes(), clientAddress);
+ args.getRefreshPipeline(), args.getSortDatanodes(),
+ args.getLatestVersionLocation(), clientAddress);
}
private OzoneFileStatus getOzoneFileStatus(String volumeName,
@@ -1914,6 +1925,7 @@
String keyName,
boolean refreshPipeline,
boolean sortDatanodes,
+ boolean latestLocationVersion,
String clientAddress)
throws IOException {
OmKeyInfo fileKeyInfo = null;
@@ -1947,6 +1959,9 @@
// if the key is a file then do refresh pipeline info in OM by asking SCM
if (fileKeyInfo != null) {
+ if (latestLocationVersion) {
+ slimLocationVersion(fileKeyInfo);
+ }
// refreshPipeline flag check has been removed as part of
// https://issues.apache.org/jira/browse/HDDS-3658.
// Please refer this jira for more details.
@@ -1972,7 +1987,8 @@
private OzoneFileStatus getOzoneFileStatusFSO(String volumeName,
String bucketName, String keyName, boolean sortDatanodes,
- String clientAddress, boolean skipFileNotFoundError) throws IOException {
+ String clientAddress, boolean latestLocationVersion,
+ boolean skipFileNotFoundError) throws IOException {
OzoneFileStatus fileStatus = null;
metadataManager.getLock().acquireReadLock(BUCKET_LOCK, volumeName,
bucketName);
@@ -1994,9 +2010,10 @@
if (fileStatus != null) {
// if the key is a file then do refresh pipeline info in OM by asking SCM
if (fileStatus.isFile()) {
-
OmKeyInfo fileKeyInfo = fileStatus.getKeyInfo();
-
+ if (latestLocationVersion) {
+ slimLocationVersion(fileKeyInfo);
+ }
// refreshPipeline flag check has been removed as part of
// https://issues.apache.org/jira/browse/HDDS-3658.
// Please refer this jira for more details.
@@ -2172,11 +2189,12 @@
OzoneFileStatus fileStatus;
if (OzoneManagerRatisUtils.isBucketFSOptimized()) {
fileStatus = getOzoneFileStatusFSO(volumeName, bucketName, keyName,
- args.getSortDatanodes(), clientAddress, false);
+ args.getSortDatanodes(), clientAddress,
+ args.getLatestVersionLocation(), false);
} else {
fileStatus = getOzoneFileStatus(volumeName, bucketName,
keyName, args.getRefreshPipeline(), args.getSortDatanodes(),
- clientAddress);
+ args.getLatestVersionLocation(), clientAddress);
}
//if key is not of type file or if key is not found we throw an exception
if (fileStatus.isFile()) {
@@ -2409,6 +2427,9 @@
for (OzoneFileStatus fileStatus : fileStatusList) {
keyInfoList.add(fileStatus.getKeyInfo());
}
+ if (args.getLatestVersionLocation()) {
+ slimLocationVersion(keyInfoList.toArray(new OmKeyInfo[0]));
+ }
refreshPipeline(keyInfoList);
if (args.getSortDatanodes()) {
@@ -2534,7 +2555,8 @@
}
OzoneFileStatus fileStatusInfo = getOzoneFileStatusFSO(volumeName,
- bucketName, startKey, false, null, true);
+ bucketName, startKey, false, null,
+ args.getLatestVersionLocation(), true);
if (fileStatusInfo != null) {
prefixKeyInDB = fileStatusInfo.getKeyInfo().getParentObjectID();
@@ -2587,7 +2609,9 @@
for (OzoneFileStatus fileStatus : cacheDirMap.values()) {
fileStatusFinalList.add(fileStatus);
}
-
+ if (args.getLatestVersionLocation()) {
+ slimLocationVersion(keyInfoList.toArray(new OmKeyInfo[0]));
+ }
// refreshPipeline flag check has been removed as part of
// https://issues.apache.org/jira/browse/HDDS-3658.
// Please refer this jira for more details.
@@ -2953,6 +2977,23 @@
}
return nodeSet;
}
+ private void slimLocationVersion(OmKeyInfo... keyInfos) {
+ if (keyInfos != null) {
+ for (OmKeyInfo keyInfo : keyInfos) {
+ OmKeyLocationInfoGroup key = keyInfo.getLatestVersionLocations();
+ if (key == null) {
+ LOG.warn("No location version for key {}", keyInfo);
+ continue;
+ }
+ int keyLocationVersionLength = keyInfo.getKeyLocationVersions().size();
+ if (keyLocationVersionLength <= 1) {
+ continue;
+ }
+ keyInfo.setKeyLocationVersions(keyInfo.getKeyLocationVersions()
+ .subList(keyLocationVersionLength - 1, keyLocationVersionLength));
+ }
+ }
+ }
@Override
public OmKeyInfo getPendingDeletionDir() throws IOException {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index c190768..4cbbd7f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -362,6 +362,7 @@
.setKeyName(keyArgs.getKeyName())
.setRefreshPipeline(true)
.setSortDatanodesInPipeline(keyArgs.getSortDatanodes())
+ .setLatestVersionLocation(keyArgs.getLatestVersionLocation())
.build();
OmKeyInfo keyInfo = impl.lookupKey(omKeyArgs);
resp.setKeyInfo(keyInfo.getProtobuf(false, clientVersion));
@@ -566,6 +567,7 @@
.setKeyName(keyArgs.getKeyName())
.setRefreshPipeline(true)
.setSortDatanodesInPipeline(keyArgs.getSortDatanodes())
+ .setLatestVersionLocation(keyArgs.getLatestVersionLocation())
.build();
return LookupFileResponse.newBuilder()
.setKeyInfo(impl.lookupFile(omKeyArgs).getProtobuf(clientVersion))
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java
index 472efb4..be0b2ce 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.recon.fsck;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
@@ -68,6 +69,11 @@
return replicaDelta == 0 && !isMisReplicated();
}
+ public boolean isDeleted() {
+ return container.getState() == HddsProtos.LifeCycleState.DELETED ||
+ container.getState() == HddsProtos.LifeCycleState.DELETING;
+ }
+
public boolean isOverReplicated() {
return replicaDelta < 0;
}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
index 04afed2..5f4df00 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
@@ -18,18 +18,24 @@
package org.apache.hadoop.ozone.recon.fsck;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
import org.apache.hadoop.ozone.recon.scm.ReconScmTask;
+import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig;
import org.apache.hadoop.util.Time;
import org.hadoop.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates;
@@ -49,6 +55,7 @@
private static final Logger LOG =
LoggerFactory.getLogger(ContainerHealthTask.class);
+ private StorageContainerServiceProvider scmClient;
private ContainerManagerV2 containerManager;
private ContainerHealthSchemaManager containerHealthSchemaManager;
private PlacementPolicy placementPolicy;
@@ -57,11 +64,13 @@
public ContainerHealthTask(
ContainerManagerV2 containerManager,
+ StorageContainerServiceProvider scmClient,
ReconTaskStatusDao reconTaskStatusDao,
ContainerHealthSchemaManager containerHealthSchemaManager,
PlacementPolicy placementPolicy,
ReconTaskConfig reconTaskConfig) {
super(reconTaskStatusDao);
+ this.scmClient = scmClient;
this.containerHealthSchemaManager = containerHealthSchemaManager;
this.placementPolicy = placementPolicy;
this.containerManager = containerManager;
@@ -147,6 +156,11 @@
}
if (ContainerHealthRecords
.retainOrUpdateRecord(currentContainer, rec)) {
+ // Check if the missing container is deleted in SCM
+ if (currentContainer.isMissing() &&
+ containerDeletedInSCM(currentContainer.getContainer())) {
+ rec.delete();
+ }
existingRecords.add(rec.getContainerState());
if (rec.changed()) {
rec.update();
@@ -174,7 +188,11 @@
containerManager.getContainerReplicas(container.containerID());
ContainerHealthStatus h = new ContainerHealthStatus(
container, containerReplicas, placementPolicy);
- if (h.isHealthy()) {
+ if (h.isHealthy() || h.isDeleted()) {
+ return;
+ }
+ // For containers deleted in SCM, we sync the container state here.
+ if (h.isMissing() && containerDeletedInSCM(container)) {
return;
}
containerHealthSchemaManager.insertUnhealthyContainerRecords(
@@ -185,6 +203,35 @@
}
}
+ private boolean containerDeletedInSCM(ContainerInfo containerInfo) {
+ try {
+ ContainerWithPipeline containerWithPipeline =
+ scmClient.getContainerWithPipeline(containerInfo.getContainerID());
+ if (containerWithPipeline.getContainerInfo().getState() ==
+ HddsProtos.LifeCycleState.DELETED) {
+ if (containerInfo.getState() == HddsProtos.LifeCycleState.CLOSED) {
+ containerManager.updateContainerState(containerInfo.containerID(),
+ HddsProtos.LifeCycleEvent.DELETE);
+ }
+ if (containerInfo.getState() == HddsProtos.LifeCycleState.DELETING &&
+ containerManager.getContainerReplicas(containerInfo.containerID())
+ .size() == 0
+ ) {
+ containerManager.updateContainerState(containerInfo.containerID(),
+ HddsProtos.LifeCycleEvent.CLEANUP);
+ }
+ return true;
+ }
+ } catch (InvalidStateTransitionException e) {
+ LOG.error("Failed to transition Container state while processing " +
+ "container in Container Health task", e);
+ } catch (IOException e) {
+ LOG.error("Got exception while processing container in" +
+ " Container Health task", e);
+ }
+ return false;
+ }
+
/**
* Helper methods to generate and update the required database records for
* unhealthy containers.
@@ -245,7 +292,7 @@
ContainerHealthStatus container, Set<String> recordForStateExists,
long time) {
List<UnhealthyContainers> records = new ArrayList<>();
- if (container.isHealthy()) {
+ if (container.isHealthy() || container.isDeleted()) {
return records;
}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index c7f4e22..ebef4d8 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -195,6 +195,7 @@
reconTaskConfig));
reconScmTasks.add(new ContainerHealthTask(
containerManager,
+ scmServiceProvider,
reconTaskStatusDao, containerHealthSchemaManager,
containerPlacementPolicy,
reconTaskConfig));
@@ -334,4 +335,8 @@
public EventQueue getEventQueue() {
return eventQueue;
}
+
+ public StorageContainerServiceProvider getScmServiceProvider() {
+ return scmServiceProvider;
+ }
}
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
index 18f9fa4..18125ea 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
@@ -42,9 +42,11 @@
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
+import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig;
import org.apache.ozone.test.LambdaTestUtils;
import org.hadoop.ozone.recon.schema.ContainerSchemaDefinition;
@@ -73,6 +75,8 @@
mock(ReconStorageContainerManagerFacade.class);
MockPlacementPolicy placementMock = new MockPlacementPolicy();
ContainerManagerV2 containerManagerMock = mock(ContainerManagerV2.class);
+ StorageContainerServiceProvider scmClientMock =
+ mock(StorageContainerServiceProvider.class);
ContainerReplica unhealthyReplicaMock = mock(ContainerReplica.class);
when(unhealthyReplicaMock.getState()).thenReturn(State.UNHEALTHY);
ContainerReplica healthyReplicaMock = mock(ContainerReplica.class);
@@ -81,10 +85,13 @@
// Create 6 containers. The first 5 will have various unhealthy states
// defined below. The container with ID=6 will be healthy.
List<ContainerInfo> mockContainers = getMockContainers(6);
+ when(scmMock.getScmServiceProvider()).thenReturn(scmClientMock);
when(scmMock.getContainerManager()).thenReturn(containerManagerMock);
when(containerManagerMock.getContainers()).thenReturn(mockContainers);
for (ContainerInfo c : mockContainers) {
when(containerManagerMock.getContainer(c.containerID())).thenReturn(c);
+ when(scmClientMock.getContainerWithPipeline(c.getContainerID()))
+ .thenReturn(new ContainerWithPipeline(c, null));
}
// Under replicated
when(containerManagerMock.getContainerReplicas(ContainerID.valueOf(1L)))
@@ -125,6 +132,7 @@
reconTaskConfig.setMissingContainerTaskInterval(Duration.ofSeconds(2));
ContainerHealthTask containerHealthTask =
new ContainerHealthTask(scmMock.getContainerManager(),
+ scmMock.getScmServiceProvider(),
reconTaskStatusDao, containerHealthSchemaManager,
placementMock, reconTaskConfig);
containerHealthTask.start();
@@ -206,6 +214,76 @@
unHealthyContainersTableHandle.fetchByContainerId(5L).size());
}
+ @Test
+ public void testDeletedContainer() throws Exception {
+ UnhealthyContainersDao unHealthyContainersTableHandle =
+ getDao(UnhealthyContainersDao.class);
+
+ ContainerHealthSchemaManager containerHealthSchemaManager =
+ new ContainerHealthSchemaManager(
+ getSchemaDefinition(ContainerSchemaDefinition.class),
+ unHealthyContainersTableHandle);
+ ReconStorageContainerManagerFacade scmMock =
+ mock(ReconStorageContainerManagerFacade.class);
+ MockPlacementPolicy placementMock = new MockPlacementPolicy();
+ ContainerManagerV2 containerManagerMock = mock(ContainerManagerV2.class);
+ StorageContainerServiceProvider scmClientMock =
+ mock(StorageContainerServiceProvider.class);
+
+ // Create 2 containers. The first is OPEN with no replicas, the second is
+ // CLOSED with no replicas.
+ List<ContainerInfo> mockContainers = getMockContainers(2);
+ when(scmMock.getScmServiceProvider()).thenReturn(scmClientMock);
+ when(scmMock.getContainerManager()).thenReturn(containerManagerMock);
+ when(containerManagerMock.getContainers()).thenReturn(mockContainers);
+ for (ContainerInfo c : mockContainers) {
+ when(containerManagerMock.getContainer(c.containerID())).thenReturn(c);
+ when(scmClientMock.getContainerWithPipeline(c.getContainerID()))
+ .thenReturn(new ContainerWithPipeline(c, null));
+ }
+ // Container State OPEN with no replicas
+ when(containerManagerMock.getContainer(ContainerID.valueOf(1L)).getState())
+ .thenReturn(HddsProtos.LifeCycleState.OPEN);
+ when(containerManagerMock.getContainerReplicas(ContainerID.valueOf(1L)))
+ .thenReturn(Collections.emptySet());
+ when(scmClientMock.getContainerWithPipeline(1))
+ .thenReturn(new ContainerWithPipeline(mockContainers.get(0), null));
+
+ // Container State CLOSED with no replicas
+ when(containerManagerMock.getContainer(ContainerID.valueOf(2L)).getState())
+ .thenReturn(HddsProtos.LifeCycleState.CLOSED);
+ when(containerManagerMock.getContainerReplicas(ContainerID.valueOf(2L)))
+ .thenReturn(Collections.emptySet());
+ ContainerInfo mockDeletedContainer = getMockDeletedContainer(2);
+ when(scmClientMock.getContainerWithPipeline(2))
+ .thenReturn(new ContainerWithPipeline(mockDeletedContainer, null));
+
+ List<UnhealthyContainers> all = unHealthyContainersTableHandle.findAll();
+ Assert.assertTrue(all.isEmpty());
+
+ long currentTime = System.currentTimeMillis();
+ ReconTaskStatusDao reconTaskStatusDao = getDao(ReconTaskStatusDao.class);
+ ReconTaskConfig reconTaskConfig = new ReconTaskConfig();
+ reconTaskConfig.setMissingContainerTaskInterval(Duration.ofSeconds(2));
+ ContainerHealthTask containerHealthTask =
+ new ContainerHealthTask(scmMock.getContainerManager(),
+ scmMock.getScmServiceProvider(),
+ reconTaskStatusDao, containerHealthSchemaManager,
+ placementMock, reconTaskConfig);
+ containerHealthTask.start();
+ LambdaTestUtils.await(6000, 1000, () ->
+ (unHealthyContainersTableHandle.count() == 1));
+ UnhealthyContainers rec =
+ unHealthyContainersTableHandle.fetchByContainerId(1L).get(0);
+ assertEquals("MISSING", rec.getContainerState());
+ assertEquals(3, rec.getReplicaDelta().intValue());
+
+ ReconTaskStatus taskStatus =
+ reconTaskStatusDao.findById(containerHealthTask.getTaskName());
+ Assert.assertTrue(taskStatus.getLastUpdatedTimestamp() >
+ currentTime);
+ }
+
private Set<ContainerReplica> getMockReplicas(
long containerId, State...states) {
Set<ContainerReplica> replicas = new HashSet<>();
@@ -233,6 +311,16 @@
return containers;
}
+ private ContainerInfo getMockDeletedContainer(int containerID) {
+ ContainerInfo c = mock(ContainerInfo.class);
+ when(c.getContainerID()).thenReturn((long)containerID);
+ when(c.getReplicationFactor())
+ .thenReturn(HddsProtos.ReplicationFactor.THREE);
+ when(c.containerID()).thenReturn(ContainerID.valueOf(containerID));
+ when(c.getState()).thenReturn(HddsProtos.LifeCycleState.DELETED);
+ return c;
+ }
+
/**
* This is a simple implementation of PlacementPolicy, so that when
* validateContainerPlacement() is called, by default it will return a value