Fix create segment phase of batch ingestion to take segment identifiers that have a non-UTC interval… (#11635)

* Fix create segment phase of batch ingestion to take segment identifiers with non-UTC time zones

* Fix comment and LGTM forbidden error
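For context, a segment identifier with a non-UTC interval can arise when a
PeriodGranularity is configured with a non-UTC time zone. A minimal sketch of
how such an interval bound is produced (illustrative only, not part of this
patch; the class and variable names are made up for the example):

    import org.apache.druid.java.util.common.DateTimes;
    import org.apache.druid.java.util.common.granularity.PeriodGranularity;
    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;
    import org.joda.time.Period;

    public class NonUtcBucketSketch
    {
      public static void main(String[] args)
      {
        // A daily segment granularity anchored to Asia/Seoul (+09:00) rather than UTC.
        DateTimeZone seoul = DateTimes.inferTzFromString("Asia/Seoul");
        PeriodGranularity granularity = new PeriodGranularity(new Period("P1D"), null, seoul);

        // Bucketing a UTC timestamp yields a bucket start expressed in +09:00,
        // so the resulting segment interval carries a non-UTC chronology:
        DateTime bucketStart = granularity.bucketStart(DateTimes.of("2021-06-27T00:01:11.080Z"));
        System.out.println(bucketStart); // 2021-06-27T00:00:00.000+09:00
      }
    }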
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
index be5cadd..5c4bd2b 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
@@ -84,7 +84,7 @@
  * reasons, the code for creating segments was all handled by the same code path in that class. The code
  * was correct but inefficient for batch ingestion from a memory perspective. If the input file being processed
  * by batch ingestion had enough sinks & hydrants produced then it may run out of memory either in the
- * hydrant creation phase (append) of this class or in the merge hydrants phase. Therefore a new class,
+ * hydrant creation phase (append) of this class or in the merge hydrants phase. Therefore, a new class,
  * {@code BatchAppenderator}, this class, was created to specialize in batch ingestion and the old class
  * for stream ingestion was renamed to {@link StreamAppenderator}.
  * <p>
@@ -321,10 +321,10 @@
     return new AppenderatorAddResult(identifier, sinksMetadata.get(identifier).numRowsInSegment, false);
   }
 
-  @Override
   /**
    * Returns all active segments regardless whether they are in memory or persisted
    */
+  @Override
   public List<SegmentIdWithShardSpec> getSegments()
   {
     return ImmutableList.copyOf(sinksMetadata.keySet());
@@ -568,9 +568,9 @@
 
     log.info("Preparing to push...");
 
-    // get the dirs for the identfiers:
-    List<File> identifiersDirs = new ArrayList<>();
+    // For each identifier, load its sink and push it:
     int totalHydrantsMerged = 0;
+    final List<DataSegment> dataSegments = new ArrayList<>();
     for (SegmentIdWithShardSpec identifier : identifiers) {
       SinkMetadata sm = sinksMetadata.get(identifier);
       if (sm == null) {
@@ -580,34 +580,28 @@
       if (persistedDir == null) {
         throw new ISE("Sink for identifier[%s] not found in local file system", identifier);
       }
-      identifiersDirs.add(persistedDir);
       totalHydrantsMerged += sm.getNumHydrants();
-    }
-
-    // push all sinks for identifiers:
-    final List<DataSegment> dataSegments = new ArrayList<>();
-    for (File identifier : identifiersDirs) {
 
       // retrieve sink from disk:
-      Pair<SegmentIdWithShardSpec, Sink> identifiersAndSinks;
+      Sink sinkForIdentifier;
       try {
-        identifiersAndSinks = getIdentifierAndSinkForPersistedFile(identifier);
+        sinkForIdentifier = getSinkForIdentifierPath(identifier, persistedDir);
       }
       catch (IOException e) {
-        throw new ISE(e, "Failed to retrieve sinks for identifier[%s]", identifier);
+        throw new ISE(e, "Failed to retrieve sinks for identifier[%s] in path[%s]", identifier, persistedDir);
       }
 
-      // push it:
+      // push sink:
       final DataSegment dataSegment = mergeAndPush(
-          identifiersAndSinks.lhs,
-          identifiersAndSinks.rhs
+          identifier,
+          sinkForIdentifier
       );
 
       // record it:
       if (dataSegment != null) {
         dataSegments.add(dataSegment);
       } else {
-        log.warn("mergeAndPush[%s] returned null, skipping.", identifiersAndSinks.lhs);
+        log.warn("mergeAndPush[%s] returned null, skipping.", identifier);
       }
 
     }
@@ -862,15 +856,9 @@
     return retVal;
   }
 
-  private Pair<SegmentIdWithShardSpec, Sink> getIdentifierAndSinkForPersistedFile(File identifierPath)
+  private Sink getSinkForIdentifierPath(SegmentIdWithShardSpec identifier, File identifierPath)
       throws IOException
   {
-
-    final SegmentIdWithShardSpec identifier = objectMapper.readValue(
-        new File(identifierPath, IDENTIFIER_FILE_NAME),
-        SegmentIdWithShardSpec.class
-    );
-
     // To avoid reading and listing of "merged" dir and other special files
     final File[] sinkFiles = identifierPath.listFiles(
         (dir, fileName) -> !(Ints.tryParse(fileName) == null)
@@ -901,7 +889,7 @@
       );
     }
 
-    Sink currSink = new Sink(
+    Sink retVal = new Sink(
         identifier.getInterval(),
         schema,
         identifier.getShardSpec(),
@@ -912,8 +900,8 @@
         null,
         hydrants
     );
-    currSink.finishWriting(); // this sink is not writable
-    return new Pair<>(identifier, currSink);
+    retVal.finishWriting(); // this sink is not writable
+    return retVal;
   }
 
   // This function does not remove the sink from its tracking Map (sinks), the caller is responsible for that
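Why re-reading the identifier from disk went wrong for such identifiers: Joda
Interval equality includes the chronology, so an interval parsed back with the
UTC chronology is not equal to the original non-UTC one, and lookups keyed on
the original identifier miss. A minimal sketch of the round trip (it assumes
the persisted identifier's interval ends up deserialized with the UTC
ISOChronology, which is what the removed code path amounted to):

    import org.apache.druid.java.util.common.DateTimes;
    import org.joda.time.Interval;
    import org.joda.time.chrono.ISOChronology;

    public class IntervalRoundTripSketch
    {
      public static void main(String[] args)
      {
        ISOChronology seoul = ISOChronology.getInstance(DateTimes.inferTzFromString("Asia/Seoul"));
        Interval original =
            new Interval("2021-06-27T00:00:00.000+09:00/2021-06-28T00:00:00.000+09:00", seoul);

        // Simulate writing the identifier out and parsing it back with UTC:
        Interval roundTripped = new Interval(original.toString(), ISOChronology.getInstanceUTC());

        // Same instants on the timeline...
        System.out.println(original.getStartMillis() == roundTripped.getStartMillis()); // true
        // ...but different chronologies, so the intervals are not equal:
        System.out.println(original.equals(roundTripped)); // false
      }
    }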
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
index 05b26b4..912e863 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
@@ -31,6 +31,9 @@
 import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.chrono.ISOChronology;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -125,6 +128,72 @@
     }
   }
 
+  /**
+   * Test the case when a segment identifier contains non-UTC timestamps in its interval. This can happen
+   * when a custom segment granularity for an interval with a non-UTC Chronology is created by
+   * {@link org.apache.druid.java.util.common.granularity.PeriodGranularity#bucketStart(DateTime)}.
+   */
+  @Test
+  public void testPeriodGranularityNonUTCIngestion() throws Exception
+  {
+    try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(1, true)) {
+      final Appenderator appenderator = tester.getAppenderator();
+
+      // startJob
+      Assert.assertNull(appenderator.startJob());
+
+      // getDataSource
+      Assert.assertEquals(BatchAppenderatorTester.DATASOURCE, appenderator.getDataSource());
+
+      // Create a segment identifier with a non-UTC interval
+      SegmentIdWithShardSpec segmentIdWithNonUTCTime =
+          createNonUTCSegmentId("2021-06-27T00:00:00.000+09:00/2021-06-28T00:00:00.000+09:00",
+                                "A", 0); // should be in seg_0
+
+      Assert.assertEquals(
+          1,
+          appenderator.add(segmentIdWithNonUTCTime, createInputRow("2021-06-27T00:01:11.080Z", "foo", 1), null)
+                      .getNumRowsInSegment()
+      );
+
+      // getSegments
+      Assert.assertEquals(
+          Collections.singletonList(segmentIdWithNonUTCTime),
+          appenderator.getSegments().stream().sorted().collect(Collectors.toList())
+      );
+
+      // since we just added one row and the max rows in memory is one, all the segments (sinks etc)
+      // above should be cleared now
+      Assert.assertEquals(
+          Collections.emptyList(),
+          ((BatchAppenderator) appenderator).getInMemorySegments().stream().sorted().collect(Collectors.toList())
+      );
+
+      // push all
+      final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
+          appenderator.getSegments(),
+          null,
+          false
+      ).get();
+      Assert.assertEquals(
+          Collections.singletonList(segmentIdWithNonUTCTime),
+          Lists.transform(
+              segmentsAndCommitMetadata.getSegments(),
+              SegmentIdWithShardSpec::fromDataSegment
+          ).stream().sorted().collect(Collectors.toList())
+      );
+      Assert.assertEquals(
+          tester.getPushedSegments().stream().sorted().collect(Collectors.toList()),
+          segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList())
+      );
+
+      appenderator.close();
+      Assert.assertTrue(appenderator.getSegments().isEmpty());
+    }
+  }
+
   @Test
   public void testSimpleIngestionWithFallbackCodePath() throws Exception
   {
@@ -887,7 +956,19 @@
 
     }
   }
-  
+
+  private static SegmentIdWithShardSpec createNonUTCSegmentId(String interval, String version, int partitionNum)
+  {
+    return new SegmentIdWithShardSpec(
+        BatchAppenderatorTester.DATASOURCE,
+        new Interval(interval, ISOChronology.getInstance(DateTimes.inferTzFromString("Asia/Seoul"))),
+        version,
+        new LinearShardSpec(partitionNum)
+    );
+  }
+
   private static SegmentIdWithShardSpec createSegmentId(String interval, String version, int partitionNum)
   {
     return new SegmentIdWithShardSpec(