SOLR-16699: Add Collection creation time to CLUSTERSTATUS and COLSTATUS API responses (#2226)

Using ZooKeeper "ctime" of the node.

Co-authored-by: Julien Pilourdault
Co-authored-by: Paul McArthur <pmcarthur-apache@proton.me>
Co-authored-by: David Smiley <dsmiley@salesforce.com>
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 8e6e5ea..45dd5aa 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -87,6 +87,9 @@
 * SOLR-17119: When registering or updating a ConfigurablePlugin through the `/cluster/plugin` API,
   config validation exceptions are now propagated to the callers. (Yohann Callea)
 
+* SOLR-16699: Add Collection creation time to CLUSTERSTATUS and COLSTATUS API responses
+  (Julien Pilourdault, Paul McArthur, David Smiley)
+
 Optimizations
 ---------------------
 (No changes)
diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java b/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
index 8b789ea..658cd8f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
@@ -31,6 +31,7 @@
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
 
 import java.lang.invoke.MethodHandles;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumMap;
@@ -636,7 +637,8 @@
               data,
               Collections.emptySet(),
               updater.getCollectionName(),
-              zkStateReader.getZkClient());
+              zkStateReader.getZkClient(),
+              Instant.ofEpochMilli(stat.getCtime()));
 
       return clusterState;
     }
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
index 11b3795..a5edff6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
@@ -19,6 +19,7 @@
 import static org.apache.solr.common.params.CommonParams.NAME;
 
 import java.lang.invoke.MethodHandles;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -133,9 +134,18 @@
     }
 
     assert !collectionProps.containsKey(CollectionAdminParams.COLL_CONF);
+
+    // This instance does not fully capture what will be persisted: the zkNodeVersion and
+    // creationTime will only be definitively set in ZK. Hence, the defaults passed here.
     DocCollection newCollection =
         DocCollection.create(
-            cName, slices, collectionProps, router, -1, stateManager.getPrsSupplier(cName));
+            cName,
+            slices,
+            collectionProps,
+            router,
+            -1,
+            Instant.EPOCH,
+            stateManager.getPrsSupplier(cName));
 
     return new ZkWriteCommand(cName, newCollection);
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
index 088c4f8..879a440 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
@@ -176,6 +176,7 @@
             props,
             coll.getRouter(),
             coll.getZNodeVersion(),
+            coll.getCreationTime(),
             stateManager.getPrsSupplier(coll.getName()));
     if (replicaOps == null) {
       return new ZkWriteCommand(coll.getName(), collection);
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index f61b9a2..f6f1068 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -20,6 +20,7 @@
 
 import com.codahale.metrics.Timer;
 import java.lang.invoke.MethodHandles;
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
@@ -303,11 +304,13 @@
                       c.getProperties(),
                       c.getRouter(),
                       stat.getVersion(),
+                      Instant.ofEpochMilli(stat.getCtime()),
                       PerReplicaStatesOps.getZkClientPrsSupplier(reader.getZkClient(), path));
               clusterState = clusterState.copyWith(name, newCollection);
             } else {
               log.debug("going to create_collection {}", path);
-              reader.getZkClient().create(path, data, CreateMode.PERSISTENT, true);
+              Stat stat = new Stat();
+              reader.getZkClient().create(path, data, CreateMode.PERSISTENT, true, stat);
               DocCollection newCollection =
                   DocCollection.create(
                       name,
@@ -315,6 +318,7 @@
                       c.getProperties(),
                       c.getRouter(),
                       0,
+                      Instant.ofEpochMilli(stat.getCtime()),
                       PerReplicaStatesOps.getZkClientPrsSupplier(reader.getZkClient(), path));
               clusterState = clusterState.copyWith(name, newCollection);
             }
diff --git a/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java b/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
index 8ff78b2..e846da5 100644
--- a/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
+++ b/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
@@ -24,6 +24,7 @@
 import java.lang.invoke.MethodHandles;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -221,7 +222,10 @@
         repository.openInput(zkStateDir, COLLECTION_PROPS_FILE, IOContext.DEFAULT)) {
       byte[] arr = new byte[(int) is.length()]; // probably ok since the json file should be small.
       is.readBytes(arr, 0, (int) is.length());
-      ClusterState c_state = ClusterState.createFromJson(-1, arr, Collections.emptySet(), null);
+      // set a default created date, we don't aim at reading actual zookeeper state. The restored
+      // collection will have a new creation date when persisted in zookeeper.
+      ClusterState c_state =
+          ClusterState.createFromJson(-1, arr, Collections.emptySet(), Instant.EPOCH, null);
       return c_state.getCollection(collectionName);
     }
   }
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java b/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
index 9130fcc..f897aea 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
@@ -183,6 +183,8 @@
       collectionStatus = getCollectionStatus(docCollection, name, requestedShards);
 
       collectionStatus.put("znodeVersion", clusterStateCollection.getZNodeVersion());
+      collectionStatus.put(
+          "creationTimeMillis", clusterStateCollection.getCreationTime().toEpochMilli());
 
       if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) {
         collectionStatus.put("aliases", collectionVsAliases.get(name));
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
index 774737e..6befd13 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
@@ -103,6 +103,7 @@
       }
       SimpleOrderedMap<Object> colMap = new SimpleOrderedMap<>();
       colMap.add("znodeVersion", coll.getZNodeVersion());
+      colMap.add("creationTimeMillis", coll.getCreationTime().toEpochMilli());
       Map<String, Object> props = new TreeMap<>(coll.getProperties());
       props.remove("shards");
       colMap.add("properties", props);
