Fix create segment phase of batch ingestion to take segment identifiers that have a non-UTC interval… (#11635)
* Fix create segment phase of batch ingestion to take segment identifiers with non-UTC time zones
* Fix comment and LGTM forbidden error
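For context on the bug: Joda-Time intervals carry a Chronology, so the same pair of instants renders, and compares, differently under a non-UTC chronology than under UTC. A minimal standalone sketch (not part of the patch) illustrating the mismatch:

    import org.joda.time.DateTimeZone;
    import org.joda.time.Interval;
    import org.joda.time.chrono.ISOChronology;

    public class NonUtcIntervalSketch
    {
      public static void main(String[] args)
      {
        // An interval built under the Asia/Seoul chronology, as a period
        // granularity with a non-UTC time zone would produce.
        Interval seoulDay = new Interval(
            "2021-06-27T00:00:00.000+09:00/2021-06-28T00:00:00.000+09:00",
            ISOChronology.getInstance(DateTimeZone.forID("Asia/Seoul"))
        );
        // The same instants re-expressed under the UTC chronology.
        Interval utcDay = seoulDay.withChronology(ISOChronology.getInstanceUTC());

        System.out.println(seoulDay); // 2021-06-27T00:00:00.000+09:00/2021-06-28T00:00:00.000+09:00
        System.out.println(utcDay);   // 2021-06-26T15:00:00.000Z/2021-06-27T15:00:00.000Z

        // Interval.equals() compares the chronology as well as the start/end
        // millis, so these are not equal despite covering the same instants.
        System.out.println(seoulDay.equals(utcDay)); // false
      }
    }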
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
index be5cadd..5c4bd2b 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
@@ -84,7 +84,7 @@
* reasons, the code for creating segments was all handled by the same code path in that class. The code
* was correct but inefficient for batch ingestion from a memory perspective. If the input file being processed
* by batch ingestion had enough sinks & hydrants produced then it may run out of memory either in the
- * hydrant creation phase (append) of this class or in the merge hydrants phase. Therefore a new class,
+ * hydrant creation phase (append) of this class or in the merge hydrants phase. Therefore, a new class,
* {@code BatchAppenderator}, this class, was created to specialize in batch ingestion and the old class
* for stream ingestion was renamed to {@link StreamAppenderator}.
* <p>
@@ -321,10 +321,10 @@
return new AppenderatorAddResult(identifier, sinksMetadata.get(identifier).numRowsInSegment, false);
}
- @Override
/**
* Returns all active segments regardless of whether they are in memory or persisted
*/
+ @Override
public List<SegmentIdWithShardSpec> getSegments()
{
return ImmutableList.copyOf(sinksMetadata.keySet());
@@ -568,9 +568,9 @@
log.info("Preparing to push...");
- // get the dirs for the identfiers:
- List<File> identifiersDirs = new ArrayList<>();
+ // Traverse identifiers, load their sink, and push it:
int totalHydrantsMerged = 0;
+ final List<DataSegment> dataSegments = new ArrayList<>();
for (SegmentIdWithShardSpec identifier : identifiers) {
SinkMetadata sm = sinksMetadata.get(identifier);
if (sm == null) {
@@ -580,34 +580,28 @@
if (persistedDir == null) {
throw new ISE("Sink for identifier[%s] not found in local file system", identifier);
}
- identifiersDirs.add(persistedDir);
totalHydrantsMerged += sm.getNumHydrants();
- }
-
- // push all sinks for identifiers:
- final List<DataSegment> dataSegments = new ArrayList<>();
- for (File identifier : identifiersDirs) {
// retrieve sink from disk:
- Pair<SegmentIdWithShardSpec, Sink> identifiersAndSinks;
+ Sink sinkForIdentifier;
try {
- identifiersAndSinks = getIdentifierAndSinkForPersistedFile(identifier);
+ sinkForIdentifier = getSinkForIdentifierPath(identifier, persistedDir);
}
catch (IOException e) {
- throw new ISE(e, "Failed to retrieve sinks for identifier[%s]", identifier);
+ throw new ISE(e, "Failed to retrieve sinks for identifier[%s] in path[%s]", identifier, persistedDir);
}
- // push it:
+ // push sink:
final DataSegment dataSegment = mergeAndPush(
- identifiersAndSinks.lhs,
- identifiersAndSinks.rhs
+ identifier,
+ sinkForIdentifier
);
// record it:
if (dataSegment != null) {
dataSegments.add(dataSegment);
} else {
- log.warn("mergeAndPush[%s] returned null, skipping.", identifiersAndSinks.lhs);
+ log.warn("mergeAndPush[%s] returned null, skipping.", identifier);
}
}
@@ -862,15 +856,9 @@
return retVal;
}
- private Pair<SegmentIdWithShardSpec, Sink> getIdentifierAndSinkForPersistedFile(File identifierPath)
+ private Sink getSinkForIdentifierPath(SegmentIdWithShardSpec identifier, File identifierPath)
throws IOException
{
-
- final SegmentIdWithShardSpec identifier = objectMapper.readValue(
- new File(identifierPath, IDENTIFIER_FILE_NAME),
- SegmentIdWithShardSpec.class
- );
-
// To avoid reading and listing of "merged" dir and other special files
final File[] sinkFiles = identifierPath.listFiles(
(dir, fileName) -> !(Ints.tryParse(fileName) == null)
@@ -901,7 +889,7 @@
);
}
- Sink currSink = new Sink(
+ Sink retVal = new Sink(
identifier.getInterval(),
schema,
identifier.getShardSpec(),
@@ -912,8 +900,8 @@
null,
hydrants
);
- currSink.finishWriting(); // this sink is not writable
- return new Pair<>(identifier, currSink);
+ retVal.finishWriting(); // this sink is not writable
+ return retVal;
}
// This function does not remove the sink from its tracking Map (sinks), the caller is responsible for that
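The hunk above is the core of the fix: getSinkForIdentifierPath now receives the caller's in-memory identifier instead of re-reading it from the persisted identifier file. A hypothetical round-trip sketch of why the old path lost the time zone; the mapper setup and variable names here are assumptions for illustration, not code from the patch:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.druid.jackson.DefaultObjectMapper;
    import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;

    // Assumption: Druid's DefaultObjectMapper, whose Joda handling rebuilds
    // intervals under the UTC chronology on deserialization.
    ObjectMapper mapper = new DefaultObjectMapper();

    // Round-trip an identifier whose interval carries a non-UTC chronology
    // ('identifier' stands for the in-memory SegmentIdWithShardSpec key).
    SegmentIdWithShardSpec roundTripped = mapper.readValue(
        mapper.writeValueAsBytes(identifier),
        SegmentIdWithShardSpec.class
    );

    // roundTripped now holds the UTC rendering of the interval, and since
    // Interval.equals() also compares chronologies, lookups keyed by the
    // original identifier (e.g. sinksMetadata.get(roundTripped)) can miss.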
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
index 05b26b4..912e863 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
@@ -31,6 +31,9 @@
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.chrono.ISOChronology;
import org.junit.Assert;
import org.junit.Test;
@@ -125,6 +128,72 @@
}
}
+ /**
+ * Test the case when a segment identifier contains non-UTC timestamps in its interval. This can happen
+ * when a custom segment granularity for an interval with a non-UTC Chronology is created by
+ * {@link org.apache.druid.java.util.common.granularity.PeriodGranularity#bucketStart(DateTime)} (see the
+ * sketch after this test).
+ */
+ @Test
+ public void testPeriodGranularityNonUTCIngestion() throws Exception
+ {
+ try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(1, true)) {
+ final Appenderator appenderator = tester.getAppenderator();
+
+ // startJob
+ Assert.assertNull(appenderator.startJob());
+
+ // getDataSource
+ Assert.assertEquals(BatchAppenderatorTester.DATASOURCE, appenderator.getDataSource());
+
+ // Create a segment identifier with a non-UTC interval
+ SegmentIdWithShardSpec segmentIdWithNonUTCTime =
+ createNonUTCSegmentId("2021-06-27T00:00:00.000+09:00/2021-06-28T00:00:00.000+09:00",
+ "A", 0); // should be in seg_0
+
+ Assert.assertEquals(
+ 1,
+ appenderator.add(segmentIdWithNonUTCTime, createInputRow("2021-06-27T00:01:11.080Z", "foo", 1), null)
+ .getNumRowsInSegment()
+ );
+
+ // getSegments
+ Assert.assertEquals(
+ Collections.singletonList(segmentIdWithNonUTCTime),
+ appenderator.getSegments().stream().sorted().collect(Collectors.toList())
+ );
+
+ // since we just added one row and the max rows in memory is one, all the segments (sinks etc)
+ // above should be cleared now
+ Assert.assertEquals(
+ Collections.emptyList(),
+ ((BatchAppenderator) appenderator).getInMemorySegments().stream().sorted().collect(Collectors.toList())
+ );
+
+ // push all
+ final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
+ appenderator.getSegments(),
+ null,
+ false
+ ).get();
+ Assert.assertEquals(
+ Collections.singletonList(segmentIdWithNonUTCTime),
+ Lists.transform(
+ segmentsAndCommitMetadata.getSegments(),
+ SegmentIdWithShardSpec::fromDataSegment
+ ).stream().sorted().collect(Collectors.toList())
+ );
+ Assert.assertEquals(
+ tester.getPushedSegments().stream().sorted().collect(Collectors.toList()),
+ segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList())
+ );
+
+ appenderator.close();
+ Assert.assertTrue(appenderator.getSegments().isEmpty());
+ }
+ }
+
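The javadoc above points at PeriodGranularity#bucketStart as the origin of non-UTC identifiers; a small sketch of that path (the granularity construction here is an illustration, not test code):

    import org.apache.druid.java.util.common.DateTimes;
    import org.apache.druid.java.util.common.granularity.PeriodGranularity;
    import org.joda.time.DateTime;
    import org.joda.time.Period;

    // A DAY granularity anchored to the Asia/Seoul time zone.
    PeriodGranularity seoulDay = new PeriodGranularity(
        Period.days(1),
        null, // no custom origin
        DateTimes.inferTzFromString("Asia/Seoul")
    );

    // bucketStart() returns a DateTime under the granularity's chronology, so
    // the derived segment interval carries +09:00 rather than UTC.
    DateTime bucketStart = seoulDay.bucketStart(DateTimes.of("2021-06-27T00:01:11.080Z"));
    // bucketStart -> 2021-06-27T00:00:00.000+09:00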
@Test
public void testSimpleIngestionWithFallbackCodePath() throws Exception
{
@@ -887,7 +956,19 @@
}
}
-
+
+ private static SegmentIdWithShardSpec createNonUTCSegmentId(String interval, String version, int partitionNum)
+ {
+ return new SegmentIdWithShardSpec(
+ BatchAppenderatorTester.DATASOURCE,
+ new Interval(interval, ISOChronology.getInstance(DateTimes.inferTzFromString("Asia/Seoul"))),
+ version,
+ new LinearShardSpec(partitionNum)
+ );
+ }
+
private static SegmentIdWithShardSpec createSegmentId(String interval, String version, int partitionNum)
{
return new SegmentIdWithShardSpec(