SAMZA-2517 : Adding handling and relevant exception message for null in oldest offset from system-admin (#1353)
* Adding handling and relevant exception message for null in oldest offset from system-admin
Co-authored-by: Ray Manpreet Singh Matharu <rmatharu@rmatharu-mn1.linkedin.biz>
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
index 50d7950..c2ebe44 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
@@ -94,7 +94,8 @@
+ " The values between these offsets cannot be restored.", resumeOffset, oldestOffset);
}
}
-
+ LOG.info("Starting offset for SystemStreamPartition {} is {}, fileOffset: {}, oldestOffset from source: {}", ssp,
+ startingOffset, fileOffset, oldestOffset);
return startingOffset;
}
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
index e2cfe1d..9cd888a 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
@@ -118,10 +118,10 @@
LOG.info("Initializing side input stores.");
Map<SystemStreamPartition, String> fileOffsets = getFileOffsets();
- LOG.info("File offsets for the task {}: ", taskName, fileOffsets);
+ LOG.info("File offsets for the task {}: {}", taskName, fileOffsets);
Map<SystemStreamPartition, String> oldestOffsets = getOldestOffsets();
- LOG.info("Oldest offsets for the task {}: ", taskName, fileOffsets);
+ LOG.info("Oldest offsets for the task {}: {}", taskName, oldestOffsets);
startingOffsets = getStartingOffsets(fileOffsets, oldestOffsets);
LOG.info("Starting offsets for the task {}: {}", taskName, startingOffsets);
@@ -346,7 +346,8 @@
* 3. Fetches the partition metadata for each system stream and fetch the corresponding partition metadata
* and populates the oldest offset for SSPs belonging to the system stream.
*
- * @return a {@link Map} of {@link SystemStreamPartition} to their oldest offset.
+ * @return a {@link Map} of {@link SystemStreamPartition} to their oldest offset. If the oldest offset could not be
+ * obtained for a {@link SystemStreamPartition}, the offset for it is populated as null.
*/
@VisibleForTesting
Map<SystemStreamPartition, String> getOldestOffsets() {
@@ -363,16 +364,17 @@
// Step 3
metadata.forEach((systemStream, systemStreamMetadata) -> {
+
// get the partition metadata for each system stream
Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata =
systemStreamMetadata.getSystemStreamPartitionMetadata();
// For SSPs belonging to the system stream, use the partition metadata to get the oldest offset
- Map<SystemStreamPartition, String> offsets = systemStreamToSsp.get(systemStream).stream()
- .collect(
- Collectors.toMap(Function.identity(), ssp -> partitionMetadata.get(ssp.getPartition()).getOldestOffset()));
-
- oldestOffsets.putAll(offsets);
+ // if the oldest offset is null in the partition metadata of an SSP, populate its oldest-offset as null
+ // Because of https://bugs.openjdk.java.net/browse/JDK-8148463, Collectors.toMap throws an NPE when getOldestOffset() returns null, so use a plain loop
+ for (SystemStreamPartition ssp : systemStreamToSsp.get(systemStream)) {
+ oldestOffsets.put(ssp, partitionMetadata.get(ssp.getPartition()).getOldestOffset());
+ }
});
return oldestOffsets;
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 8623e5d..6e59e55 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -738,7 +738,8 @@
String startingOffset = sideInputStorageManagers.get(ssp).getStartingOffset(ssp);
if (startingOffset == null) {
- throw new SamzaException("No offset defined for SideInput SystemStreamPartition : " + ssp);
+ throw new SamzaException(
+ "No starting offset could be obtained for SideInput SystemStreamPartition : " + ssp + ". Consumer cannot start.");
}
// register startingOffset with the sysConsumer and register a metric for it
diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
index a7cefa0..6761702 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
@@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableSet;
import java.io.File;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -34,9 +35,12 @@
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Clock;
import org.apache.samza.util.ScalaJavaUtil;
+import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -182,6 +186,36 @@
});
}
+ /**
+ * Covers the case where the stream metadata (e.g., as fetched through KafkaSystemAdmin) has a null oldest offset for a partition; the starting offset is then expected to be null.
+ */
+ @Test
+ public void testGetStartingOffsetsWhenStreamMetadataIsNull() {
+ final String storeName = "test-get-starting-offset-store";
+ final String taskName = "test-get-starting-offset-task";
+
+ Set<SystemStreamPartition> ssps = IntStream.range(1, 2)
+ .mapToObj(idx -> new SystemStreamPartition("test-system", "test-stream", new Partition(idx)))
+ .collect(Collectors.toSet());
+ Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = ssps.stream()
+ .collect(Collectors.toMap(SystemStreamPartition::getPartition,
+ x -> new SystemStreamMetadata.SystemStreamPartitionMetadata(null, "1", "2")));
+
+
+ TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
+ .addLoggedStore(storeName, ssps)
+ .addStreamMetadata(Collections.singletonMap(new SystemStream("test-system", "test-stream"),
+ new SystemStreamMetadata("test-stream", partitionMetadata)))
+ .build();
+
+ initializeSideInputStorageManager(testSideInputStorageManager);
+ ssps.forEach(ssp -> {
+ String startingOffset = testSideInputStorageManager.getStartingOffset(
+ new SystemStreamPartition("test-system", "test-stream", ssp.getPartition()));
+ Assert.assertNull("Starting offset should be null", startingOffset);
+ });
+ }
+
@Test
public void testGetStartingOffsets() {
final String storeName = "test-get-starting-offset-store";
@@ -276,6 +310,11 @@
return this;
}
+ MockTaskSideInputStorageManagerBuilder addStreamMetadata(Map<SystemStream, SystemStreamMetadata> streamMetadata) {
+ doReturn(ScalaJavaUtil.toScalaMap(streamMetadata)).when(streamMetadataCache).getStreamMetadata(any(), anyBoolean());
+ return this;
+ }
+
MockTaskSideInputStorageManagerBuilder addLoggedStore(String storeName, Set<SystemStreamPartition> ssps) {
StorageEngine storageEngine = mock(StorageEngine.class);
when(storageEngine.getStoreProperties()).thenReturn(