diff --git a/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java b/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java
index ac9b5ff..068a8f3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.cloud;
 
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -60,17 +61,21 @@
     slices.put("shard2", slice2);
     collectionStates.put(
         "collection1",
-        DocCollection.create("collection1", slices, props, DocRouter.DEFAULT, 0, null));
+        DocCollection.create(
+            "collection1", slices, props, DocRouter.DEFAULT, 0, Instant.EPOCH, null));
     collectionStates.put(
         "collection2",
-        DocCollection.create("collection2", slices, props, DocRouter.DEFAULT, 0, null));
+        DocCollection.create(
+            "collection2", slices, props, DocRouter.DEFAULT, 0, Instant.EPOCH, null));
 
     ClusterState clusterState = new ClusterState(liveNodes, collectionStates);
     assertFalse(clusterState.getCollection("collection1").getProperties().containsKey("shards"));
 
     byte[] bytes = Utils.toJSON(clusterState);
 
-    ClusterState loadedClusterState = ClusterState.createFromJson(-1, bytes, liveNodes, null);
+    Instant creationTime = Instant.now();
+    ClusterState loadedClusterState =
+        ClusterState.createFromJson(-1, bytes, liveNodes, creationTime, null);
     assertFalse(
         loadedClusterState.getCollection("collection1").getProperties().containsKey("shards"));
 
@@ -96,13 +101,18 @@
             .get("node1")
             .getStr("prop2"));
 
-    loadedClusterState = ClusterState.createFromJson(-1, new byte[0], liveNodes, null);
+    assertEquals(creationTime, loadedClusterState.getCollection("collection1").getCreationTime());
+    assertEquals(creationTime, loadedClusterState.getCollection("collection2").getCreationTime());
+
+    loadedClusterState =
+        ClusterState.createFromJson(-1, new byte[0], liveNodes, Instant.now(), null);
 
     assertEquals(
         "Provided liveNodes not used properly", 2, loadedClusterState.getLiveNodes().size());
     assertEquals("Should not have collections", 0, loadedClusterState.getCollectionsMap().size());
 
-    loadedClusterState = ClusterState.createFromJson(-1, (byte[]) null, liveNodes, null);
+    loadedClusterState =
+        ClusterState.createFromJson(-1, (byte[]) null, liveNodes, Instant.now(), null);
 
     assertEquals(
         "Provided liveNodes not used properly", 2, loadedClusterState.getLiveNodes().size());
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
index 9eabee8..6bf199b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
@@ -27,6 +27,7 @@
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
@@ -1209,4 +1210,34 @@
                 .unsetAttribute("non_existent_attr")
                 .process(cluster.getSolrClient()));
   }
+
+  @Test
+  public void testCollectionCreationTime() throws SolrServerException, IOException {
+    Instant beforeCreation = Instant.now();
+
+    String collectionName = getSaferTestName();
+    CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
+
+    cluster.waitForActiveCollection(collectionName, 1, 1);
+
+    Instant afterCreation = Instant.now();
+
+    CollectionAdminRequest.ColStatus req = CollectionAdminRequest.collectionStatus(collectionName);
+    CollectionAdminResponse response = req.process(cluster.getSolrClient());
+    assertEquals(0, response.getStatus());
+
+    NamedList<?> colStatus = (NamedList<?>) response.getResponse().get(collectionName);
+    Long creationTimeMillis = (Long) colStatus._get("creationTimeMillis", null);
+    assertNotNull("creationTimeMillis was not included in COLSTATUS response", creationTimeMillis);
+
+    Instant creationTime = Instant.ofEpochMilli(creationTimeMillis);
+    assertTrue(
+        "COLSTATUS creationTimeMillis should be after the test started",
+        creationTime.isAfter(beforeCreation));
+    assertTrue(
+        "COLSTATUS creationTimeMillis should not be after the collection creation was completed",
+        creationTime.isBefore(afterCreation));
+  }
 }
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index 5e179e0..b38ad73 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -30,6 +30,7 @@
 import static org.mockito.Mockito.when;
 
 import java.lang.invoke.MethodHandles;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -688,6 +689,7 @@
                       props.getProperties(),
                       DocRouter.DEFAULT,
                       0,
+                      Instant.EPOCH,
                       distribStateManagerMock.getPrsSupplier(collName))));
       }
       if (CollectionParams.CollectionAction.ADDREPLICA.isEqual(props.getStr("operation"))) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java b/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java
index 4517cc1..64ec9f3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.cloud;
 
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -59,7 +60,8 @@
 
     ClusterState clusterState = new ClusterState(liveNodes, collectionStates);
     byte[] bytes = Utils.toJSON(clusterState);
-    ClusterState loadedClusterState = ClusterState.createFromJson(-1, bytes, liveNodes, null);
+    ClusterState loadedClusterState =
+        ClusterState.createFromJson(-1, bytes, liveNodes, Instant.now(), null);
 
     assertSame(
         "Default state not set to active",
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/SimpleCollectionCreateDeleteTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/SimpleCollectionCreateDeleteTest.java
index 9d8c18f..64fcb5e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/SimpleCollectionCreateDeleteTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/SimpleCollectionCreateDeleteTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.cloud.api.collections;
 
+import java.time.Instant;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -123,7 +124,11 @@
 
     DocCollection c =
         ClusterState.createFromCollectionMap(
-                0, (Map<String, Object>) Utils.fromJSON(node.data), Collections.emptySet(), null)
+                0,
+                (Map<String, Object>) Utils.fromJSON(node.data),
+                Collections.emptySet(),
+                Instant.EPOCH,
+                null)
             .getCollection(collectionName);
 
     Set<String> knownKeys =
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java
index ddbe2e7..2591319 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java
@@ -17,6 +17,7 @@
 package org.apache.solr.cloud.api.collections;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -491,7 +492,10 @@
       Map<String, Object> collection = (Map<String, Object>) collections.get(COLLECTION_NAME);
       assertNotNull(collection);
       assertEquals("conf1", collection.get("configName"));
