IGNITE-15427 Improve the snapshot procedure logging (#9373)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index fb6086d..0a53077 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -244,7 +244,7 @@
 
     /**
      * Local buffer to perform copy-on-write operations with pages for {@code SnapshotFutureTask.PageStoreSerialWriter}s.
-     * It is important to have only only buffer per thread (instead of creating each buffer per
+     * It is important to have only one buffer per thread (instead of creating each buffer per
      * each {@code SnapshotFutureTask.PageStoreSerialWriter}) this is redundant and can lead to OOM errors. Direct buffer
      * deallocate only when ByteBuffer is garbage collected, but it can get out of off-heap memory before it.
      */
@@ -262,7 +262,7 @@
     /** Take snapshot operation procedure. */
     private final DistributedProcess<SnapshotOperationRequest, SnapshotOperationResponse> startSnpProc;
 
-    /** Check previously performed snapshot operation and delete uncompleted files if need. */
+    /** Check previously performed snapshot operation and delete uncompleted files if we need. */
     private final DistributedProcess<SnapshotOperationRequest, SnapshotOperationResponse> endSnpProc;
 
     /** Marshaller. */
@@ -566,7 +566,7 @@
     /**
      * @param snpName Snapshot name.
      * @return Local snapshot directory for snapshot with given name.
-     * @throws IgniteCheckedException If directory doesn't exists.
+     * @throws IgniteCheckedException If directory doesn't exist.
      */
     private File resolveSnapshotDir(String snpName) throws IgniteCheckedException {
         File snpDir = snapshotLocalDir(snpName);
@@ -1123,7 +1123,7 @@
 
     /**
      * @param snpName Snapshot name.
-     * @param folderName Directory name for cache group.
+     * @param folderName The name of a directory for the cache group.
      * @return The list of cache or cache group names in given snapshot on local node.
      */
     public List<File> snapshotCacheDirectories(String snpName, String folderName) {
@@ -1132,7 +1132,7 @@
 
     /**
      * @param snpName Snapshot name.
-     * @param folderName Directory name for cache group.
+     * @param folderName The name of a directory for the cache group.
      * @param names Cache group names to filter.
      * @return The list of cache or cache group names in given snapshot on local node.
      */
@@ -1147,7 +1147,7 @@
 
     /**
      * @param snpName Snapshot name.
-     * @param consId Node consistent id to read medata for.
+     * @param consId Node consistent id to read metadata for.
      * @return Snapshot metadata instance.
      */
     public SnapshotMetadata readSnapshotMetadata(String snpName, String consId) {
@@ -1420,7 +1420,14 @@
 
             // Schedule task on a checkpoint and wait when it starts.
             try {
+                long start = U.currentTimeMillis();
+
                 task.started().get();
+
+                if (log.isInfoEnabled()) {
+                    log.info("Finished waiting for a synchronized checkpoint under topology lock " +
+                        "[snpName=" + task.snapshotName() + ", time=" + (U.currentTimeMillis() - start) + "ms]");
+                }
             }
             catch (IgniteCheckedException e) {
                 U.error(log, "Fail to wait while cluster-wide snapshot operation started", e);
@@ -1850,7 +1857,7 @@
         }
 
         /**
-         * Creates a result by invocating the handler.
+         * Creates a result by invocation the handler.
          *
          * @param hnd Snapshot operation handler.
          * @param ctx Snapshot operation handler context.
@@ -1913,7 +1920,7 @@
 
         /**
          * Current partition page index for read. Due to we read the partition twice it
-         * can't be greater that 2 * store.size().
+         * can't be greater than 2 * store.size().
          */
         private int currIdx;
 
@@ -1991,7 +1998,7 @@
                         }
 
                         // There is no difference between a page containing an incomplete DataRow fragment and
-                        // the page where DataRow takes up all the free space. There is no a dedicated
+                        // the page where DataRow takes up all the free space. There is no dedicated
                         // flag for this case in page header.
                         // During the storage scan we can skip such pages at the first iteration over the partition file,
                         // since all the fragmented pages will be marked by BitSet array we will safely read the others
@@ -2136,7 +2143,7 @@
 
                 metaStorage.write(SNP_RUNNING_KEY, snpName);
 
-                U.ensureDirectory(dbDir, "snapshot work directory", log);
+                U.ensureDirectory(dbDir, "snapshot work directory for a local snapshot sender", log);
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteException(e);
@@ -2263,7 +2270,7 @@
         @Override protected void close0(@Nullable Throwable th) {
             if (th == null) {
                 if (log.isInfoEnabled())
-                    log.info("Local snapshot sender closed, resources released [dbNodeSnpDir=" + dbDir + ']');
+                    log.info("The Local snapshot sender closed. All resources released [dbNodeSnpDir=" + dbDir + ']');
             }
             else {
                 deleteSnapshot(snpLocDir, pdsSettings.folderName());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
index e34aa17..1afecf6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
@@ -394,7 +394,14 @@
 
         if (withMetaStorage) {
             try {
+                long start = U.currentTimeMillis();
+
                 U.get(((DistributedMetaStorageImpl)cctx.kernalContext().distributedMetastorage()).flush());
+
+                if (log.isInfoEnabled()) {
+                    log.info("Finished waiting for all the concurrent operations over the metadata store before snapshot " +
+                        "[snpName=" + snpName + ", time=" + (U.currentTimeMillis() - start) + "ms]");
+                }
             }
             catch (IgniteCheckedException ignore) {
                 // Flushing may be cancelled or interrupted due to the local node stopping.
@@ -452,16 +459,16 @@
                     if (!missed.isEmpty()) {
                         throw new IgniteCheckedException("Snapshot operation cancelled due to " +
                             "not all of requested partitions has OWNING state on local node [grpId=" + grpId +
-                            ", missed" + missed + ']');
+                            ", missed=" + S.compact(missed) + ']');
                     }
                 }
                 else {
-                    // Partitions has not been provided for snapshot task and all partitions have
+                    // Partitions have not been provided for snapshot task and all partitions have
                     // OWNING state, so index partition must be included into snapshot.
                     if (!missed.isEmpty()) {
                         log.warning("All local cache group partitions in OWNING state have been included into a snapshot. " +
                             "Partitions which have different states skipped. Index partitions has also been skipped " +
-                            "[snpName=" + snpName + ", grpId=" + grpId + ", missed=" + missed + ']');
+                            "[snpName=" + snpName + ", grpId=" + grpId + ", missed=" + S.compact(missed) + ']');
                     }
                     else if (affNode && missed.isEmpty() && cctx.kernalContext().query().moduleEnabled())
                         owning.add(INDEX_PARTITION);
@@ -518,8 +525,11 @@
         // Submit all tasks for partitions and deltas processing.
         List<CompletableFuture<Void>> futs = new ArrayList<>();
 
-        if (log.isInfoEnabled())
-            log.info("Submit partition processing tasks with partition allocated lengths: " + partFileLengths);
+        if (log.isInfoEnabled()) {
+            log.info("Submit partition processing tasks to the snapshot execution pool " +
+                "[map=" + compactGroupPartitions(partFileLengths.keySet()) +
+                ", totalSize=" + U.humanReadableByteCount(partFileLengths.values().stream().mapToLong(v -> v).sum()) + ']');
+        }
 
         Collection<BinaryType> binTypesCopy = cctx.kernalContext()
             .cacheObjects()
@@ -683,6 +693,20 @@
         return true;
     }
 
+    /**
+     * @param grps List of processing pairs.
+     * @return Map of cache group id their partitions compacted by {@link S#compact(Collection)}.
+     */
+    private static Map<Integer, String> compactGroupPartitions(Collection<GroupPartitionId> grps) {
+        return grps.stream()
+            .collect(Collectors.groupingBy(GroupPartitionId::getGroupId,
+                Collectors.mapping(GroupPartitionId::getPartitionId,
+                    Collectors.toSet())))
+            .entrySet()
+            .stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> S.compact(e.getValue())));
+    }
+
     /** {@inheritDoc} */
     @Override public boolean equals(Object o) {
         if (this == o)