blob: d71964eed6681609db2e681051da3791bc200475 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexer.report;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.incremental.RowMeters;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class TaskReportSerdeTest
{
// Mapper used by every test in this class for both serialization and deserialization.
private final ObjectMapper jsonMapper;

// Supplies the scratch files used by the file-based report writer tests.
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();

/**
 * Creates the shared mapper and registers {@link ExceptionalTaskReport} as a subtype so that it can be
 * written with its "exceptional" type name.
 */
public TaskReportSerdeTest()
{
  final ObjectMapper mapper = new DefaultObjectMapper();
  mapper.registerSubtypes(ExceptionalTaskReport.class);
  this.jsonMapper = mapper;
}
/**
 * Round-trips an {@link IngestionStatsAndErrorsTaskReport} through JSON using the {@link TaskReport} base
 * type and checks that the concrete type and contents survive.
 */
@Test
public void testSerdeOfIngestionReport() throws Exception
{
  final IngestionStatsAndErrorsTaskReport expected = buildTestIngestionReport();

  final TaskReport roundTripped = jsonMapper.readValue(
      jsonMapper.writeValueAsString(expected),
      TaskReport.class
  );

  Assert.assertTrue(roundTripped instanceof IngestionStatsAndErrorsTaskReport);
  Assert.assertEquals(expected, roundTripped);
}
/**
 * Round-trips a {@link KillTaskReport} through JSON using the {@link TaskReport} base type and checks that
 * the concrete type and contents survive.
 */
@Test
public void testSerdeOfKillTaskReport() throws Exception
{
  final KillTaskReport.Stats stats = new KillTaskReport.Stats(1, 2, 3);
  final KillTaskReport expected = new KillTaskReport("taskId", stats);

  final String serialized = jsonMapper.writeValueAsString(expected);
  final TaskReport actual = jsonMapper.readValue(serialized, TaskReport.class);

  Assert.assertTrue(actual instanceof KillTaskReport);
  Assert.assertEquals(expected, actual);
}
/**
 * Round-trips a {@link TaskContextReport} through JSON using the {@link TaskReport} base type and checks that
 * the concrete type and contents survive.
 */
@Test
public void testSerdeOfTaskContextReport() throws Exception
{
  final TaskContextReport expected = new TaskContextReport(
      "taskId",
      ImmutableMap.of("key1", "value1", "key2", "value2")
  );

  final String serialized = jsonMapper.writeValueAsString(expected);
  final TaskReport actual = jsonMapper.readValue(serialized, TaskReport.class);

  Assert.assertTrue(actual instanceof TaskContextReport);
  Assert.assertEquals(expected, actual);
}
/**
 * Writes a report map to a temporary file with {@link SingleFileTaskReportFileWriter} and checks that reading
 * the file back as a {@link TaskReport.ReportMap} yields an equal map.
 */
@Test
public void testWriteReportMapToFileAndRead() throws Exception
{
  final TaskReport.ReportMap written = TaskReport.buildTaskReports(buildTestIngestionReport());
  final File outputFile = temporaryFolder.newFile();

  final SingleFileTaskReportFileWriter fileWriter = new SingleFileTaskReportFileWriter(outputFile);
  fileWriter.setObjectMapper(jsonMapper);
  fileWriter.write("testID", written);

  final TaskReport.ReportMap readBack = jsonMapper.readValue(outputFile, TaskReport.ReportMap.class);
  Assert.assertEquals(written, readBack);
}
/**
 * Serializes a report map to a JSON string and checks that deserializing the string as a
 * {@link TaskReport.ReportMap} yields an equal map.
 */
@Test
public void testWriteReportMapToStringAndRead() throws Exception
{
  final TaskReport.ReportMap expected = TaskReport.buildTaskReports(buildTestIngestionReport());

  final TaskReport.ReportMap actual = jsonMapper.readValue(
      jsonMapper.writeValueAsString(expected),
      TaskReport.ReportMap.class
  );

  Assert.assertEquals(expected, actual);
}
/**
 * Builds a report as nested plain maps, serializes it, and reads it back as a {@link TaskReport.ReportMap};
 * then re-serializes that report map and reads it back as a plain map. The payload contents are verified
 * after each of the two reads.
 */
@Test
@SuppressWarnings("unchecked")
public void testWritePlainMapAndReadAsReportMap() throws Exception
{
  // ---- Expected unparseable events: none for determinePartitions, two for buildSegments ----
  final long exceptionTime = System.currentTimeMillis();
  final List<ParseExceptionReport> buildSegmentErrors = Arrays.asList(
      new ParseExceptionReport("abc,def", "invalid timestamp", Arrays.asList("row: 1", "col: 1"), exceptionTime),
      new ParseExceptionReport("xyz,pqr", "invalid timestamp", Arrays.asList("row: 1", "col: 1"), exceptionTime)
  );
  final Map<String, Object> errorsByPhase = ImmutableMap.of(
      "determinePartitions", Collections.emptyList(),
      "buildSegments", buildSegmentErrors
  );

  // ---- Expected moving averages: all zeros for every window in both phases ----
  final Map<String, Object> zeroedWindow = ImmutableMap.of(
      "processed", 0,
      "processedBytes", 0,
      "unparseable", 0,
      "thrownAway", 0,
      "processedWithError", 0
  );
  final Map<String, Object> zeroedAverages = ImmutableMap.of(
      "1m", zeroedWindow,
      "5m", zeroedWindow,
      "15m", zeroedWindow
  );
  final Map<String, Object> averagesByPhase = ImmutableMap.of(
      "determinePartitions", zeroedAverages,
      "buildSegments", zeroedAverages
  );

  // ---- Expected totals per phase ----
  final RowIngestionMetersTotals partitionTotals
      = RowMeters.with().errors(10).unparseable(1).thrownAway(1).bytes(2000).totalProcessed(100);
  final RowIngestionMetersTotals segmentTotals
      = RowMeters.with().errors(5).unparseable(2).thrownAway(1).bytes(2500).totalProcessed(150);
  final Map<String, Object> totalsByPhase = ImmutableMap.of(
      "determinePartitions", partitionTotals,
      "buildSegments", segmentTotals
  );

  final Map<String, Object> rowStats = ImmutableMap.of(
      "movingAverages", averagesByPhase,
      "totals", totalsByPhase
  );

  // ---- Assemble the plain-map form of the whole report map ----
  final Map<String, Object> payloadAsMap = new HashMap<>();
  payloadAsMap.put("ingestionState", IngestionState.COMPLETED);
  payloadAsMap.put("unparseableEvents", errorsByPhase);
  payloadAsMap.put("rowStats", rowStats);

  final Map<String, Object> reportAsMap = new HashMap<>();
  reportAsMap.put("taskId", "abc");
  reportAsMap.put("payload", payloadAsMap);
  reportAsMap.put("type", "ingestionStatsAndErrors");

  final Map<String, Object> plainReportMap = new HashMap<>();
  plainReportMap.put("ingestionStatsAndErrors", reportAsMap);

  // ---- Stage 1: plain-map JSON read as a typed ReportMap ----
  final TaskReport.ReportMap typedReportMap = jsonMapper.readValue(
      jsonMapper.writeValueAsString(plainReportMap),
      TaskReport.ReportMap.class
  );

  // Top-level structure of the report
  final Optional<IngestionStatsAndErrorsTaskReport> maybeReport
      = typedReportMap.findReport("ingestionStatsAndErrors");
  Assert.assertTrue(maybeReport.isPresent());
  final IngestionStatsAndErrorsTaskReport typedReport = maybeReport.get();
  Assert.assertEquals("ingestionStatsAndErrors", typedReport.getReportKey());
  Assert.assertEquals("abc", typedReport.getTaskId());

  // Basic payload fields: only the ingestion state was supplied, the rest must be absent
  final IngestionStatsAndErrors typedPayload = typedReport.getPayload();
  Assert.assertEquals(IngestionState.COMPLETED, typedPayload.getIngestionState());
  Assert.assertNull(typedPayload.getSegmentsRead());
  Assert.assertNull(typedPayload.getSegmentsPublished());
  Assert.assertNull(typedPayload.getErrorMsg());
  Assert.assertNull(typedPayload.getRecordsProcessed());

  // Row stats and unparseable events
  final Map<String, Object> typedRowStats = typedPayload.getRowStats();
  Assert.assertEquals(averagesByPhase, typedRowStats.get("movingAverages"));
  verifyTotalRowStats(
      (Map<String, Object>) typedRowStats.get("totals"),
      partitionTotals,
      segmentTotals
  );
  verifyUnparseableEvents(typedPayload.getUnparseableEvents(), buildSegmentErrors);

  // ---- Stage 2: typed ReportMap re-serialized and read back as a plain map ----
  final Map<String, Object> plainAgain = jsonMapper.readValue(
      jsonMapper.writeValueAsString(typedReportMap),
      JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
  );
  final Map<String, Object> reportAgain = (Map<String, Object>) plainAgain.get("ingestionStatsAndErrors");

  // Basic payload fields; the enum is expected as its string form here
  final Map<String, Object> payloadAgain = (Map<String, Object>) reportAgain.get("payload");
  Assert.assertEquals(IngestionState.COMPLETED.toString(), payloadAgain.get("ingestionState"));
  Assert.assertNull(payloadAgain.get("segmentsRead"));
  Assert.assertNull(payloadAgain.get("segmentsPublished"));
  Assert.assertNull(payloadAgain.get("errorMsg"));
  Assert.assertNull(payloadAgain.get("recordsProcessed"));

  // Row stats and unparseable events
  final Map<String, Object> rowStatsAgain = (Map<String, Object>) payloadAgain.get("rowStats");
  Assert.assertEquals(averagesByPhase, rowStatsAgain.get("movingAverages"));
  verifyTotalRowStats(
      (Map<String, Object>) rowStatsAgain.get("totals"),
      partitionTotals,
      segmentTotals
  );
  verifyUnparseableEvents(
      (Map<String, Object>) payloadAgain.get("unparseableEvents"),
      buildSegmentErrors
  );
}
/**
 * Deserializes a hand-written report JSON whose payload omits the last three constructor fields and checks
 * that it equals a report constructed with {@code null} for those fields.
 */
@Test
public void testSerializationOnMissingPartitionStats() throws Exception
{
  final String reportJson = "{\n"
                            + " \"type\": \"ingestionStatsAndErrors\",\n"
                            + " \"taskId\": \"ingestionStatsAndErrors\",\n"
                            + " \"payload\": {\n"
                            + " \"ingestionState\": \"COMPLETED\",\n"
                            + " \"unparseableEvents\": {\n"
                            + " \"hello\": \"world\"\n"
                            + " },\n"
                            + " \"rowStats\": {\n"
                            + " \"number\": 1234\n"
                            + " },\n"
                            + " \"errorMsg\": \"an error message\",\n"
                            + " \"segmentAvailabilityConfirmed\": true,\n"
                            + " \"segmentAvailabilityWaitTimeMs\": 1000\n"
                            + " }\n"
                            + "}";

  // The three trailing arguments are the ones the JSON above leaves out.
  final IngestionStatsAndErrors expectedPayload = new IngestionStatsAndErrors(
      IngestionState.COMPLETED,
      ImmutableMap.of("hello", "world"),
      ImmutableMap.of("number", 1234),
      "an error message",
      true,
      1000L,
      null,
      null,
      null
  );
  final IngestionStatsAndErrorsTaskReport expectedReport = new IngestionStatsAndErrorsTaskReport(
      IngestionStatsAndErrorsTaskReport.REPORT_KEY,
      expectedPayload
  );

  final TaskReport actualReport = jsonMapper.readValue(reportJson, TaskReport.class);

  Assert.assertEquals(expectedReport.getTaskId(), actualReport.getTaskId());
  Assert.assertEquals(expectedReport, actualReport);
}
/**
 * Writes a report whose getters throw during serialization and checks the exact partial content that is
 * left in the report file.
 */
@Test
public void testExceptionWhileWritingReport() throws Exception
{
  final File target = temporaryFolder.newFile();

  final SingleFileTaskReportFileWriter fileWriter = new SingleFileTaskReportFileWriter(target);
  fileWriter.setObjectMapper(jsonMapper);

  final TaskReport.ReportMap failingReports = TaskReport.buildTaskReports(new ExceptionalTaskReport());
  fileWriter.write("theTask", failingReports);

  // The file must be left truncated, i.e. not valid JSON, so that callers can tell the report was not
  // completely written.
  final String fileContents = Files.asCharSource(target, StandardCharsets.UTF_8).read();
  Assert.assertEquals("{\"report\":{\"type\":\"exceptional\"", fileContents);
}
/**
 * Builds the fully-populated ingestion report fixture (task ID "testID") shared by several tests.
 */
private IngestionStatsAndErrorsTaskReport buildTestIngestionReport()
{
  final IngestionStatsAndErrors payload = new IngestionStatsAndErrors(
      IngestionState.BUILD_SEGMENTS,
      Collections.singletonMap("hello", "world"),
      Collections.singletonMap("number", 1234),
      "an error message",
      true,
      1000L,
      Collections.singletonMap("PartitionA", 5000L),
      5L,
      10L
  );
  return new IngestionStatsAndErrorsTaskReport("testID", payload);
}
/**
 * Asserts that a deserialized "unparseableEvents" map contains no events under "determinePartitions" and,
 * under "buildSegments", one plain-map entry per expected {@link ParseExceptionReport}, in the same order.
 *
 * @param observed                      the unparseable-events map as read back from JSON
 * @param buildSegmentUnparseableEvents the reports that were originally serialized for "buildSegments"
 */
@SuppressWarnings("unchecked")
private void verifyUnparseableEvents(
    Map<String, Object> observed,
    List<ParseExceptionReport> buildSegmentUnparseableEvents
)
{
  Assert.assertEquals(Collections.emptyList(), observed.get("determinePartitions"));

  final List<Object> observedBuildSegmentUnparseableEvents
      = (List<Object>) observed.get("buildSegments");

  // Compare against the expected list's size rather than a literal, so that the helper stays correct for
  // any number of expected events and the loop below never indexes past the observed list.
  Assert.assertEquals(buildSegmentUnparseableEvents.size(), observedBuildSegmentUnparseableEvents.size());

  for (int i = 0; i < buildSegmentUnparseableEvents.size(); ++i) {
    final ParseExceptionReport expectedEvent = buildSegmentUnparseableEvents.get(i);
    final Object observedEvent = observedBuildSegmentUnparseableEvents.get(i);
    // Each observed event is compared as a plain map of the report's four properties.
    Assert.assertEquals(
        ImmutableMap.of(
            "input", expectedEvent.getInput(),
            "errorType", expectedEvent.getErrorType(),
            "details", expectedEvent.getDetails(),
            "timeOfExceptionMillis", expectedEvent.getTimeOfExceptionMillis()
        ),
        observedEvent
    );
  }
}
/**
 * Asserts that a deserialized "totals" map holds the expected row counters for the "determinePartitions"
 * and "buildSegments" phases.
 *
 * @param observedTotals               the totals map as read back from JSON, keyed by phase name
 * @param determinePartitionTotalStats expected totals for "determinePartitions"
 * @param buildSegmentTotalStats       expected totals for "buildSegments"
 */
private void verifyTotalRowStats(
    Map<String, Object> observedTotals,
    RowIngestionMetersTotals determinePartitionTotalStats,
    RowIngestionMetersTotals buildSegmentTotalStats
)
{
  Assert.assertEquals(
      totalsAsIntMap(determinePartitionTotalStats),
      observedTotals.get("determinePartitions")
  );
  Assert.assertEquals(
      totalsAsIntMap(buildSegmentTotalStats),
      observedTotals.get("buildSegments")
  );
}

/**
 * Converts totals into the plain-map form they are compared against, with every counter narrowed to
 * {@code int} (presumably because the observed maps hold Integer values for these small numbers after
 * untyped JSON deserialization; confirm if larger counts are ever used).
 */
private static Map<String, Integer> totalsAsIntMap(RowIngestionMetersTotals totals)
{
  return ImmutableMap.of(
      "processed", (int) totals.getProcessed(),
      "processedBytes", (int) totals.getProcessedBytes(),
      "processedWithError", (int) totals.getProcessedWithError(),
      "thrownAway", (int) totals.getThrownAway(),
      "unparseable", (int) totals.getUnparseable()
  );
}
/**
 * Task report that throws an exception while being serialized.
 * <p>
 * It is registered with the mapper in the test constructor and written by
 * {@code testExceptionWhileWritingReport}, which asserts on the truncated output left in the report file.
 * The type name "exceptional" and the report key "report" both appear in that expected output.
 */
@JsonTypeName("exceptional")
private static class ExceptionalTaskReport implements TaskReport
{
// Always throws, so that serializing this property fails.
@Override
@JsonProperty
public String getTaskId()
{
throw new UnsupportedOperationException("cannot serialize task ID");
}
// The only accessor that returns normally; supplies the key this report is stored under.
@Override
public String getReportKey()
{
return "report";
}
// Always throws, so that serializing this property fails.
@Override
@JsonProperty
public Object getPayload()
{
throw new UnsupportedOperationException("cannot serialize payload");
}
}
}