-      //      assertEquals("1", collection.get("nrtReplicas"));
+
+      Instant creationTime = Instant.ofEpochMilli((long) collection.get("creationTimeMillis"));
+      assertEquals(
+          creationTime, client.getClusterState().getCollection(COLLECTION_NAME).getCreationTime());
     }
   }
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
index 3e1ca33..43c52d9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.nio.file.Path;
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -56,6 +57,7 @@
 import org.apache.solr.util.LogLevel;
 import org.apache.solr.util.TimeOut;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
 import org.junit.After;
 import org.junit.Before;
 import org.slf4j.Logger;
@@ -146,6 +148,7 @@
                 Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
                 DocRouter.DEFAULT,
                 0,
+                Instant.now(),
                 PerReplicaStatesOps.getZkClientPrsSupplier(
                     fixture.zkClient, DocCollection.getCollectionPath("c1"))));
 
@@ -173,6 +176,7 @@
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
             0,
+            Instant.now(),
             PerReplicaStatesOps.getZkClientPrsSupplier(
                 fixture.zkClient, DocCollection.getCollectionPath("c1")));
     ZkWriteCommand wc = new ZkWriteCommand("c1", state);
@@ -192,6 +196,7 @@
             props,
             DocRouter.DEFAULT,
             0,
+            Instant.now(),
             PerReplicaStatesOps.getZkClientPrsSupplier(
                 fixture.zkClient, DocCollection.getCollectionPath("c1")));
     wc = new ZkWriteCommand("c1", state);
@@ -233,6 +238,7 @@
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
             0,
+            Instant.now(),
             PerReplicaStatesOps.getZkClientPrsSupplier(
                 fixture.zkClient, DocCollection.getCollectionPath("c1")));
     ZkWriteCommand wc = new ZkWriteCommand("c1", state);
@@ -246,6 +252,10 @@
     ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
     assertNotNull(ref);
     assertFalse(ref.isLazilyLoaded());
+
+    Stat stat = new Stat();
+    fixture.zkClient.getData(ZkStateReader.getCollectionPath("c1"), null, stat, false);
+    assertEquals(Instant.ofEpochMilli(stat.getCtime()), ref.get().getCreationTime());
   }
 
   /**
@@ -271,6 +281,7 @@
                 "true"),
             DocRouter.DEFAULT,
             0,
+            Instant.now(),
             PerReplicaStatesOps.getZkClientPrsSupplier(
                 fixture.zkClient, DocCollection.getCollectionPath("c1")));
     ZkWriteCommand wc = new ZkWriteCommand("c1", state);
@@ -391,6 +402,7 @@
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
             0,
+            Instant.now(),
             PerReplicaStatesOps.getZkClientPrsSupplier(
                 fixture.zkClient, DocCollection.getCollectionPath("c1")));
     ZkWriteCommand wc = new ZkWriteCommand("c1", state);
@@ -413,6 +425,7 @@
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
             ref.get().getZNodeVersion(),
+            Instant.now(),
             PerReplicaStatesOps.getZkClientPrsSupplier(
                 fixture.zkClient, DocCollection.getCollectionPath("c1")));
     wc = new ZkWriteCommand("c1", state);
@@ -436,6 +449,7 @@
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
             0,
+            Instant.now(),
             PerReplicaStatesOps.getZkClientPrsSupplier(
                 fixture.zkClient, DocCollection.getCollectionPath("c2")));
     ZkWriteCommand wc2 = new ZkWriteCommand("c2", state);
@@ -470,12 +484,14 @@
 
     // create new collection
     DocCollection state =
-        new DocCollection(
+        DocCollection.create(
             "c1",
             new HashMap<>(),
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
-            0);
+            0,
+            Instant.now(),
+            null);
     ZkWriteCommand wc = new ZkWriteCommand("c1", state);
     writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
     writer.writePendingUpdates();
@@ -490,12 +506,14 @@
 
     // update the collection
     state =
-        new DocCollection(
+        DocCollection.create(
             "c1",
             new HashMap<>(),
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
-            ref.get().getZNodeVersion());
+            ref.get().getZNodeVersion(),
+            Instant.now(),
+            null);
     wc = new ZkWriteCommand("c1", state);
     writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
     writer.writePendingUpdates();
@@ -511,12 +529,14 @@
 
     fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
     state =
-        new DocCollection(
+        DocCollection.create(
             "c2",
             new HashMap<>(),
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
-            0);
+            0,
+            Instant.now(),
+            null);
     ZkWriteCommand wc2 = new ZkWriteCommand("c2", state);
 
     writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2), null);
@@ -552,6 +572,7 @@
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
             0,
+            Instant.now(),
             PerReplicaStatesOps.getZkClientPrsSupplier(
                 fixture.zkClient, DocCollection.getCollectionPath("c1")));
     ZkWriteCommand wc1 = new ZkWriteCommand("c1", state1);
@@ -562,6 +583,7 @@
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
             0,
+            Instant.now(),
             PerReplicaStatesOps.getZkClientPrsSupplier(
                 fixture.zkClient, DocCollection.getCollectionPath("c1")));
 
@@ -617,6 +639,7 @@
                             ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
                         DocRouter.DEFAULT,
                         currentVersion,
+                        Instant.now(),
                         PerReplicaStatesOps.getZkClientPrsSupplier(
                             fixture.zkClient, DocCollection.getCollectionPath("c1")));
                 ZkWriteCommand wc = new ZkWriteCommand("c1", state);
@@ -693,6 +716,7 @@
             Collections.singletonMap(DocCollection.CollectionStateProps.PER_REPLICA_STATE, true),
             DocRouter.DEFAULT,
             0,
+            Instant.now(),
             PerReplicaStatesOps.getZkClientPrsSupplier(
                 fixture.zkClient, DocCollection.getCollectionPath(collectionName)));
     ZkWriteCommand wc = new ZkWriteCommand(collectionName, state);
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
index 1a2c940..7d05566 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
@@ -18,6 +18,7 @@
 
 import java.lang.invoke.MethodHandles;
 import java.nio.file.Path;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -43,6 +44,7 @@
 import org.apache.solr.common.util.ZLibCompressor;
 import org.apache.solr.handler.admin.ConfigSetsHandler;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
@@ -95,15 +97,9 @@
         Map<String, Object> props =
             Collections.singletonMap(
                 ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME);
-        ZkWriteCommand c1 =
-            new ZkWriteCommand(
-                "c1", new DocCollection("c1", new HashMap<>(), props, DocRouter.DEFAULT, 0));
-        ZkWriteCommand c2 =
-            new ZkWriteCommand(
-                "c2", new DocCollection("c2", new HashMap<>(), props, DocRouter.DEFAULT, 0));
-        ZkWriteCommand c3 =
-            new ZkWriteCommand(
-                "c3", new DocCollection("c3", new HashMap<>(), props, DocRouter.DEFAULT, 0));
+        ZkWriteCommand c1 = new ZkWriteCommand("c1", createDocCollection("c1", props));
+        ZkWriteCommand c2 = new ZkWriteCommand("c2", createDocCollection("c2", props));
+        ZkWriteCommand c3 = new ZkWriteCommand("c3", createDocCollection("c3", props));
         ZkStateWriter writer =
             new ZkStateWriter(reader, new Stats(), -1, STATE_COMPRESSION_PROVIDER);
 
@@ -161,18 +157,9 @@
         zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c3", true);
         zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/prs1", true);
 
-        ZkWriteCommand c1 =
-            new ZkWriteCommand(
-                "c1",
-                new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
-        ZkWriteCommand c2 =
-            new ZkWriteCommand(
-                "c2",
-                new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
-        ZkWriteCommand c3 =
-            new ZkWriteCommand(
-                "c3",
-                new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
+        ZkWriteCommand c1 = new ZkWriteCommand("c1", createDocCollection("c1", new HashMap<>()));
+        ZkWriteCommand c2 = new ZkWriteCommand("c2", createDocCollection("c2", new HashMap<>()));
+        ZkWriteCommand c3 = new ZkWriteCommand("c3", createDocCollection("c3", new HashMap<>()));
         Map<String, Object> prsProps = new HashMap<>();
         prsProps.put("perReplicaState", Boolean.TRUE);
         ZkWriteCommand prs1 =
@@ -184,6 +171,7 @@
                     prsProps,
                     DocRouter.DEFAULT,
                     0,
+                    Instant.now(),
                     PerReplicaStatesOps.getZkClientPrsSupplier(
                         zkClient, DocCollection.getCollectionPath("c1"))));
         ZkStateWriter writer =
@@ -244,21 +232,9 @@
         zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c3", true);
         zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/prs1", true);
 
-        ZkWriteCommand c1 =
-            new ZkWriteCommand(
-                "c1",
-                DocCollection.create(
-                    "c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, null));
-        ZkWriteCommand c2 =
-            new ZkWriteCommand(
-                "c2",
-                DocCollection.create(
-                    "c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, null));
-        ZkWriteCommand c3 =
-            new ZkWriteCommand(
-                "c3",
-                DocCollection.create(
-                    "c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, null));
+        ZkWriteCommand c1 = new ZkWriteCommand("c1", createDocCollection("c1", new HashMap<>()));
+        ZkWriteCommand c2 = new ZkWriteCommand("c2", createDocCollection("c2", new HashMap<>()));
+        ZkWriteCommand c3 = new ZkWriteCommand("c3", createDocCollection("c3", new HashMap<>()));
         Map<String, Object> prsProps = new HashMap<>();
         prsProps.put("perReplicaState", Boolean.TRUE);
         ZkWriteCommand prs1 =
@@ -270,6 +246,7 @@
                     prsProps,
                     DocRouter.DEFAULT,
                     0,
+                    Instant.now(),
                     PerReplicaStatesOps.getZkClientPrsSupplier(
                         zkClient, DocCollection.getCollectionPath("prs1"))));
         ZkStateWriter writer =
@@ -333,16 +310,13 @@
         ZkWriteCommand c1 =
             new ZkWriteCommand(
                 "c1",
-                new DocCollection(
+                createDocCollection(
                     "c1",
-                    new HashMap<String, Slice>(),
                     Collections.singletonMap(
-                        ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
-                    DocRouter.DEFAULT,
-                    0));
+                        ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME)));
 
         writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
-        writer.writePendingUpdates();
+        ClusterState clusterState = writer.writePendingUpdates();
 
         Map<?, ?> map =
             (Map<?, ?>)
@@ -350,6 +324,12 @@
                     zkClient.getData(
                         ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", null, null, true));
         assertNotNull(map.get("c1"));
+
+        Stat stat = new Stat();
+        zkClient.getData(ZkStateReader.getCollectionPath("c1"), null, stat, false);
+        assertEquals(
+            Instant.ofEpochMilli(stat.getCtime()),
+            clusterState.getCollection("c1").getCreationTime());
       }
     } finally {
       IOUtils.close(zkClient);
@@ -389,13 +369,10 @@
         ZkWriteCommand c2 =
             new ZkWriteCommand(
                 "c2",
-                new DocCollection(
+                createDocCollection(
                     "c2",
-                    new HashMap<String, Slice>(),
                     Collections.singletonMap(
-                        ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
-                    DocRouter.DEFAULT,
-                    0));
+                        ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME)));
         state = writer.enqueueUpdate(state, Collections.singletonList(c2), null);
         assertFalse(writer.hasPendingUpdates()); // first write is flushed immediately
 
@@ -408,6 +385,12 @@
         // get the most up-to-date state
         reader.forceUpdateCollection("c2");
         state = reader.getClusterState();
+
+        Stat stat = new Stat();
+        zkClient.getData(ZkStateReader.getCollectionPath("c2"), null, stat, false);
+        assertEquals(
+            Instant.ofEpochMilli(stat.getCtime()), state.getCollection("c2").getCreationTime());
+
         log.info("Cluster state: {}", state);
         assertTrue(state.hasCollection("c2"));
         assertEquals(c2Version + 1, state.getCollection("c2").getZNodeVersion());
@@ -424,13 +407,10 @@
         ZkWriteCommand c1 =
             new ZkWriteCommand(
                 "c1",
-                new DocCollection(
+                createDocCollection(
                     "c1",
-                    new HashMap<String, Slice>(),
                     Collections.singletonMap(
-                        ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
-                    DocRouter.DEFAULT,
-                    0));
+                        ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME)));
 
         try {
           writer.enqueueUpdate(state, Collections.singletonList(c1), null);
@@ -486,15 +466,7 @@
         zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
 
         // create new collection with stateFormat = 2
-        ZkWriteCommand c1 =
-            new ZkWriteCommand(
-                "c1",
-                new DocCollection(
-                    "c1",
-                    new HashMap<String, Slice>(),
-                    new HashMap<String, Object>(),
-                    DocRouter.DEFAULT,
-                    0));
+        ZkWriteCommand c1 = new ZkWriteCommand("c1", createDocCollection("c1", new HashMap<>()));
 
         writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
         writer.writePendingUpdates();
@@ -530,7 +502,9 @@
         }
         ZkWriteCommand c1 =
             new ZkWriteCommand(
-                "c2", new DocCollection("c2", slices, new HashMap<>(), DocRouter.DEFAULT, 0));
+                "c2",
+                DocCollection.create(
+                    "c2", slices, new HashMap<>(), DocRouter.DEFAULT, 0, Instant.now(), null));
 
         writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
         writer.writePendingUpdates();
@@ -549,4 +523,9 @@
       server.shutdown();
     }
   }
+
+  private DocCollection createDocCollection(String name, Map<String, Object> props) {
+    return DocCollection.create(
+        name, new HashMap<>(), props, DocRouter.DEFAULT, 0, Instant.now(), null);
+  }
 }
diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/collection-management.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/collection-management.adoc
index d1abf46..695d196 100644
--- a/solr/solr-ref-guide/modules/deployment-guide/pages/collection-management.adoc
+++ b/solr/solr-ref-guide/modules/deployment-guide/pages/collection-management.adoc
@@ -1253,6 +1253,7 @@
     },
     "gettingstarted": {
         "znodeVersion": 16,
+        "creationTimeMillis": 1706228861003,
         "properties": {
             "nrtReplicas": "2",
             "pullReplicas": "0",
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java b/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
index 36c5891..f9d202f 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.time.Instant;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -93,12 +94,18 @@
    * @param liveNodes list of live nodes
    * @param coll collection name
    * @param zkClient ZK client
+   * @param createTime creation time of the data/bytes
    * @return the ClusterState
    */
   @SuppressWarnings({"unchecked"})
   @Deprecated
   public static ClusterState createFromJsonSupportingLegacyConfigName(
-      int version, byte[] bytes, Set<String> liveNodes, String coll, SolrZkClient zkClient) {
+      int version,
+      byte[] bytes,
+      Set<String> liveNodes,
+      String coll,
+      SolrZkClient zkClient,
+      Instant createTime) {
     if (bytes == null || bytes.length == 0) {
       return new ClusterState(liveNodes, Collections.emptyMap());
     }
@@ -129,6 +136,7 @@
         version,
         stateMap,
         liveNodes,
+        createTime,
         PerReplicaStatesOps.getZkClientPrsSupplier(
             zkClient, DocCollection.getCollectionPath(coll)));
   }
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 4ae9d16..25b16e1 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -544,6 +544,27 @@
   }
 
   /**
+   * Returns path of created node
+   *
+   * @param stat Output argument that captures created node details
+   */
+  public String create(
+      final String path,
+      final byte[] data,
+      final CreateMode createMode,
+      boolean retryOnConnLoss,
+      Stat stat)
+      throws KeeperException, InterruptedException {
+    if (retryOnConnLoss) {
+      return zkCmdExecutor.retryOperation(
+          () -> keeper.create(path, data, zkACLProvider.getACLsToAdd(path), createMode, stat));
+    } else {
+      List<ACL> acls = zkACLProvider.getACLsToAdd(path);
+      return keeper.create(path, data, acls, createMode, stat);
+    }
+  }
+
+  /**
    * Creates the path in ZooKeeper, creating each node as necessary.
    *
    * <p>e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr, group, node exist,
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 715d985..9271fc4 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -20,6 +20,7 @@
 import static java.util.Collections.emptySortedSet;
 
 import java.lang.invoke.MethodHandles;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -1616,7 +1617,12 @@
         // TODO in Solr 10 remove that factory method
         ClusterState state =
             ZkClientClusterStateProvider.createFromJsonSupportingLegacyConfigName(
-                stat.getVersion(), data, Collections.emptySet(), coll, zkClient);
+                stat.getVersion(),
+                data,
+                Collections.emptySet(),
+                coll,
+                zkClient,
+                Instant.ofEpochMilli(stat.getCtime()));
 
         ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll);
         return collectionRef == null ? null : collectionRef.get();
diff --git a/solr/solrj-zookeeper/src/test/org/apache/solr/common/cloud/SolrZkClientTest.java b/solr/solrj-zookeeper/src/test/org/apache/solr/common/cloud/SolrZkClientTest.java
index 9d1eacd..6e59fe3 100644
--- a/solr/solrj-zookeeper/src/test/org/apache/solr/common/cloud/SolrZkClientTest.java
+++ b/solr/solrj-zookeeper/src/test/org/apache/solr/common/cloud/SolrZkClientTest.java
@@ -35,12 +35,14 @@
 import org.apache.solr.cloud.ZkTestServer;
 import org.apache.solr.core.SolrResourceLoader;
 import org.apache.solr.util.ExternalPaths;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -313,4 +315,34 @@
       return List.of(new ZkCredential("someuser", "somepass", ZkCredential.Perms.READ));
     }
   }
+
+  @Test
+  public void testCreateWithStat() throws InterruptedException, KeeperException {
+    String path = "/collections/" + "collectionName_" + getSaferTestName();
+    try {
+      Stat createStat = new Stat();
+      defaultClient.create(
+          path, "hello".getBytes(StandardCharsets.UTF_8), CreateMode.PERSISTENT, false, createStat);
+      Stat readStat = new Stat();
+      defaultClient.getData(path, null, readStat, false);
+      assertEquals(createStat, readStat);
+    } finally {
+      defaultClient.delete(path, 0, false);
+    }
+  }
+
+  @Test
+  public void testCreateWithStatAndRetry() throws InterruptedException, KeeperException {
+    String path = "/collections/" + "collectionName_" + getSaferTestName();
+    try {
+      Stat createStat = new Stat();
+      defaultClient.create(
+          path, "hello".getBytes(StandardCharsets.UTF_8), CreateMode.PERSISTENT, true, createStat);
+      Stat readStat = new Stat();
+      defaultClient.getData(path, null, readStat, false);
+      assertEquals(createStat, readStat);
+    } finally {
+      defaultClient.delete(path, 0, false);
+    }
+  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
index 74a225d..d5e2d18 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
@@ -21,6 +21,7 @@
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -155,7 +156,12 @@
     for (Map.Entry<String, Object> e : collectionsMap.entrySet()) {
       @SuppressWarnings("rawtypes")
       Map m = (Map) e.getValue();
-      cs = cs.copyWith(e.getKey(), fillPrs(znodeVersion, e, m));
+      Long creationTimeMillisFromClusterStatus = (Long) m.get("creationTimeMillis");
+      Instant creationTime =
+          creationTimeMillisFromClusterStatus == null
+              ? Instant.EPOCH
+              : Instant.ofEpochMilli(creationTimeMillisFromClusterStatus);
+      cs = cs.copyWith(e.getKey(), fillPrs(znodeVersion, e, creationTime, m));
     }
 
     if (clusterProperties != null) {
@@ -168,7 +174,8 @@
   }
 
   @SuppressWarnings({"rawtypes", "unchecked"})
-  private DocCollection fillPrs(int znodeVersion, Map.Entry<String, Object> e, Map m) {
+  private DocCollection fillPrs(
+      int znodeVersion, Map.Entry<String, Object> e, Instant creationTime, Map m) {
     DocCollection.PrsSupplier prsSupplier = null;
     if (m.containsKey("PRS")) {
       Map prs = (Map) m.remove("PRS");
@@ -180,7 +187,8 @@
                   (List<String>) prs.get("states"));
     }
 
-    return ClusterState.collectionFromObjects(e.getKey(), m, znodeVersion, prsSupplier);
+    return ClusterState.collectionFromObjects(
+        e.getKey(), m, znodeVersion, creationTime, prsSupplier);
   }
 
   @Override
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
index 4418d1e..83898f5 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.time.Instant;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -225,28 +226,35 @@
    *     Json representation of a {@link DocCollection} as written by {@link #write(JSONWriter)}. It
    *     can represent one or more collections.
    * @param liveNodes list of live nodes
+   * @param creationTime assigns this date to all {@link DocCollection} referenced by the returned
+   *     {@link ClusterState}
    * @return the ClusterState
    */
   public static ClusterState createFromJson(
-      int version, byte[] bytes, Set<String> liveNodes, DocCollection.PrsSupplier prsSupplier) {
+      int version,
+      byte[] bytes,
+      Set<String> liveNodes,
+      Instant creationTime,
+      DocCollection.PrsSupplier prsSupplier) {
     if (bytes == null || bytes.length == 0) {
       return new ClusterState(liveNodes, Collections.<String, DocCollection>emptyMap());
     }
     @SuppressWarnings({"unchecked"})
     Map<String, Object> stateMap =
         (Map<String, Object>) Utils.fromJSON(bytes, 0, bytes.length, STR_INTERNER_OBJ_BUILDER);
-    return createFromCollectionMap(version, stateMap, liveNodes, prsSupplier);
+    return createFromCollectionMap(version, stateMap, liveNodes, creationTime, prsSupplier);
   }
 
   @Deprecated
   public static ClusterState createFromJson(int version, byte[] bytes, Set<String> liveNodes) {
-    return createFromJson(version, bytes, liveNodes, null);
+    return createFromJson(version, bytes, liveNodes, Instant.EPOCH, null);
   }
 
   public static ClusterState createFromCollectionMap(
       int version,
       Map<String, Object> stateMap,
       Set<String> liveNodes,
+      Instant creationTime,
       DocCollection.PrsSupplier prsSupplier) {
     Map<String, CollectionRef> collections = CollectionUtil.newLinkedHashMap(stateMap.size());
     for (Entry<String, Object> entry : stateMap.entrySet()) {
@@ -254,7 +262,11 @@
       @SuppressWarnings({"unchecked"})
       DocCollection coll =
           collectionFromObjects(
-              collectionName, (Map<String, Object>) entry.getValue(), version, prsSupplier);
+              collectionName,
+              (Map<String, Object>) entry.getValue(),
+              version,
+              creationTime,
+              prsSupplier);
       collections.put(collectionName, new CollectionRef(coll));
     }
 
@@ -264,12 +276,16 @@
   @Deprecated
   public static ClusterState createFromCollectionMap(
       int version, Map<String, Object> stateMap, Set<String> liveNodes) {
-    return createFromCollectionMap(version, stateMap, liveNodes, null);
+    return createFromCollectionMap(version, stateMap, liveNodes, Instant.EPOCH, null);
   }
 
   // TODO move to static DocCollection.loadFromMap
   public static DocCollection collectionFromObjects(
-      String name, Map<String, Object> objs, int version, DocCollection.PrsSupplier prsSupplier) {
+      String name,
+      Map<String, Object> objs,
+      int version,
+      Instant creationTime,
+      DocCollection.PrsSupplier prsSupplier) {
     Map<String, Object> props;
     Map<String, Slice> slices;
 
@@ -304,7 +320,7 @@
       router = DocRouter.getDocRouter((String) routerProps.get("name"));
     }
 
-    return DocCollection.create(name, slices, props, router, version, prsSupplier);
+    return DocCollection.create(name, slices, props, router, version, creationTime, prsSupplier);
   }
 
   @Override
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index dc59a56..750a630 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
@@ -60,21 +61,22 @@
   private final Integer replicationFactor;
   private final ReplicaCount numReplicas;
   private final Boolean readOnly;
+  private final Instant creationTime;
   private final Boolean perReplicaState;
   private final Map<String, Replica> replicaMap = new HashMap<>();
   private AtomicReference<PerReplicaStates> perReplicaStatesRef;
 
   /**
-   * @see DocCollection#create(String, Map, Map, DocRouter, int, PrsSupplier)
+   * @see DocCollection#create(String, Map, Map, DocRouter, int, Instant, PrsSupplier)
    */
   @Deprecated
   public DocCollection(
       String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
-    this(name, slices, props, router, Integer.MAX_VALUE, null);
+    this(name, slices, props, router, Integer.MAX_VALUE, Instant.EPOCH, null);
   }
 
   /**
-   * @see DocCollection#create(String, Map, Map, DocRouter, int, PrsSupplier)
+   * @see DocCollection#create(String, Map, Map, DocRouter, int, Instant, PrsSupplier)
    */
   @Deprecated
   public DocCollection(
@@ -83,7 +85,7 @@
       Map<String, Object> props,
       DocRouter router,
       int zkVersion) {
-    this(name, slices, props, router, zkVersion, null);
+    this(name, slices, props, router, zkVersion, Instant.EPOCH, null);
   }
 
   /**
@@ -93,7 +95,8 @@
    * @param props The properties of the slice. This is used directly and a copy is not made.
    * @param zkVersion The version of the Collection node in Zookeeper (used for conditional
    *     updates).
-   * @see DocCollection#create(String, Map, Map, DocRouter, int, PrsSupplier)
+   * @param creationTime The creation time of the collection
+   * @see DocCollection#create(String, Map, Map, DocRouter, int, Instant, PrsSupplier)
    */
   private DocCollection(
       String name,
@@ -101,6 +104,7 @@
       Map<String, Object> props,
       DocRouter router,
       int zkVersion,
+      Instant creationTime,
       AtomicReference<PerReplicaStates> perReplicaStatesRef) {
     super(props);
     // -1 means any version in ZK CAS, so we choose Integer.MAX_VALUE instead to avoid accidental
@@ -129,6 +133,7 @@
     }
     Boolean readOnly = (Boolean) verifyProp(props, CollectionStateProps.READ_ONLY);
     this.readOnly = readOnly == null ? Boolean.FALSE : readOnly;
+    this.creationTime = creationTime;
 
     Iterator<Map.Entry<String, Slice>> iter = slices.entrySet().iterator();
 
@@ -160,6 +165,7 @@
    * @param router router to partition int range into n ranges
    * @param zkVersion The version of the Collection node in Zookeeper (used for conditional
    *     updates).
+   * @param creationTime The creation time of the collection
    * @param prsSupplier optional supplier for PerReplicaStates (PRS) for PRS enabled collections
    * @return a newly constructed DocCollection
    */
@@ -169,6 +175,7 @@
       Map<String, Object> props,
       DocRouter router,
       int zkVersion,
+      Instant creationTime,
       DocCollection.PrsSupplier prsSupplier) {
     boolean perReplicaState =
         (Boolean) verifyProp(props, CollectionStateProps.PER_REPLICA_STATE, Boolean.FALSE);
@@ -176,7 +183,7 @@
     if (perReplicaState) {
       if (prsSupplier == null) {
         throw new IllegalArgumentException(
-            CollectionStateProps.PER_REPLICA_STATE + " = true , but prsSuppler is not provided");
+            CollectionStateProps.PER_REPLICA_STATE + " = true , but prsSupplier is not provided");
       }
 
       if (!hasAnyReplica(
@@ -196,6 +203,7 @@
         props,
         router,
         zkVersion,
+        creationTime,
         perReplicaStates != null ? new AtomicReference<>(perReplicaStates) : null);
   }
 
@@ -289,7 +297,8 @@
    */
   public DocCollection copyWithSlices(Map<String, Slice> slices) {
     DocCollection result =
-        new DocCollection(getName(), slices, propMap, router, znodeVersion, perReplicaStatesRef);
+        new DocCollection(
+            getName(), slices, propMap, router, znodeVersion, creationTime, perReplicaStatesRef);
     return result;
   }
 
@@ -385,6 +394,14 @@
     return readOnly;
   }
 
+  /**
+   * The creation time of the Collection. When this collection is read from ZooKeeper, this is the
+   * creation time of the collection node.
+   */
+  public Instant getCreationTime() {
+    return creationTime;
+  }
+
   @Override
   public String toString() {
     return "DocCollection("
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
index 8c3f74b..9603dcc 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
@@ -24,6 +24,7 @@
 
 import java.net.ConnectException;
 import java.net.SocketException;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -86,7 +87,8 @@
                 .build()) {
       livenodes.addAll(Set.of("192.168.1.108:7574_solr", "192.168.1.108:8983_solr"));
       ClusterState cs =
-          ClusterState.createFromJson(1, coll1State.getBytes(UTF_8), Collections.emptySet(), null);
+          ClusterState.createFromJson(
+              1, coll1State.getBytes(UTF_8), Collections.emptySet(), Instant.now(), null);
       refs.put(collName, new Ref(collName));
       colls.put(collName, cs.getCollectionOrNull(collName));
       responses.put(
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ClusterStateProviderTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ClusterStateProviderTest.java
new file mode 100644
index 0000000..c181a59
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ClusterStateProviderTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.util.NamedList;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ClusterStateProviderTest extends SolrCloudTestCase {
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(1)
+        .addConfig(
+            "conf",
+            getFile("solrj")
+                .toPath()
+                .resolve("solr")
+                .resolve("configsets")
+                .resolve("streaming")
+                .resolve("conf"))
+        .configure();
+  }
+
+  private ClusterStateProvider createClusterStateProvider() throws Exception {
+    return !usually() ? http2ClusterStateProvider() : zkClientClusterStateProvider();
+  }
+
+  private ClusterStateProvider http2ClusterStateProvider() throws Exception {
+    return new Http2ClusterStateProvider(
+        List.of(cluster.getJettySolrRunner(0).getBaseUrl().toString()), null);
+  }
+
+  private ClusterStateProvider zkClientClusterStateProvider() {
+    return new ZkClientClusterStateProvider(cluster.getZkStateReader());
+  }
+
+  @Test
+  public void testGetClusterState() throws Exception {
+
+    createCollection("testGetClusterState");
+    createCollection("testGetClusterState2");
+
+    try (ClusterStateProvider provider = createClusterStateProvider()) {
+
+      ClusterState clusterState = provider.getClusterState();
+
+      DocCollection docCollection = clusterState.getCollection("testGetClusterState");
+      assertEquals(
+          getCreationTimeFromClusterStatus("testGetClusterState"), docCollection.getCreationTime());
+
+      docCollection = clusterState.getCollection("testGetClusterState2");
+      assertEquals(
+          getCreationTimeFromClusterStatus("testGetClusterState2"),
+          docCollection.getCreationTime());
+    }
+  }
+
+  @Test
+  public void testGetState() throws Exception {
+
+    createCollection("testGetState");
+
+    try (ClusterStateProvider provider = createClusterStateProvider()) {
+
+      ClusterState.CollectionRef collectionRef = provider.getState("testGetState");
+
+      DocCollection docCollection = collectionRef.get();
+      assertNotNull(docCollection);
+      assertEquals(
+          getCreationTimeFromClusterStatus("testGetState"), docCollection.getCreationTime());
+    }
+  }
+
+  private void createCollection(String collectionName) throws SolrServerException, IOException {
+    CollectionAdminRequest.Create request =
+        CollectionAdminRequest.createCollection(collectionName, "conf", 1, 0, 1, 0);
+    request.process(cluster.getSolrClient());
+    cluster.waitForActiveCollection(collectionName, 1, 1);
+  }
+
+  @SuppressWarnings("unchecked")
+  private Instant getCreationTimeFromClusterStatus(String collectionName)
+      throws SolrServerException, IOException {
+    CollectionAdminRequest.ClusterStatus request = CollectionAdminRequest.getClusterStatus();
+    request.setCollectionName(collectionName);
+    CollectionAdminResponse clusterStatusResponse = request.process(cluster.getSolrClient());
+    NamedList<Object> response = clusterStatusResponse.getResponse();
+
+    NamedList<Object> cluster = (NamedList<Object>) response.get("cluster");
+    NamedList<Object> collections = (NamedList<Object>) cluster.get("collections");
+    Map<String, Object> collection = (Map<String, Object>) collections.get(collectionName);
+    return Instant.ofEpochMilli((long) collection.get("creationTimeMillis"));
+  }
+}
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/ReplicaCountTest.java b/solr/solrj/src/test/org/apache/solr/common/cloud/ReplicaCountTest.java
index 76cbe38..0930e6d 100644
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/ReplicaCountTest.java
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/ReplicaCountTest.java
@@ -20,6 +20,7 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.time.Instant;
 import java.util.Map;
 import java.util.Set;
 import org.junit.Test;
@@ -211,6 +212,7 @@
             Map.of("nrtReplicas", 1, "tlogReplicas", 2, "pullReplicas", 3),
             null,
             1,
+            Instant.EPOCH,
             null);
     ReplicaCount numReplicas =
         ReplicaCount.fromMessage(new ZkNodeProps(Map.of("tlogReplicas", 1)), collection);