SAMZA-2551: Upgrade all modules to automatically use checkstyle 6.11.2 (part 2: includes all modules with checkstyle currently enabled, excluding samza-core) (#1390)
API/Upgrade/Usage changes: N/A
diff --git a/build.gradle b/build.gradle
index 05f4066..dfbe2e6 100644
--- a/build.gradle
+++ b/build.gradle
@@ -209,8 +209,7 @@
checkstyle {
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
- // temporarily hardcode 6.11.2 until all other modules are upgraded
- toolVersion = "6.11.2"
+ toolVersion = "$checkstyleVersion"
}
test {
diff --git a/gradle.properties b/gradle.properties
index dda382c..cb5da70 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -24,4 +24,4 @@
org.gradle.jvmargs="-XX:MaxPermSize=512m"
systemProp.file.encoding=utf-8
-checkstyleVersion=6.11
+checkstyleVersion=6.11.2
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/SamzaHistogram.java b/samza-api/src/main/java/org/apache/samza/metrics/SamzaHistogram.java
index 85835f9..6ac7615 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/SamzaHistogram.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/SamzaHistogram.java
@@ -48,7 +48,7 @@
this.gauges = this.percentiles.stream()
.filter(x -> x > 0 && x <= 100)
.collect(Collectors.toMap(Function.identity(),
- x -> registry.newGauge(group, new HistogramGauge(x, name + "_" + String.valueOf(x), 0D))));
+ x -> registry.newGauge(group, new HistogramGauge(x, name + "_" + String.valueOf(x), 0D))));
}
public void update(long value) {
diff --git a/samza-api/src/test/java/org/apache/samza/serializers/TestJsonSerdeV2.java b/samza-api/src/test/java/org/apache/samza/serializers/TestJsonSerdeV2.java
index 2c087d2..f6ec8aa 100644
--- a/samza-api/src/test/java/org/apache/samza/serializers/TestJsonSerdeV2.java
+++ b/samza-api/src/test/java/org/apache/samza/serializers/TestJsonSerdeV2.java
@@ -37,11 +37,11 @@
assertEquals(obj, serde.fromBytes(bytes));
JsonSerdeV2<Map.Entry<String, Object>> serdeHashMapEntry = new JsonSerdeV2<>();
obj.entrySet().forEach(entry -> {
- try {
- serdeHashMapEntry.toBytes(entry);
- } catch (Exception e) {
- fail("HashMap Entry serialization failed!");
- }
- });
+ try {
+ serdeHashMapEntry.toBytes(entry);
+ } catch (Exception e) {
+ fail("HashMap Entry serialization failed!");
+ }
+ });
}
}
diff --git a/samza-api/src/test/java/org/apache/samza/system/descriptors/examples/expanding/ExampleExpandingSystemDescriptor.java b/samza-api/src/test/java/org/apache/samza/system/descriptors/examples/expanding/ExampleExpandingSystemDescriptor.java
index 6ab1c8b..3f13e44 100644
--- a/samza-api/src/test/java/org/apache/samza/system/descriptors/examples/expanding/ExampleExpandingSystemDescriptor.java
+++ b/samza-api/src/test/java/org/apache/samza/system/descriptors/examples/expanding/ExampleExpandingSystemDescriptor.java
@@ -31,10 +31,8 @@
private static final String FACTORY_CLASS_NAME = "org.apache.samza.GraphExpandingSystemFactory";
public ExampleExpandingSystemDescriptor(String systemName) {
- super(systemName, FACTORY_CLASS_NAME,
- (InputTransformer<String>) IncomingMessageEnvelope::toString,
- (streamGraph, inputDescriptor) -> (MessageStream<Long>) streamGraph.getInputStream(inputDescriptor)
- );
+ super(systemName, FACTORY_CLASS_NAME, (InputTransformer<String>) IncomingMessageEnvelope::toString,
+ (streamGraph, inputDescriptor) -> (MessageStream<Long>) streamGraph.getInputStream(inputDescriptor));
}
@Override
diff --git a/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java b/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
index f6f11ff..7de447c 100644
--- a/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
+++ b/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
@@ -45,10 +45,10 @@
public TableRateLimiter<String, String> getThrottler(String tag) {
TableRateLimiter.CreditFunction<String, String> credFn =
(TableRateLimiter.CreditFunction<String, String>) (key, value, args) -> {
- int credits = key == null ? 0 : 3;
- credits += value == null ? 0 : 3;
- return credits;
- };
+ int credits = key == null ? 0 : 3;
+ credits += value == null ? 0 : 3;
+ return credits;
+ };
RateLimiter rateLimiter = mock(RateLimiter.class);
doReturn(Collections.singleton(tag)).when(rateLimiter).getSupportedTags();
TableRateLimiter<String, String> rateLimitHelper = new TableRateLimiter<>("foo", rateLimiter, credFn, tag);
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
index e0c9099..8e3fc7b 100644
--- a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
@@ -84,12 +84,12 @@
// all properties should now start with stream name
Set<String> streams = new HashSet<>();
streamsConfig.keySet().forEach(key -> {
- String[] parts = key.split("\\.", 2);
- if (parts.length != 2) {
- throw new IllegalArgumentException("Ill-formatted stream config: " + key);
- }
- streams.add(parts[0]);
- });
+ String[] parts = key.split("\\.", 2);
+ if (parts.length != 2) {
+ throw new IllegalArgumentException("Ill-formatted stream config: " + key);
+ }
+ streams.add(parts[0]);
+ });
return streams;
}
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
index 2758022..60d3b37 100644
--- a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
@@ -76,11 +76,11 @@
// Kinesis streams cannot be configured as bootstrap streams
KinesisConfig kConfig = new KinesisConfig(config);
kConfig.getKinesisStreams(system).forEach(stream -> {
- StreamConfig streamConfig = new StreamConfig(kConfig);
- SystemStream ss = new SystemStream(system, stream);
- if (streamConfig.getBootstrapEnabled(ss)) {
- throw new ConfigException("Kinesis streams cannot be configured as bootstrap streams.");
- }
- });
+ StreamConfig streamConfig = new StreamConfig(kConfig);
+ SystemStream ss = new SystemStream(system, stream);
+ if (streamConfig.getBootstrapEnabled(ss)) {
+ throw new ConfigException("Kinesis streams cannot be configured as bootstrap streams.");
+ }
+ });
}
}
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java
index 80d43de..16cd4d5 100644
--- a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java
@@ -215,19 +215,19 @@
public void afterCheckpoint(Map<SystemStreamPartition, String> sspOffsets) {
LOG.info("afterCheckpoint called with sspOffsets {}", sspOffsets);
sspOffsets.forEach((ssp, offset) -> {
- KinesisRecordProcessor processor = processors.get(ssp);
- KinesisSystemConsumerOffset kinesisOffset = KinesisSystemConsumerOffset.parse(offset);
- if (processor == null) {
- LOG.info("Kinesis Processor is not alive for ssp {}. This could be the result of rebalance. Hence dropping the"
- + " checkpoint {}.", ssp, offset);
- } else if (!kinesisOffset.getShardId().equals(processor.getShardId())) {
- LOG.info("KinesisProcessor for ssp {} currently owns shard {} while the checkpoint is for shard {}. This could"
- + " be the result of rebalance. Hence dropping the checkpoint {}.", ssp, processor.getShardId(),
- kinesisOffset.getShardId(), offset);
- } else {
- processor.checkpoint(kinesisOffset.getSeqNumber());
- }
- });
+ KinesisRecordProcessor processor = processors.get(ssp);
+ KinesisSystemConsumerOffset kinesisOffset = KinesisSystemConsumerOffset.parse(offset);
+ if (processor == null) {
+ LOG.info("Kinesis Processor is not alive for ssp {}. This could be the result of rebalance. Hence dropping the"
+ + " checkpoint {}.", ssp, offset);
+ } else if (!kinesisOffset.getShardId().equals(processor.getShardId())) {
+ LOG.info("KinesisProcessor for ssp {} currently owns shard {} while the checkpoint is for shard {}. This could"
+ + " be the result of rebalance. Hence dropping the checkpoint {}.", ssp, processor.getShardId(),
+ kinesisOffset.getShardId(), offset);
+ } else {
+ processor.checkpoint(kinesisOffset.getSeqNumber());
+ }
+ });
}
@Override
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java
index fcce49e..130a189 100644
--- a/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java
@@ -110,12 +110,11 @@
String clientConfigPrefix =
String.format(KinesisConfig.CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG, systemName, streamId);
- region.ifPresent(
- val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_REGION, systemName, streamId), val));
+ region.ifPresent(val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_REGION, systemName, streamId), val));
accessKey.ifPresent(
- val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_ACCESS_KEY, systemName, streamId), val));
+ val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_ACCESS_KEY, systemName, streamId), val));
secretKey.ifPresent(
- val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_SECRET_KEY, systemName, streamId), val));
+ val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_SECRET_KEY, systemName, streamId), val));
kclConfig.forEach((k, v) -> config.put(clientConfigPrefix + k, v));
return config;
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java
index 678dfe6..f5f27c9 100644
--- a/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java
@@ -121,12 +121,10 @@
Map<String, String> config = new HashMap<>(super.toConfig());
String systemName = getSystemName();
- region.ifPresent(
- val -> config.put(String.format(KinesisConfig.CONFIG_SYSTEM_REGION, systemName), val));
- proxyHost.ifPresent(
- val -> config.put(String.format(KinesisConfig.CONFIG_PROXY_HOST, systemName), val));
+ region.ifPresent(val -> config.put(String.format(KinesisConfig.CONFIG_SYSTEM_REGION, systemName), val));
+ proxyHost.ifPresent(val -> config.put(String.format(KinesisConfig.CONFIG_PROXY_HOST, systemName), val));
proxyPort.ifPresent(
- val -> config.put(String.format(KinesisConfig.CONFIG_PROXY_PORT, systemName), String.valueOf(val)));
+ val -> config.put(String.format(KinesisConfig.CONFIG_PROXY_PORT, systemName), String.valueOf(val)));
String kclConfigPrefix = String.format(KinesisConfig.CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, systemName);
kclConfig.forEach((k, v) -> config.put(kclConfigPrefix + k, v));
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java
index 2f42981..e3a9c55 100644
--- a/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java
@@ -73,7 +73,7 @@
.collect(Collectors.toConcurrentMap(Function.identity(), x -> new SamzaHistogram(registry, x, READ_LATENCY)));
millisBehindLatest = streamNames.stream()
.collect(Collectors.toConcurrentMap(Function.identity(),
- x -> new SamzaHistogram(registry, x, MILLIS_BEHIND_LATEST)));
+ x -> new SamzaHistogram(registry, x, MILLIS_BEHIND_LATEST)));
// Locking to ensure that these aggregated metrics will be created only once across multiple system consumers.
synchronized (LOCK) {
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
index 6f1f052..2551d07 100644
--- a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
@@ -233,22 +233,22 @@
List<KinesisRecordProcessor> processors) {
Map<KinesisRecordProcessor, List<Record>> processorRecordMap = new HashMap<>();
processors.forEach(processor -> {
- try {
- // Create records and call process records
- IRecordProcessorCheckpointer checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class);
- doNothing().when(checkpointer).checkpoint(anyString());
- doNothing().when(checkpointer).checkpoint();
- ProcessRecordsInput processRecordsInput = Mockito.mock(ProcessRecordsInput.class);
- when(processRecordsInput.getCheckpointer()).thenReturn(checkpointer);
- when(processRecordsInput.getMillisBehindLatest()).thenReturn(1000L);
- List<Record> inputRecords = createRecords(numRecordsPerShard);
- processorRecordMap.put(processor, inputRecords);
- when(processRecordsInput.getRecords()).thenReturn(inputRecords);
- processor.processRecords(processRecordsInput);
- } catch (ShutdownException | InvalidStateException ex) {
- throw new RuntimeException(ex);
- }
- });
+ try {
+ // Create records and call process records
+ IRecordProcessorCheckpointer checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class);
+ doNothing().when(checkpointer).checkpoint(anyString());
+ doNothing().when(checkpointer).checkpoint();
+ ProcessRecordsInput processRecordsInput = Mockito.mock(ProcessRecordsInput.class);
+ when(processRecordsInput.getCheckpointer()).thenReturn(checkpointer);
+ when(processRecordsInput.getMillisBehindLatest()).thenReturn(1000L);
+ List<Record> inputRecords = createRecords(numRecordsPerShard);
+ processorRecordMap.put(processor, inputRecords);
+ when(processRecordsInput.getRecords()).thenReturn(inputRecords);
+ processor.processRecords(processRecordsInput);
+ } catch (ShutdownException | InvalidStateException ex) {
+ throw new RuntimeException(ex);
+ }
+ });
return processorRecordMap;
}
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
index fe7fa96..235c829 100644
--- a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
@@ -103,9 +103,9 @@
List<SystemStreamPartition> ssps = new LinkedList<>();
IntStream.range(0, numShards)
.forEach(p -> {
- SystemStreamPartition ssp = new SystemStreamPartition(system, stream, new Partition(p));
- ssps.add(ssp);
- });
+ SystemStreamPartition ssp = new SystemStreamPartition(system, stream, new Partition(p));
+ ssps.add(ssp);
+ });
ssps.forEach(ssp -> consumer.register(ssp, SYSTEM_CONSUMER_REGISTER_OFFSET));
// Create Kinesis record processor factory
@@ -133,47 +133,47 @@
Map<SystemStreamPartition, KinesisRecordProcessor> sspToProcessorMap = getProcessorMap(consumer);
ssps.forEach(ssp -> {
- try {
- KinesisRecordProcessor processor = sspToProcessorMap.get(ssp);
+ try {
+ KinesisRecordProcessor processor = sspToProcessorMap.get(ssp);
- // Verify that the read messages are received in order and are the same as input records
- Assert.assertEquals(messages.get(ssp).size(), numRecordsPerShard);
- List<IncomingMessageEnvelope> envelopes = messages.get(ssp);
- List<Record> inputRecords = inputRecordMap.get(processor);
- verifyRecords(envelopes, inputRecords, processor.getShardId());
+ // Verify that the read messages are received in order and are the same as input records
+ Assert.assertEquals(messages.get(ssp).size(), numRecordsPerShard);
+ List<IncomingMessageEnvelope> envelopes = messages.get(ssp);
+ List<Record> inputRecords = inputRecordMap.get(processor);
+ verifyRecords(envelopes, inputRecords, processor.getShardId());
- // Call checkpoint on consumer and verify that the checkpoint is called with the right offset
- IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1);
- consumer.afterCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset()));
- ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
- verify(getCheckpointer(processor)).checkpoint(argument.capture());
- Assert.assertEquals(inputRecords.get(inputRecords.size() - 1).getSequenceNumber(), argument.getValue());
+ // Call checkpoint on consumer and verify that the checkpoint is called with the right offset
+ IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1);
+ consumer.afterCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset()));
+ ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+ verify(getCheckpointer(processor)).checkpoint(argument.capture());
+ Assert.assertEquals(inputRecords.get(inputRecords.size() - 1).getSequenceNumber(), argument.getValue());
- // Call shutdown (with ZOMBIE reason) on processor and verify if shutdown freed the ssp mapping
- shutDownProcessor(processor, ShutdownReason.ZOMBIE);
- Assert.assertFalse(sspToProcessorMap.containsValue(processor));
- Assert.assertTrue(isSspAvailable(consumer, ssp));
- } catch (NoSuchFieldException | IllegalAccessException | InvalidStateException | ShutdownException ex) {
- throw new RuntimeException(ex);
- }
- });
+ // Call shutdown (with ZOMBIE reason) on processor and verify if shutdown freed the ssp mapping
+ shutDownProcessor(processor, ShutdownReason.ZOMBIE);
+ Assert.assertFalse(sspToProcessorMap.containsValue(processor));
+ Assert.assertTrue(isSspAvailable(consumer, ssp));
+ } catch (NoSuchFieldException | IllegalAccessException | InvalidStateException | ShutdownException ex) {
+ throw new RuntimeException(ex);
+ }
+ });
}
private Map<String, KinesisRecordProcessor> createAndInitProcessors(IRecordProcessorFactory factory, int numShards) {
Map<String, KinesisRecordProcessor> processorMap = new HashMap<>();
IntStream.range(0, numShards)
.forEach(p -> {
- String shardId = String.format("shard-%05d", p);
- // Create Kinesis processor
- KinesisRecordProcessor processor = (KinesisRecordProcessor) factory.createProcessor();
+ String shardId = String.format("shard-%05d", p);
+ // Create Kinesis processor
+ KinesisRecordProcessor processor = (KinesisRecordProcessor) factory.createProcessor();
- // Initialize the shard
- ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000");
- InitializationInput initializationInput =
- new InitializationInput().withShardId(shardId).withExtendedSequenceNumber(seqNum);
- processor.initialize(initializationInput);
- processorMap.put(shardId, processor);
- });
+ // Initialize the shard
+ ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000");
+ InitializationInput initializationInput =
+ new InitializationInput().withShardId(shardId).withExtendedSequenceNumber(seqNum);
+ processor.initialize(initializationInput);
+ processorMap.put(shardId, processor);
+ });
return processorMap;
}
@@ -186,12 +186,12 @@
Map<SystemStreamPartition, List<IncomingMessageEnvelope>> receivedMessages =
consumer.poll(ssps, Duration.ofSeconds(1).toMillis());
receivedMessages.forEach((key, value) -> {
- if (messages.containsKey(key)) {
- messages.get(key).addAll(value);
- } else {
- messages.put(key, new ArrayList<>(value));
- }
- });
+ if (messages.containsKey(key)) {
+ messages.get(key).addAll(value);
+ } else {
+ messages.put(key, new ArrayList<>(value));
+ }
+ });
totalEventsConsumed = messages.values().stream().mapToInt(List::size).sum();
}
@@ -205,19 +205,19 @@
private void verifyRecords(List<IncomingMessageEnvelope> outputRecords, List<Record> inputRecords, String shardId) {
Iterator outputRecordsIter = outputRecords.iterator();
inputRecords.forEach(record -> {
- IncomingMessageEnvelope envelope = (IncomingMessageEnvelope) outputRecordsIter.next();
- String outputKey = (String) envelope.getKey();
- KinesisIncomingMessageEnvelope kinesisMessageEnvelope = (KinesisIncomingMessageEnvelope) envelope;
- Assert.assertEquals(outputKey, record.getPartitionKey());
- Assert.assertEquals(kinesisMessageEnvelope.getSequenceNumber(), record.getSequenceNumber());
- Assert.assertEquals(kinesisMessageEnvelope.getApproximateArrivalTimestamp(),
- record.getApproximateArrivalTimestamp());
- Assert.assertEquals(kinesisMessageEnvelope.getShardId(), shardId);
- ByteBuffer outputData = ByteBuffer.wrap((byte[]) kinesisMessageEnvelope.getMessage());
- record.getData().rewind();
- Assert.assertEquals(outputData, record.getData());
- verifyOffset(envelope.getOffset(), record, shardId);
- });
+ IncomingMessageEnvelope envelope = (IncomingMessageEnvelope) outputRecordsIter.next();
+ String outputKey = (String) envelope.getKey();
+ KinesisIncomingMessageEnvelope kinesisMessageEnvelope = (KinesisIncomingMessageEnvelope) envelope;
+ Assert.assertEquals(outputKey, record.getPartitionKey());
+ Assert.assertEquals(kinesisMessageEnvelope.getSequenceNumber(), record.getSequenceNumber());
+ Assert.assertEquals(kinesisMessageEnvelope.getApproximateArrivalTimestamp(),
+ record.getApproximateArrivalTimestamp());
+ Assert.assertEquals(kinesisMessageEnvelope.getShardId(), shardId);
+ ByteBuffer outputData = ByteBuffer.wrap((byte[]) kinesisMessageEnvelope.getMessage());
+ record.getData().rewind();
+ Assert.assertEquals(outputData, record.getData());
+ verifyOffset(envelope.getOffset(), record, shardId);
+ });
}
private void verifyOffset(String offset, Record inputRecord, String shardId) {
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
index 7fc1423..08e8124 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
@@ -485,12 +485,12 @@
// Schedule a task to renew the lease after a fixed time interval
LOG.info("Starting scheduler to keep renewing lease held by the leader.");
renewLease = new RenewLeaseScheduler((errorMsg) -> {
- LOG.error(errorMsg);
- table.updateIsLeader(currentJMVersion.get(), processorId, false);
- azureLeaderElector.resignLeadership();
- renewLease.shutdown();
- liveness.shutdown();
- }, azureLeaderElector.getLeaseBlobManager(), azureLeaderElector.getLeaseId());
+ LOG.error(errorMsg);
+ table.updateIsLeader(currentJMVersion.get(), processorId, false);
+ azureLeaderElector.resignLeadership();
+ renewLease.shutdown();
+ liveness.shutdown();
+ }, azureLeaderElector.getLeaseBlobManager(), azureLeaderElector.getLeaseId());
renewLease.scheduleTask();
doOnProcessorChange(new ArrayList<>());
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java
index 2abb380..68d272a 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java
@@ -60,14 +60,14 @@
@Override
public ScheduledFuture scheduleTask() {
return scheduler.scheduleWithFixedDelay(() -> {
- try {
- String currJVM = currentJMVersion.get();
- LOG.info("Updating heartbeat for processor ID: " + processorId + " and job model version: " + currJVM);
- table.updateHeartbeat(currJVM, processorId);
- } catch (Exception e) {
- errorHandler.accept("Exception in Heartbeat Scheduler. Stopping the processor...");
- }
- }, HEARTBEAT_DELAY_SEC, HEARTBEAT_DELAY_SEC, TimeUnit.SECONDS);
+ try {
+ String currJVM = currentJMVersion.get();
+ LOG.info("Updating heartbeat for processor ID: " + processorId + " and job model version: " + currJVM);
+ table.updateHeartbeat(currJVM, processorId);
+ } catch (Exception e) {
+ errorHandler.accept("Exception in Heartbeat Scheduler. Stopping the processor...");
+ }
+ }, HEARTBEAT_DELAY_SEC, HEARTBEAT_DELAY_SEC, TimeUnit.SECONDS);
}
@Override
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
index 235b1f8..03260a3 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
@@ -67,24 +67,24 @@
@Override
public ScheduledFuture scheduleTask() {
return scheduler.scheduleWithFixedDelay(() -> {
- try {
- LOG.info("Checking for job model version upgrade");
- // Read job model version from the blob.
- String blobJMV = blob.getJobModelVersion();
- LOG.info("Job Model Version seen on the blob: {}", blobJMV);
- String blobBarrierState = blob.getBarrierState();
- String currentJMV = currentJMVersion.get();
- LOG.info("Current Job Model Version that the job coordinator is working on: {}", currentJMV);
- String expectedBarrierState = BarrierState.START.toString() + " " + blobJMV;
- List<String> processorList = blob.getLiveProcessorList();
- // Check if the job model version on the blob is consistent with the job model version that the processor is operating on.
- if (processorList != null && processorList.contains(processorId) && !currentJMV.equals(blobJMV) && blobBarrierState.equals(expectedBarrierState) && !versionUpgradeDetected.get()) {
- listener.onStateChange();
- }
- } catch (Exception e) {
- errorHandler.accept("Exception in Job Model Version Upgrade Scheduler. Stopping the processor...");
+ try {
+ LOG.info("Checking for job model version upgrade");
+ // Read job model version from the blob.
+ String blobJMV = blob.getJobModelVersion();
+ LOG.info("Job Model Version seen on the blob: {}", blobJMV);
+ String blobBarrierState = blob.getBarrierState();
+ String currentJMV = currentJMVersion.get();
+ LOG.info("Current Job Model Version that the job coordinator is working on: {}", currentJMV);
+ String expectedBarrierState = BarrierState.START.toString() + " " + blobJMV;
+ List<String> processorList = blob.getLiveProcessorList();
+ // Check if the job model version on the blob is consistent with the job model version that the processor is operating on.
+ if (processorList != null && processorList.contains(processorId) && !currentJMV.equals(blobJMV) && blobBarrierState.equals(expectedBarrierState) && !versionUpgradeDetected.get()) {
+ listener.onStateChange();
}
- }, JMV_UPGRADE_DELAY_SEC, JMV_UPGRADE_DELAY_SEC, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ errorHandler.accept("Exception in Job Model Version Upgrade Scheduler. Stopping the processor...");
+ }
+ }, JMV_UPGRADE_DELAY_SEC, JMV_UPGRADE_DELAY_SEC, TimeUnit.SECONDS);
}
@Override
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java
index 7386fa9..f7d4d93 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java
@@ -77,32 +77,32 @@
@Override
public ScheduledFuture scheduleTask() {
return scheduler.scheduleWithFixedDelay(() -> {
- try {
- if (!table.getEntity(currentJMVersion.get(), processorId).getIsLeader()) {
- LOG.info("Not the leader anymore. Shutting down LeaderBarrierCompleteScheduler.");
+ try {
+ if (!table.getEntity(currentJMVersion.get(), processorId).getIsLeader()) {
+ LOG.info("Not the leader anymore. Shutting down LeaderBarrierCompleteScheduler.");
+ barrierTimeout.getAndSet(true);
+ listener.onStateChange();
+ } else {
+ LOG.info("Leader checking for barrier state");
+ // Get processor IDs listed in the table that have the new job model verion.
+ Iterable<ProcessorEntity> tableList = table.getEntitiesWithPartition(nextJMVersion);
+ Set<String> tableProcessors = new HashSet<>();
+ for (ProcessorEntity entity : tableList) {
+ tableProcessors.add(entity.getRowKey());
+ }
+ LOG.info("List of live processors as seen on the blob = {}", blobProcessorSet);
+ LOG.info("List of live processors as seen in the table = {}", tableProcessors);
+ if ((System.currentTimeMillis() - startTime) > (BARRIER_TIMEOUT_SEC * 1000)) {
barrierTimeout.getAndSet(true);
listener.onStateChange();
- } else {
- LOG.info("Leader checking for barrier state");
- // Get processor IDs listed in the table that have the new job model verion.
- Iterable<ProcessorEntity> tableList = table.getEntitiesWithPartition(nextJMVersion);
- Set<String> tableProcessors = new HashSet<>();
- for (ProcessorEntity entity : tableList) {
- tableProcessors.add(entity.getRowKey());
- }
- LOG.info("List of live processors as seen on the blob = {}", blobProcessorSet);
- LOG.info("List of live processors as seen in the table = {}", tableProcessors);
- if ((System.currentTimeMillis() - startTime) > (BARRIER_TIMEOUT_SEC * 1000)) {
- barrierTimeout.getAndSet(true);
- listener.onStateChange();
- } else if (blobProcessorSet.equals(tableProcessors)) {
- listener.onStateChange();
- }
+ } else if (blobProcessorSet.equals(tableProcessors)) {
+ listener.onStateChange();
}
- } catch (Exception e) {
- errorHandler.accept("Exception in LeaderBarrierCompleteScheduler. Stopping the processor...");
}
- }, BARRIER_REACHED_DELAY_SEC, BARRIER_REACHED_DELAY_SEC, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ errorHandler.accept("Exception in LeaderBarrierCompleteScheduler. Stopping the processor...");
+ }
+ }, BARRIER_REACHED_DELAY_SEC, BARRIER_REACHED_DELAY_SEC, TimeUnit.SECONDS);
}
@Override
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java
index 59a8123..c3502a3 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java
@@ -64,15 +64,15 @@
@Override
public ScheduledFuture scheduleTask() {
return scheduler.scheduleWithFixedDelay(() -> {
- try {
- LOG.info("Checking for leader liveness");
- if (!checkIfLeaderAlive()) {
- listener.onStateChange();
- }
- } catch (Exception e) {
- errorHandler.accept("Exception in Leader Liveness Check Scheduler. Stopping the processor...");
+ try {
+ LOG.info("Checking for leader liveness");
+ if (!checkIfLeaderAlive()) {
+ listener.onStateChange();
}
- }, LIVENESS_CHECK_DELAY_SEC, LIVENESS_CHECK_DELAY_SEC, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ errorHandler.accept("Exception in Leader Liveness Check Scheduler. Stopping the processor...");
+ }
+ }, LIVENESS_CHECK_DELAY_SEC, LIVENESS_CHECK_DELAY_SEC, TimeUnit.SECONDS);
}
@Override
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java
index d4715f3..2ac52f3 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java
@@ -69,26 +69,26 @@
@Override
public ScheduledFuture scheduleTask() {
return scheduler.scheduleWithFixedDelay(() -> {
- try {
- if (!table.getEntity(currentJMVersion.get(), processorId).getIsLeader()) {
- LOG.info("Not the leader anymore. Shutting down LivenessCheckScheduler.");
- scheduler.shutdownNow();
- return;
- }
- LOG.info("Checking for list of live processors");
- //Get the list of live processors published on the blob.
- Set<String> currProcessors = new HashSet<>(blob.getLiveProcessorList());
- //Get the list of live processors from the table. This is the current system state.
- Set<String> liveProcessors = table.getActiveProcessorsList(currentJMVersion);
- //Invoke listener if the table list is not consistent with the blob list.
- if (!liveProcessors.equals(currProcessors)) {
- liveProcessorsList.getAndSet(new ArrayList<>(liveProcessors));
- listener.onStateChange();
- }
- } catch (Exception e) {
- errorHandler.accept("Exception in Liveness Check Scheduler. Stopping the processor...");
+ try {
+ if (!table.getEntity(currentJMVersion.get(), processorId).getIsLeader()) {
+ LOG.info("Not the leader anymore. Shutting down LivenessCheckScheduler.");
+ scheduler.shutdownNow();
+ return;
}
- }, LIVENESS_CHECK_DELAY_SEC, LIVENESS_CHECK_DELAY_SEC, TimeUnit.SECONDS);
+ LOG.info("Checking for list of live processors");
+ //Get the list of live processors published on the blob.
+ Set<String> currProcessors = new HashSet<>(blob.getLiveProcessorList());
+ //Get the list of live processors from the table. This is the current system state.
+ Set<String> liveProcessors = table.getActiveProcessorsList(currentJMVersion);
+ //Invoke listener if the table list is not consistent with the blob list.
+ if (!liveProcessors.equals(currProcessors)) {
+ liveProcessorsList.getAndSet(new ArrayList<>(liveProcessors));
+ listener.onStateChange();
+ }
+ } catch (Exception e) {
+ errorHandler.accept("Exception in Liveness Check Scheduler. Stopping the processor...");
+ }
+ }, LIVENESS_CHECK_DELAY_SEC, LIVENESS_CHECK_DELAY_SEC, TimeUnit.SECONDS);
}
@Override
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java
index f158122..865e6e7 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java
@@ -56,16 +56,16 @@
@Override
public ScheduledFuture scheduleTask() {
return scheduler.scheduleWithFixedDelay(() -> {
- try {
- LOG.info("Renewing lease");
- boolean status = leaseBlobManager.renewLease(leaseId.get());
- if (!status) {
- errorHandler.accept("Unable to renew lease. Continuing as non-leader.");
- }
- } catch (Exception e) {
- errorHandler.accept("Exception in Renew Lease Scheduler. Continuing as non-leader.");
+ try {
+ LOG.info("Renewing lease");
+ boolean status = leaseBlobManager.renewLease(leaseId.get());
+ if (!status) {
+ errorHandler.accept("Unable to renew lease. Continuing as non-leader.");
}
- }, RENEW_LEASE_DELAY_SEC, RENEW_LEASE_DELAY_SEC, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ errorHandler.accept("Exception in Renew Lease Scheduler. Continuing as non-leader.");
+ }
+ }, RENEW_LEASE_DELAY_SEC, RENEW_LEASE_DELAY_SEC, TimeUnit.SECONDS);
}
@Override
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
index 32a8532..85c2b33 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
@@ -213,13 +213,13 @@
throw new IllegalStateException("Attempting to close an already closed AzureBlobAvroWriter");
}
allBlobWriterComponents.forEach(blobWriterComponents -> {
- try {
- closeDataFileWriter(blobWriterComponents.dataFileWriter, blobWriterComponents.azureBlobOutputStream,
- blobWriterComponents.blockBlobAsyncClient);
- } catch (IOException e) {
- throw new SamzaException(e);
- }
- });
+ try {
+ closeDataFileWriter(blobWriterComponents.dataFileWriter, blobWriterComponents.azureBlobOutputStream,
+ blobWriterComponents.blockBlobAsyncClient);
+ } catch (IOException e) {
+ throw new SamzaException(e);
+ }
+ });
isClosed = true;
}
}
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
index d787351..e615808 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
@@ -351,15 +351,15 @@
pendingUpload.add(future);
future.handle((aVoid, throwable) -> {
- if (throwable == null) {
- LOG.info("Upload block for blob: {} with blockid: {} finished.", blobAsyncClient.getBlobUrl().toString(), blockId);
- pendingUpload.remove(future);
- return aVoid;
- } else {
- throw new AzureException("Blob upload failed for blob " + blobAsyncClient.getBlobUrl().toString()
- + " and block with id: " + blockId, throwable);
- }
- });
+ if (throwable == null) {
+ LOG.info("Upload block for blob: {} with blockid: {} finished.", blobAsyncClient.getBlobUrl().toString(), blockId);
+ pendingUpload.remove(future);
+ return aVoid;
+ } else {
+ throw new AzureException("Blob upload failed for blob " + blobAsyncClient.getBlobUrl().toString()
+ + " and block with id: " + blockId, throwable);
+ }
+ });
blockNum += 1;
if (blockNum >= MAX_BLOCKS_IN_AZURE_BLOB) {
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
index d8205a5..5ecd528 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
@@ -471,41 +471,41 @@
private void flushWriters(Map<String, AzureBlobWriter> sourceWriterMap) {
sourceWriterMap.forEach((stream, writer) -> {
- try {
- LOG.info("Flushing topic:{}", stream);
- writer.flush();
- } catch (IOException e) {
- throw new SystemProducerException("Close failed for topic " + stream, e);
- }
- });
+ try {
+ LOG.info("Flushing topic:{}", stream);
+ writer.flush();
+ } catch (IOException e) {
+ throw new SystemProducerException("Close failed for topic " + stream, e);
+ }
+ });
}
private void closeWriters(String source, Map<String, AzureBlobWriter> sourceWriterMap) throws Exception {
Set<CompletableFuture<Void>> pendingClose = ConcurrentHashMap.newKeySet();
try {
sourceWriterMap.forEach((stream, writer) -> {
- LOG.info("Closing topic:{}", stream);
- CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
- @Override
- public void run() {
- try {
- writer.close();
- } catch (IOException e) {
- throw new SystemProducerException("Close failed for topic " + stream, e);
- }
+ LOG.info("Closing topic:{}", stream);
+ CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ writer.close();
+ } catch (IOException e) {
+ throw new SystemProducerException("Close failed for topic " + stream, e);
}
- }, asyncBlobThreadPool);
- pendingClose.add(future);
- future.handle((aVoid, throwable) -> {
- sourceWriterMap.remove(writer);
- if (throwable != null) {
- throw new SystemProducerException("Close failed for topic " + stream, throwable);
- } else {
- LOG.info("Blob close finished for stream " + stream);
- return aVoid;
- }
- });
+ }
+ }, asyncBlobThreadPool);
+ pendingClose.add(future);
+ future.handle((aVoid, throwable) -> {
+ sourceWriterMap.remove(writer);
+ if (throwable != null) {
+ throw new SystemProducerException("Close failed for topic " + stream, throwable);
+ } else {
+ LOG.info("Blob close finished for stream " + stream);
+ return aVoid;
+ }
});
+ });
CompletableFuture<Void> future = CompletableFuture.allOf(pendingClose.toArray(new CompletableFuture[0]));
LOG.info("Flush source: {} has pending closes: {} ", source, pendingClose.size());
future.get((long) closeTimeout, TimeUnit.MILLISECONDS);
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
index 69a921e..e7e82ae 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
@@ -110,10 +110,10 @@
LOG.info("Building mappings from physicalName to streamId");
streamConfig.getStreamIds()
.forEach((streamId) -> {
- String physicalName = streamConfig.getPhysicalName(streamId);
- LOG.info("Obtained physicalName: {} for streamId: {} ", physicalName, streamId);
- physcialToId.put(physicalName, streamId);
- });
+ String physicalName = streamConfig.getPhysicalName(streamId);
+ LOG.info("Obtained physicalName: {} for streamId: {} ", physicalName, streamId);
+ physcialToId.put(physicalName, streamId);
+ });
}
private String getFromStreamIdOrName(String configName, String streamName, String defaultString) {
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
index 2d22929..2103b1c 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
@@ -187,15 +187,15 @@
.getPartitionRuntimeInformation(partition);
futureList.add(partitionRuntimeInfo);
partitionRuntimeInfo.thenAccept(ehPartitionInfo -> {
- LOG.info(printPartitionRuntimeInfo(ehPartitionInfo));
- // Set offsets
- String startingOffset = EventHubSystemConsumer.START_OF_STREAM;
- String newestOffset = ehPartitionInfo.getLastEnqueuedOffset();
- String upcomingOffset = EventHubSystemConsumer.END_OF_STREAM;
- SystemStreamPartitionMetadata sspMetadata = new SystemStreamPartitionMetadata(startingOffset, newestOffset,
- upcomingOffset);
- sspMetadataMap.put(new Partition(Integer.parseInt(partition)), sspMetadata);
- });
+ LOG.info(printPartitionRuntimeInfo(ehPartitionInfo));
+ // Set offsets
+ String startingOffset = EventHubSystemConsumer.START_OF_STREAM;
+ String newestOffset = ehPartitionInfo.getLastEnqueuedOffset();
+ String upcomingOffset = EventHubSystemConsumer.END_OF_STREAM;
+ SystemStreamPartitionMetadata sspMetadata = new SystemStreamPartitionMetadata(startingOffset, newestOffset,
+ upcomingOffset);
+ sspMetadataMap.put(new Partition(Integer.parseInt(partition)), sspMetadata);
+ });
}
CompletableFuture<Void> futureGroup =
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
index 2f2873e..a6d975f 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
@@ -467,29 +467,29 @@
public void onReceive(Iterable<EventData> events) {
if (events != null) {
events.forEach(event -> {
- byte[] eventDataBody = event.getBytes();
- if (interceptor != null) {
- eventDataBody = interceptor.intercept(eventDataBody);
- }
- String offset = event.getSystemProperties().getOffset();
- Object partitionKey = event.getSystemProperties().getPartitionKey();
- if (partitionKey == null) {
- partitionKey = event.getProperties().get(EventHubSystemProducer.KEY);
- }
- try {
- updateMetrics(event);
+ byte[] eventDataBody = event.getBytes();
+ if (interceptor != null) {
+ eventDataBody = interceptor.intercept(eventDataBody);
+ }
+ String offset = event.getSystemProperties().getOffset();
+ Object partitionKey = event.getSystemProperties().getPartitionKey();
+ if (partitionKey == null) {
+ partitionKey = event.getProperties().get(EventHubSystemProducer.KEY);
+ }
+ try {
+ updateMetrics(event);
- // note that the partition key can be null
- put(ssp, new EventHubIncomingMessageEnvelope(ssp, offset, partitionKey, eventDataBody, event));
- } catch (InterruptedException e) {
- String msg = String.format("Interrupted while adding the event from ssp %s to dispatch queue.", ssp);
- LOG.error(msg, e);
- throw new SamzaException(msg, e);
- }
+ // note that the partition key can be null
+ put(ssp, new EventHubIncomingMessageEnvelope(ssp, offset, partitionKey, eventDataBody, event));
+ } catch (InterruptedException e) {
+ String msg = String.format("Interrupted while adding the event from ssp %s to dispatch queue.", ssp);
+ LOG.error(msg, e);
+ throw new SamzaException(msg, e);
+ }
- // Cache latest checkpoint
- streamPartitionOffsets.put(ssp, offset);
- });
+ // Cache latest checkpoint
+ streamPartitionOffsets.put(ssp, offset);
+ });
}
}
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
index 83d51ed..7b6a82e 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
@@ -140,25 +140,25 @@
// Auto update the metrics and possible throwable when futures are complete.
sendResult.handle((aVoid, throwable) -> {
- long callbackLatencyMs = System.currentTimeMillis() - afterSendTimeMs;
- sendCallbackLatency.get(streamId).update(callbackLatencyMs);
- aggSendCallbackLatency.update(callbackLatencyMs);
- if (throwable != null) {
- sendErrors.get(streamId).inc();
- aggSendErrors.inc();
- LOG.error("Send message to event hub: {} failed with exception: ", streamId, throwable);
- sendExceptionOnCallback.compareAndSet(null, throwable);
- }
- return aVoid;
- });
+ long callbackLatencyMs = System.currentTimeMillis() - afterSendTimeMs;
+ sendCallbackLatency.get(streamId).update(callbackLatencyMs);
+ aggSendCallbackLatency.update(callbackLatencyMs);
+ if (throwable != null) {
+ sendErrors.get(streamId).inc();
+ aggSendErrors.inc();
+ LOG.error("Send message to event hub: {} failed with exception: ", streamId, throwable);
+ sendExceptionOnCallback.compareAndSet(null, throwable);
+ }
+ return aVoid;
+ });
}
public void start() {
streamIds.forEach(streamId -> {
- sendCallbackLatency.put(streamId, new SamzaHistogram(metricsRegistry, streamId, SEND_CALLBACK_LATENCY));
- sendLatency.put(streamId, new SamzaHistogram(metricsRegistry, streamId, SEND_LATENCY));
- sendErrors.put(streamId, metricsRegistry.newCounter(streamId, SEND_ERRORS));
- });
+ sendCallbackLatency.put(streamId, new SamzaHistogram(metricsRegistry, streamId, SEND_CALLBACK_LATENCY));
+ sendLatency.put(streamId, new SamzaHistogram(metricsRegistry, streamId, SEND_LATENCY));
+ sendErrors.put(streamId, metricsRegistry.newCounter(streamId, SEND_ERRORS));
+ });
if (aggSendLatency == null) {
aggSendLatency = new SamzaHistogram(metricsRegistry, AGGREGATE, SEND_LATENCY);
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
index f021f36..3912bcf 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
@@ -152,32 +152,32 @@
if (PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(partitioningMethod)) {
// Create all partition senders
perStreamEventHubClientManagers.forEach((streamId, samzaEventHubClient) -> {
- EventHubClient ehClient = samzaEventHubClient.getEventHubClient();
+ EventHubClient ehClient = samzaEventHubClient.getEventHubClient();
- try {
- Map<Integer, PartitionSender> partitionSenders = new HashMap<>();
- long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName);
- Integer numPartitions =
- ehClient.getRuntimeInformation().get(timeoutMs, TimeUnit.MILLISECONDS).getPartitionCount();
+ try {
+ Map<Integer, PartitionSender> partitionSenders = new HashMap<>();
+ long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName);
+ Integer numPartitions =
+ ehClient.getRuntimeInformation().get(timeoutMs, TimeUnit.MILLISECONDS).getPartitionCount();
- for (int i = 0; i < numPartitions; i++) {
- String partitionId = String.valueOf(i);
- EventHubClientManager perPartitionClientManager =
- createOrGetEventHubClientManagerForPartition(streamId, i);
- PartitionSender partitionSender =
- perPartitionClientManager.getEventHubClient().createPartitionSender(partitionId).get(DEFAULT_CREATE_PARTITION_SENDER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
- partitionSenders.put(i, partitionSender);
- }
-
- streamPartitionSenders.put(streamId, partitionSenders);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- String msg = "Failed to fetch number of Event Hub partitions for partition sender creation";
- throw new SamzaException(msg, e);
- } catch (EventHubException | IllegalArgumentException e) {
- String msg = "Creation of partition sender failed with exception";
- throw new SamzaException(msg, e);
+ for (int i = 0; i < numPartitions; i++) {
+ String partitionId = String.valueOf(i);
+ EventHubClientManager perPartitionClientManager =
+ createOrGetEventHubClientManagerForPartition(streamId, i);
+ PartitionSender partitionSender =
+ perPartitionClientManager.getEventHubClient().createPartitionSender(partitionId).get(DEFAULT_CREATE_PARTITION_SENDER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ partitionSenders.put(i, partitionSender);
}
- });
+
+ streamPartitionSenders.put(streamId, partitionSenders);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ String msg = "Failed to fetch number of Event Hub partitions for partition sender creation";
+ throw new SamzaException(msg, e);
+ } catch (EventHubException | IllegalArgumentException e) {
+ String msg = "Creation of partition sender failed with exception";
+ throw new SamzaException(msg, e);
+ }
+ });
}
isInitialized = true;
LOG.info("EventHubSystemProducer initialized.");
@@ -227,10 +227,10 @@
// Initiate metrics
streamIds.forEach((streamId) -> {
- eventSkipRate.put(streamId, metricsRegistry.newCounter(streamId, EVENT_SKIP_RATE));
- eventWriteRate.put(streamId, metricsRegistry.newCounter(streamId, EVENT_WRITE_RATE));
- eventByteWriteRate.put(streamId, metricsRegistry.newCounter(streamId, EVENT_BYTE_WRITE_RATE));
- });
+ eventSkipRate.put(streamId, metricsRegistry.newCounter(streamId, EVENT_SKIP_RATE));
+ eventWriteRate.put(streamId, metricsRegistry.newCounter(streamId, EVENT_WRITE_RATE));
+ eventByteWriteRate.put(streamId, metricsRegistry.newCounter(streamId, EVENT_BYTE_WRITE_RATE));
+ });
// Locking to ensure that these aggregated metrics will be created only once across multiple system producers.
synchronized (AGGREGATE_METRICS_LOCK) {
@@ -365,15 +365,15 @@
public synchronized void stop() {
LOG.info("Stopping producer.");
streamPartitionSenders.values().forEach((streamPartitionSender) -> {
- List<CompletableFuture<Void>> futures = new ArrayList<>();
- streamPartitionSender.forEach((key, value) -> futures.add(value.close()));
- CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
- try {
- future.get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
- } catch (ExecutionException | InterruptedException | TimeoutException e) {
- LOG.error("Closing the partition sender failed ", e);
- }
- });
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ streamPartitionSender.forEach((key, value) -> futures.add(value.close()));
+ CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
+ try {
+ future.get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException | InterruptedException | TimeoutException e) {
+ LOG.error("Closing the partition sender failed ", e);
+ }
+ });
perStreamEventHubClientManagers.values()
.parallelStream()
.forEach(ehClient -> ehClient.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
index 68544db..51d2cdc 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
@@ -294,8 +294,8 @@
ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
verify(mockContainerClient, times(2)).getBlobAsyncClient(argument.capture());
argument.getAllValues().forEach(blobName -> {
- Assert.assertTrue(blobName.contains(blobUrlPrefix));
- });
+ Assert.assertTrue(blobName.contains(blobUrlPrefix));
+ });
List<String> allBlobNames = argument.getAllValues();
Assert.assertNotEquals(allBlobNames.get(0), allBlobNames.get(1));
@@ -359,8 +359,8 @@
ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
verify(mockContainerClient, times(2)).getBlobAsyncClient(argument.capture());
argument.getAllValues().forEach(blobName -> {
- Assert.assertTrue(blobName.contains(blobUrlPrefix));
- });
+ Assert.assertTrue(blobName.contains(blobUrlPrefix));
+ });
List<String> allBlobNames = argument.getAllValues();
Assert.assertNotEquals(allBlobNames.get(0), allBlobNames.get(1));
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
index 1614384..b713ec7 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
@@ -105,16 +105,16 @@
BlobMetadataGenerator mockBlobMetadataGenerator = mock(BlobMetadataGenerator.class);
doAnswer(invocation -> {
- BlobMetadataContext blobMetadataContext = invocation.getArgumentAt(0, BlobMetadataContext.class);
- String streamName = blobMetadataContext.getStreamName();
- Long blobSize = blobMetadataContext.getBlobSize();
- Long numberOfRecords = blobMetadataContext.getNumberOfMessagesInBlob();
- Map<String, String> metadataProperties = new HashMap<>();
- metadataProperties.put(BLOB_STREAM_NAME_METADATA, streamName);
- metadataProperties.put(BLOB_RAW_SIZE_BYTES_METADATA, Long.toString(blobSize));
- metadataProperties.put(BLOB_RECORD_NUMBER_METADATA, Long.toString(numberOfRecords));
- return metadataProperties;
- }).when(mockBlobMetadataGenerator).getBlobMetadata(anyObject());
+ BlobMetadataContext blobMetadataContext = invocation.getArgumentAt(0, BlobMetadataContext.class);
+ String streamName = blobMetadataContext.getStreamName();
+ Long blobSize = blobMetadataContext.getBlobSize();
+ Long numberOfRecords = blobMetadataContext.getNumberOfMessagesInBlob();
+ Map<String, String> metadataProperties = new HashMap<>();
+ metadataProperties.put(BLOB_STREAM_NAME_METADATA, streamName);
+ metadataProperties.put(BLOB_RAW_SIZE_BYTES_METADATA, Long.toString(blobSize));
+ metadataProperties.put(BLOB_RECORD_NUMBER_METADATA, Long.toString(numberOfRecords));
+ return metadataProperties;
+ }).when(mockBlobMetadataGenerator).getBlobMetadata(anyObject());
azureBlobOutputStream = spy(new AzureBlobOutputStream(mockBlobAsyncClient, threadPool, mockMetrics,
blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, FAKE_STREAM,
@@ -196,8 +196,8 @@
verify(azureBlobOutputStream).stageBlock(eq(blockIdEncoded(1)), argument.capture(), eq((int) fullBlockCompressedByte.length));
verify(azureBlobOutputStream).stageBlock(eq(blockIdEncoded(2)), argument2.capture(), eq((int) halfBlockCompressedByte.length));
argument.getAllValues().forEach(byteBuffer -> {
- Assert.assertEquals(ByteBuffer.wrap(fullBlockCompressedByte), byteBuffer);
- });
+ Assert.assertEquals(ByteBuffer.wrap(fullBlockCompressedByte), byteBuffer);
+ });
Assert.assertEquals(ByteBuffer.wrap(halfBlockCompressedByte), argument2.getAllValues().get(0));
verify(mockMetrics, times(3)).updateAzureUploadMetrics();
}
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java
index 4ad22ae..acf4cfb 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java
@@ -596,11 +596,11 @@
private void setupWriterForProducer(AzureBlobSystemProducer azureBlobSystemProducer,
AzureBlobWriter mockAzureBlobWriter, String stream) {
doAnswer(invocation -> {
- String blobUrl = invocation.getArgumentAt(0, String.class);
- String streamName = invocation.getArgumentAt(2, String.class);
- Assert.assertEquals(stream, streamName);
- Assert.assertEquals(stream, blobUrl);
- return mockAzureBlobWriter;
- }).when(azureBlobSystemProducer).createNewWriter(anyString(), any(), anyString());
+ String blobUrl = invocation.getArgumentAt(0, String.class);
+ String streamName = invocation.getArgumentAt(2, String.class);
+ Assert.assertEquals(stream, streamName);
+ Assert.assertEquals(stream, blobUrl);
+ return mockAzureBlobWriter;
+ }).when(azureBlobSystemProducer).createNewWriter(anyString(), any(), anyString());
}
}
\ No newline at end of file
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
index 00bffc3..2d07151 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
@@ -98,12 +98,12 @@
// Consumer mocks
PartitionReceiver mockPartitionReceiver = PowerMockito.mock(PartitionReceiver.class);
PowerMockito.when(mockPartitionReceiver.setReceiveHandler(any())).then((Answer<Void>) invocationOnMock -> {
- PartitionReceiveHandler handler = invocationOnMock.getArgumentAt(0, PartitionReceiveHandler.class);
- if (handler == null) {
- Assert.fail("Handler for setReceiverHandler was null");
- }
- return null;
- });
+ PartitionReceiveHandler handler = invocationOnMock.getArgumentAt(0, PartitionReceiveHandler.class);
+ if (handler == null) {
+ Assert.fail("Handler for setReceiverHandler was null");
+ }
+ return null;
+ });
PartitionRuntimeInformation mockPartitionRuntimeInfo = PowerMockito.mock(PartitionRuntimeInformation.class);
PowerMockito.when(mockPartitionRuntimeInfo.getLastEnqueuedOffset())
.thenReturn(EventHubSystemConsumer.START_OF_STREAM);
@@ -114,16 +114,16 @@
PartitionSender mockPartitionSender1 = PowerMockito.mock(PartitionSender.class);
PowerMockito.when(mockPartitionSender0.send(any(EventData.class)))
.then((Answer<CompletableFuture<Void>>) invocationOnMock -> {
- EventData data = invocationOnMock.getArgumentAt(0, EventData.class);
- receivedData.get(systemName).get(streamName).get(0).add(data);
- return new CompletableFuture<>();
- });
+ EventData data = invocationOnMock.getArgumentAt(0, EventData.class);
+ receivedData.get(systemName).get(streamName).get(0).add(data);
+ return new CompletableFuture<>();
+ });
PowerMockito.when(mockPartitionSender1.send(any(EventData.class)))
.then((Answer<CompletableFuture<Void>>) invocationOnMock -> {
- EventData data = invocationOnMock.getArgumentAt(0, EventData.class);
- receivedData.get(systemName).get(streamName).get(1).add(data);
- return new CompletableFuture<>();
- });
+ EventData data = invocationOnMock.getArgumentAt(0, EventData.class);
+ receivedData.get(systemName).get(streamName).get(1).add(data);
+ return new CompletableFuture<>();
+ });
EventHubRuntimeInformation mockRuntimeInfo = PowerMockito.mock(EventHubRuntimeInformation.class);
CompletableFuture<EventHubRuntimeInformation> future = new MockFuture(mockRuntimeInfo);
@@ -133,18 +133,18 @@
// Consumer calls
PowerMockito.when(mockEventHubClient.createReceiver(anyString(), anyString(), anyObject()))
.then((Answer<CompletableFuture<PartitionReceiver>>) invocationOnMock -> {
- String partitionId = invocationOnMock.getArgumentAt(1, String.class);
- startingOffsets.put(partitionId, EventPosition.fromEndOfStream());
- return CompletableFuture.completedFuture(mockPartitionReceiver);
- });
+ String partitionId = invocationOnMock.getArgumentAt(1, String.class);
+ startingOffsets.put(partitionId, EventPosition.fromEndOfStream());
+ return CompletableFuture.completedFuture(mockPartitionReceiver);
+ });
PowerMockito.when(mockEventHubClient.createReceiver(anyString(), anyString(), anyObject()))
.then((Answer<CompletableFuture<PartitionReceiver>>) invocationOnMock -> {
- String partitionId = invocationOnMock.getArgumentAt(1, String.class);
- EventPosition offset = invocationOnMock.getArgumentAt(2, EventPosition.class);
- startingOffsets.put(partitionId, offset);
- return CompletableFuture.completedFuture(mockPartitionReceiver);
- });
+ String partitionId = invocationOnMock.getArgumentAt(1, String.class);
+ EventPosition offset = invocationOnMock.getArgumentAt(2, EventPosition.class);
+ startingOffsets.put(partitionId, offset);
+ return CompletableFuture.completedFuture(mockPartitionReceiver);
+ });
PowerMockito.when(mockEventHubClient.getPartitionRuntimeInformation(anyString())).thenReturn(partitionFuture);
@@ -156,12 +156,12 @@
PowerMockito.when(mockEventHubClient.send(any(EventData.class), anyString()))
.then((Answer<CompletableFuture<Void>>) invocationOnMock -> {
- EventData data = invocationOnMock.getArgumentAt(0, EventData.class);
- String key = invocationOnMock.getArgumentAt(1, String.class);
- Integer intKey = Integer.valueOf(key);
- receivedData.get(systemName).get(streamName).get(intKey % 2).add(data);
- return new CompletableFuture<>();
- });
+ EventData data = invocationOnMock.getArgumentAt(0, EventData.class);
+ String key = invocationOnMock.getArgumentAt(1, String.class);
+ Integer intKey = Integer.valueOf(key);
+ receivedData.get(systemName).get(streamName).get(intKey % 2).add(data);
+ return new CompletableFuture<>();
+ });
} catch (Exception e) {
Assert.fail("Failed to create create mock methods for EventHubClient");
}
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
index befbf3a..f0b3a9c 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
@@ -86,11 +86,11 @@
Assert.assertTrue(partitionMetadataMap.size() >= MIN_EVENTHUB_ENTITY_PARTITION);
Assert.assertTrue(partitionMetadataMap.size() <= MAX_EVENTHUB_ENTITY_PARTITION);
partitionMetadataMap.forEach((partition, metadata) -> {
- Assert.assertEquals(EventHubSystemConsumer.START_OF_STREAM, metadata.getOldestOffset());
- Assert.assertNotSame(EventHubSystemConsumer.END_OF_STREAM, metadata.getNewestOffset());
- String expectedUpcomingOffset = String.valueOf(Long.parseLong(metadata.getNewestOffset()) + 1);
- Assert.assertEquals(expectedUpcomingOffset, metadata.getUpcomingOffset());
- });
+ Assert.assertEquals(EventHubSystemConsumer.START_OF_STREAM, metadata.getOldestOffset());
+ Assert.assertNotSame(EventHubSystemConsumer.END_OF_STREAM, metadata.getNewestOffset());
+ String expectedUpcomingOffset = String.valueOf(Long.parseLong(metadata.getNewestOffset()) + 1);
+ Assert.assertEquals(expectedUpcomingOffset, metadata.getUpcomingOffset());
+ });
}
}
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
index 683cb52..383ae42 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
@@ -86,8 +86,8 @@
@Override
public void putAll(List<Entry<byte[], byte[]>> entries) {
entries.forEach(entry -> {
- validateMessageSize(entry.getValue());
- });
+ validateMessageSize(entry.getValue());
+ });
List<Entry<byte[], byte[]>> largeMessageSafeEntries = removeLargeMessages(entries);
store.putAll(largeMessageSafeEntries);
}
@@ -146,14 +146,14 @@
private List<Entry<byte[], byte[]>> removeLargeMessages(List<Entry<byte[], byte[]>> entries) {
List<Entry<byte[], byte[]>> largeMessageSafeEntries = new ArrayList<>();
entries.forEach(entry -> {
- if (!isLargeMessage(entry.getValue())) {
- largeMessageSafeEntries.add(entry);
- } else {
- LOG.info("Ignoring a large message with size " + entry.getValue().length + " since it is greater than "
- + "the maximum allowed value of " + maxMessageSize);
- largeMessageSafeStoreMetrics.ignoredLargeMessages().inc();
- }
- });
+ if (!isLargeMessage(entry.getValue())) {
+ largeMessageSafeEntries.add(entry);
+ } else {
+ LOG.info("Ignoring a large message with size " + entry.getValue().length + " since it is greater than "
+ + "the maximum allowed value of " + maxMessageSize);
+ largeMessageSafeStoreMetrics.ignoredLargeMessages().inc();
+ }
+ });
return largeMessageSafeEntries;
}
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java
index 89c2794..c94b3f1 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java
@@ -118,12 +118,12 @@
List<Entry<K, V>> toPut = new LinkedList<>();
List<K> toDelete = new LinkedList<>();
entries.forEach(e -> {
- if (e.getValue() != null) {
- toPut.add(e);
- } else {
- toDelete.add(e.getKey());
- }
- });
+ if (e.getValue() != null) {
+ toPut.add(e);
+ } else {
+ toDelete.add(e.getKey());
+ }
+ });
if (!toPut.isEmpty()) {
instrument(metrics.numPutAlls, metrics.putAllNs, () -> kvStore.putAll(toPut));
diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
index 3d3c39b..e5c1e97 100644
--- a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
+++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
@@ -190,11 +190,11 @@
// Set up latch
final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
MockSystemProducer.listeners.add((source, envelope) -> {
- allMessagesSent.countDown();
- if (allMessagesSent.getCount() == messages.size() - 1) {
- throw new RuntimeException(); // Throw on the first message
- }
- });
+ allMessagesSent.countDown();
+ if (allMessagesSent.getCount() == messages.size() - 1) {
+ throw new RuntimeException(); // Throw on the first message
+ }
+ });
// Log the messages
messages.forEach((message) -> log.info(message));
@@ -227,13 +227,13 @@
final CountDownLatch allMessagesSent = new CountDownLatch(expectedMessagesSent); // We expect to drop all but the extra messages
final CountDownLatch waitForTimeout = new CountDownLatch(1);
MockSystemProducer.listeners.add((source, envelope) -> {
- allMessagesSent.countDown();
- try {
- waitForTimeout.await();
- } catch (InterruptedException e) {
- fail("Test could not run properly because of a thread interrupt.");
- }
- });
+ allMessagesSent.countDown();
+ try {
+ waitForTimeout.await();
+ } catch (InterruptedException e) {
+ fail("Test could not run properly because of a thread interrupt.");
+ }
+ });
// Log the messages. This is where the timeout will happen!
messages.forEach((message) -> log.info(message));
diff --git a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
index 280f54e..0248343 100644
--- a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
+++ b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
@@ -204,11 +204,11 @@
// Set up latch
final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
MockSystemProducer.listeners.add((source, envelope) -> {
- allMessagesSent.countDown();
- if (allMessagesSent.getCount() == messages.size() - 1) {
- throw new RuntimeException(); // Throw on the first message
- }
- });
+ allMessagesSent.countDown();
+ if (allMessagesSent.getCount() == messages.size() - 1) {
+ throw new RuntimeException(); // Throw on the first message
+ }
+ });
// Log the messages
messages.forEach((message) -> log.info(message));
@@ -241,13 +241,13 @@
final CountDownLatch allMessagesSent = new CountDownLatch(expectedMessagesSent); // We expect to drop all but the extra messages
final CountDownLatch waitForTimeout = new CountDownLatch(1);
MockSystemProducer.listeners.add((source, envelope) -> {
- allMessagesSent.countDown();
- try {
- waitForTimeout.await();
- } catch (InterruptedException e) {
- fail("Test could not run properly because of a thread interrupt.");
- }
- });
+ allMessagesSent.countDown();
+ try {
+ waitForTimeout.await();
+ } catch (InterruptedException e) {
+ fail("Test could not run properly because of a thread interrupt.");
+ }
+ });
// Log the messages. This is where the timeout will happen!
messages.forEach((message) -> log.info(message));
diff --git a/samza-test/src/main/java/org/apache/samza/example/AsyncApplicationExample.java b/samza-test/src/main/java/org/apache/samza/example/AsyncApplicationExample.java
index 9ec1dca..349bb20 100644
--- a/samza-test/src/main/java/org/apache/samza/example/AsyncApplicationExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/AsyncApplicationExample.java
@@ -108,18 +108,18 @@
static CompletionStage<Member> decorateMember(int memberId) {
return CompletableFuture.supplyAsync(() -> {
- /*
- * Introduce some lag to mimic remote call. In real use cases, this typically translates to over the wire
- * network call to some rest service.
- */
- try {
- Thread.sleep((long) (Math.random() * 10000));
- } catch (InterruptedException ec) {
- System.out.println("Interrupted during sleep");
- }
+ /*
+ * Introduce some lag to mimic remote call. In real use cases, this typically translates to over the wire
+ * network call to some rest service.
+ */
+ try {
+ Thread.sleep((long) (Math.random() * 10000));
+ } catch (InterruptedException ec) {
+ System.out.println("Interrupted during sleep");
+ }
- return new Member(memberId, getRandomGender(), getRandomCountry());
- });
+ return new Member(memberId, getRandomGender(), getRandomCountry());
+ });
}
static String getRandomGender() {
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index e2db379..fb65eea 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -320,10 +320,10 @@
SystemConsumer consumer = factory.getConsumer(systemName, config, null);
String name = (String) outputDescriptor.getPhysicalName().orElse(streamId);
metadata.get(name).getSystemStreamPartitionMetadata().keySet().forEach(partition -> {
- SystemStreamPartition temp = new SystemStreamPartition(systemName, streamId, partition);
- ssps.add(temp);
- consumer.register(temp, "0");
- });
+ SystemStreamPartition temp = new SystemStreamPartition(systemName, streamId, partition);
+ ssps.add(temp);
+ consumer.register(temp, "0");
+ });
long t = System.currentTimeMillis();
Map<SystemStreamPartition, List<IncomingMessageEnvelope>> output = new HashMap<>();
@@ -361,7 +361,7 @@
return output.entrySet()
.stream()
.collect(Collectors.toMap(entry -> entry.getKey().getPartition().getPartitionId(),
- entry -> entry.getValue().stream().map(e -> (StreamMessageType) e.getMessage()).collect(Collectors.toList())));
+ entry -> entry.getValue().stream().map(e -> (StreamMessageType) e.getMessage()).collect(Collectors.toList())));
}
/**
@@ -395,18 +395,18 @@
InMemorySystemProducer producer = (InMemorySystemProducer) factory.getProducer(systemName, config, null);
SystemStream sysStream = new SystemStream(systemName, streamName);
partitionData.forEach((partitionId, partition) -> {
- partition.forEach(e -> {
- Object key = e instanceof KV ? ((KV) e).getKey() : null;
- Object value = e instanceof KV ? ((KV) e).getValue() : e;
- if (value instanceof IncomingMessageEnvelope) {
- producer.send((IncomingMessageEnvelope) value);
- } else {
- producer.send(systemName, new OutgoingMessageEnvelope(sysStream, Integer.valueOf(partitionId), key, value));
- }
- });
- producer.send(systemName, new OutgoingMessageEnvelope(sysStream, Integer.valueOf(partitionId), null,
- new EndOfStreamMessage(null)));
+ partition.forEach(e -> {
+ Object key = e instanceof KV ? ((KV) e).getKey() : null;
+ Object value = e instanceof KV ? ((KV) e).getValue() : e;
+ if (value instanceof IncomingMessageEnvelope) {
+ producer.send((IncomingMessageEnvelope) value);
+ } else {
+ producer.send(systemName, new OutgoingMessageEnvelope(sysStream, Integer.valueOf(partitionId), key, value));
+ }
});
+ producer.send(systemName, new OutgoingMessageEnvelope(sysStream, Integer.valueOf(partitionId), null,
+ new EndOfStreamMessage(null)));
+ });
}
private void deleteStoreDirectories() {
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/TestKeyValueSizeHistogramMetric.java b/samza-test/src/test/java/org/apache/samza/storage/kv/TestKeyValueSizeHistogramMetric.java
index f7545dc..2a579bf 100644
--- a/samza-test/src/test/java/org/apache/samza/storage/kv/TestKeyValueSizeHistogramMetric.java
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/TestKeyValueSizeHistogramMetric.java
@@ -105,26 +105,26 @@
}
metricsRegistry.getGroups().forEach(group -> metricsRegistry.getGroup(group.toString()).forEach((name, metric) -> {
- if (names.contains(name)) {
- metric.visit(new MetricsVisitor() {
- @Override
- public void counter(Counter counter) {
+ if (names.contains(name)) {
+ metric.visit(new MetricsVisitor() {
+ @Override
+ public void counter(Counter counter) {
- }
+ }
- @Override
- public <T> void gauge(Gauge<T> gauge) {
- Double num = (Double) gauge.getValue();
- Assert.assertNotEquals(0D, (Double) gauge.getValue(), 0.0001);
- }
+ @Override
+ public <T> void gauge(Gauge<T> gauge) {
+ Double num = (Double) gauge.getValue();
+ Assert.assertNotEquals(0D, (Double) gauge.getValue(), 0.0001);
+ }
- @Override
- public void timer(Timer timer) {
+ @Override
+ public void timer(Timer timer) {
- }
- });
- }
- }));
+ }
+ });
+ }
+ }));
}
private String getRandomString() {
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index 53bc234..b0630b2 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -105,8 +105,8 @@
.map(KV::getValue)
.partitionBy(pv -> pv.getMemberId(), pv -> pv, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
.sink((m, collector, coordinator) -> {
- received.add(m.getValue());
- });
+ received.add(m.getValue());
+ });
}
}
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index 4691e87..25d31ea 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -157,8 +157,8 @@
.map(KV::getValue)
.partitionBy(pv -> pv.getMemberId(), pv -> pv, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
.sink((m, collector, coordinator) -> {
- received.add(m.getValue());
- });
+ received.add(m.getValue());
+ });
}
}
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
index 5e25817..6afc77c 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
@@ -167,8 +167,8 @@
.map(m -> new KV(m.getValue().getMemberId(), m.getValue()))
.sendTo(table)
.sink((kv, collector, coordinator) -> {
- LOG.info("Inserted Profile with Key: {} in profile-view-store", kv.getKey());
- });
+ LOG.info("Inserted Profile with Key: {} in profile-view-store", kv.getKey());
+ });
OutputStream<TestTableData.EnrichedPageView> outputStream = appDescriptor.getOutputStream(enrichedPageViewOSD);
appDescriptor.getInputStream(pageViewISD)
@@ -177,8 +177,8 @@
.sendTo(outputStream)
.map(TestTableData.EnrichedPageView::getPageKey)
.sink((joinPageKey, collector, coordinator) -> {
- collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "JoinPageKeys"), null, null, joinPageKey));
- });
+ collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "JoinPageKeys"), null, null, joinPageKey));
+ });
}
}
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
index 24726f8..dbdacf1 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
@@ -94,11 +94,11 @@
new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class)), "userAdClickWindow")
.map(windowPane -> KV.of(windowPane.getKey().getKey(), String.valueOf(windowPane.getMessage().size())))
.sink((message, messageCollector, taskCoordinator) -> {
- taskCoordinator.commit(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
- messageCollector.send(
- new OutgoingMessageEnvelope(
- new SystemStream("kafka", outputTopic), null, message.getKey(), message.getValue()));
- });
+ taskCoordinator.commit(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
+ messageCollector.send(
+ new OutgoingMessageEnvelope(
+ new SystemStream("kafka", outputTopic), null, message.getKey(), message.getValue()));
+ });
intermediateStreamIds.add(((IntermediateMessageStreamImpl) pageViewsRepartitionedByViewId).getStreamId());
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
index 275be34..a60aea7 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
@@ -152,14 +152,14 @@
private static CompletionStage<Collection<PageView>> filterGuestPageViews(PageView pageView,
Predicate<PageView> shouldFailProcess, Supplier<Long> processJitter) {
CompletableFuture<Collection<PageView>> filteredPageViews = CompletableFuture.supplyAsync(() -> {
- try {
- Thread.sleep(processJitter.get());
- } catch (InterruptedException ex) {
- System.out.println("Interrupted during sleep.");
- }
+ try {
+ Thread.sleep(processJitter.get());
+ } catch (InterruptedException ex) {
+ System.out.println("Interrupted during sleep.");
+ }
- return Long.valueOf(pageView.getUserId()) < 1 ? Collections.emptyList() : Collections.singleton(pageView);
- });
+ return Long.valueOf(pageView.getUserId()) < 1 ? Collections.emptyList() : Collections.singleton(pageView);
+ });
if (shouldFailProcess.test(pageView)) {
filteredPageViews.completeExceptionally(new RuntimeException("Remote service threw an exception"));
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
index 0c8cea4..1e5e24c 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
@@ -278,10 +278,10 @@
doNothing().when(listener).afterStart();
doNothing().when(listener).afterFailure(any());
doAnswer(invocation -> {
- // stopped successfully
- shutdownLatch.countDown();
- return null;
- }).when(listener).afterStop();
+ // stopped successfully
+ shutdownLatch.countDown();
+ return null;
+ }).when(listener).afterStop();
}
private void initProducer(String bootstrapServer) {
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index a2a1e5c..fef4836 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -236,10 +236,10 @@
config.put(ClusterManagerConfig.HOST_AFFINITY_ENABLED, "false");
}
storeName.ifPresent(s -> {
- config.put(String.format(StorageConfig.FACTORY, s), MockStoreFactory.class.getName());
- config.put(String.format(StorageConfig.KEY_SERDE, s), "string");
- config.put(String.format(StorageConfig.MSG_SERDE, s), "string");
- });
+ config.put(String.format(StorageConfig.FACTORY, s), MockStoreFactory.class.getName());
+ config.put(String.format(StorageConfig.KEY_SERDE, s), "string");
+ config.put(String.format(StorageConfig.MSG_SERDE, s), "string");
+ });
Map<String, String> samzaContainerConfig = ImmutableMap.<String, String>builder().putAll(config).build();
Map<String, String> applicationConfig = Maps.newHashMap(samzaContainerConfig);
applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(coordinatorSystemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));
@@ -274,11 +274,11 @@
final CountDownLatch secondProcessorRegistered = new CountDownLatch(1);
zkUtils.subscribeToProcessorChange((parentPath, currentChilds) -> {
- // When appRunner2 with id: PROCESSOR_IDS[1] is registered, run processing message in appRunner1.
- if (currentChilds.contains(PROCESSOR_IDS[1])) {
- secondProcessorRegistered.countDown();
- }
- });
+ // When appRunner2 with id: PROCESSOR_IDS[1] is registered, run processing message in appRunner1.
+ if (currentChilds.contains(PROCESSOR_IDS[1])) {
+ secondProcessorRegistered.countDown();
+ }
+ });
// Set up stream app appRunner2.
CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
@@ -356,11 +356,11 @@
final CountDownLatch secondProcessorRegistered = new CountDownLatch(1);
zkUtils.subscribeToProcessorChange((parentPath, currentChilds) -> {
- // When appRunner2 with id: PROCESSOR_IDS[1] is registered, start processing message in appRunner1.
- if (currentChilds.contains(PROCESSOR_IDS[1])) {
- secondProcessorRegistered.countDown();
- }
- });
+ // When appRunner2 with id: PROCESSOR_IDS[1] is registered, start processing message in appRunner1.
+ if (currentChilds.contains(PROCESSOR_IDS[1])) {
+ secondProcessorRegistered.countDown();
+ }
+ });
// Set up appRunner2.
CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2);
@@ -755,10 +755,10 @@
Map<String, String> configMap = new HashMap<>();
CoordinatorStreamValueSerde jsonSerde = new CoordinatorStreamValueSerde("set-config");
metadataStore.all().forEach((key, value) -> {
- CoordinatorStreamStore.CoordinatorMessageKey coordinatorMessageKey = CoordinatorStreamStore.deserializeCoordinatorMessageKeyFromJson(key);
- String deserializedValue = jsonSerde.fromBytes(value);
- configMap.put(coordinatorMessageKey.getKey(), deserializedValue);
- });
+ CoordinatorStreamStore.CoordinatorMessageKey coordinatorMessageKey = CoordinatorStreamStore.deserializeCoordinatorMessageKeyFromJson(key);
+ String deserializedValue = jsonSerde.fromBytes(value);
+ configMap.put(coordinatorMessageKey.getKey(), deserializedValue);
+ });
return new MapConfig(configMap);
}
@@ -1279,8 +1279,8 @@
private static List<SystemStreamPartition> getSystemStreamPartitions(JobModel jobModel) {
List<SystemStreamPartition> ssps = new ArrayList<>();
jobModel.getContainers().forEach((containerName, containerModel) -> {
- containerModel.getTasks().forEach((taskName, taskModel) -> ssps.addAll(taskModel.getSystemStreamPartitions()));
- });
+ containerModel.getTasks().forEach((taskName, taskModel) -> ssps.addAll(taskModel.getSystemStreamPartitions()));
+ });
return ssps;
}
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index b70eb2c..8b5d3c5 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -864,13 +864,13 @@
List<String> outMessages = TestAvroSystemFactory.messages.stream()
.map(x -> {
- GenericRecord profileAddr = (GenericRecord) ((GenericRecord) x.getMessage()).get("profileAddress");
- GenericRecord streetNum = (GenericRecord) (profileAddr.get("streetnum"));
- return ((GenericRecord) x.getMessage()).get("pageKey").toString() + ","
- + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" :
- ((GenericRecord) x.getMessage()).get("profileName").toString()) + ","
- + profileAddr.get("zip") + "," + streetNum.get("number");
- })
+ GenericRecord profileAddr = (GenericRecord) ((GenericRecord) x.getMessage()).get("profileAddress");
+ GenericRecord streetNum = (GenericRecord) (profileAddr.get("streetnum"));
+ return ((GenericRecord) x.getMessage()).get("pageKey").toString() + ","
+ + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" :
+ ((GenericRecord) x.getMessage()).get("profileName").toString()) + ","
+ + profileAddr.get("zip") + "," + streetNum.get("number");
+ })
.collect(Collectors.toList());
Assert.assertEquals(numMessages, outMessages.size());
List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameAddressJoin(numMessages);
@@ -1185,19 +1185,19 @@
HashMap<String, List<String>> pageKeyCountListMap = new HashMap<>();
TestAvroSystemFactory.messages.stream()
.map(x -> {
- String pageKey = ((GenericRecord) x.getMessage()).get("pageKey").toString();
- String count = ((GenericRecord) x.getMessage()).get("count").toString();
- pageKeyCountListMap.computeIfAbsent(pageKey, k -> new ArrayList<>()).add(count);
- return pageKeyCountListMap;
- });
+ String pageKey = ((GenericRecord) x.getMessage()).get("pageKey").toString();
+ String count = ((GenericRecord) x.getMessage()).get("count").toString();
+ pageKeyCountListMap.computeIfAbsent(pageKey, k -> new ArrayList<>()).add(count);
+ return pageKeyCountListMap;
+ });
HashMap<String, Integer> pageKeyCountMap = new HashMap<>();
pageKeyCountListMap.forEach((key, list) -> {
- // Check that the number of windows per key is non-zero but less than the number of input messages per key.
- Assert.assertTrue(list.size() > 1 && list.size() < numMessages / TestAvroSystemFactory.pageKeys.length);
- // Collapse the count of messages per key
- pageKeyCountMap.put(key, list.stream().mapToInt(Integer::parseInt).sum());
- });
+ // Check that the number of windows per key is non-zero but less than the number of input messages per key.
+ Assert.assertTrue(list.size() > 1 && list.size() < numMessages / TestAvroSystemFactory.pageKeys.length);
+ // Collapse the count of messages per key
+ pageKeyCountMap.put(key, list.stream().mapToInt(Integer::parseInt).sum());
+ });
Set<String> pageKeys = new HashSet<>(Arrays.asList("job", "inbox"));
HashMap<String, Integer> expectedPageKeyCountMap =
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
index a55b8c1..78fc7b5 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
@@ -126,9 +126,9 @@
GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
appDesc.getInputStream(pageViewISD)
.map(pv -> {
- received.add(pv);
- return pv;
- })
+ received.add(pv);
+ return pv;
+ })
.partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
.join(table, new PageViewToProfileJoinFunction())
.sink((m, collector, coordinator) -> joined.add(m));
@@ -188,15 +188,15 @@
profileStream1
.map(m -> {
- sentToProfileTable1.add(m);
- return new KV(m.getMemberId(), m);
- })
+ sentToProfileTable1.add(m);
+ return new KV(m.getMemberId(), m);
+ })
.sendTo(profileTable);
profileStream2
.map(m -> {
- sentToProfileTable2.add(m);
- return new KV(m.getMemberId(), m);
- })
+ sentToProfileTable2.add(m);
+ return new KV(m.getMemberId(), m);
+ })
.sendTo(profileTable);
GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>());
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java
index eecc6b4..071f65e 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java
@@ -143,10 +143,10 @@
return new InMemoryTableDescriptor(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
.withSideInputs(ImmutableList.of(PROFILE_STREAM))
.withSideInputsProcessor((msg, store) -> {
- Profile profile = (Profile) msg.getMessage();
- int key = profile.getMemberId();
- return ImmutableList.of(new Entry<>(key, profile));
- });
+ Profile profile = (Profile) msg.getMessage();
+ int key = profile.getMemberId();
+ return ImmutableList.of(new Entry<>(key, profile));
+ });
}
}
@@ -156,10 +156,10 @@
return new RocksDbTableDescriptor(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
.withSideInputs(ImmutableList.of(PROFILE_STREAM))
.withSideInputsProcessor((msg, store) -> {
- TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage();
- int key = profile.getMemberId();
- return ImmutableList.of(new Entry<>(key, profile));
- });
+ TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage();
+ int key = profile.getMemberId();
+ return ImmutableList.of(new Entry<>(key, profile));
+ });
}
}
}
\ No newline at end of file
diff --git a/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java b/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
index 6ba28ae..268cd23 100644
--- a/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
+++ b/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
@@ -61,11 +61,11 @@
Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopeMap = new HashMap<>();
final AtomicInteger offset = new AtomicInteger(0);
set.forEach(ssp -> {
- List<IncomingMessageEnvelope> envelopes = Arrays.stream(getArrayObjects(ssp.getSystemStream().getStream(), config))
- .map(object -> new IncomingMessageEnvelope(ssp, String.valueOf(offset.incrementAndGet()), null, object)).collect(Collectors.toList());
- envelopes.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp));
- envelopeMap.put(ssp, envelopes);
- });
+ List<IncomingMessageEnvelope> envelopes = Arrays.stream(getArrayObjects(ssp.getSystemStream().getStream(), config))
+ .map(object -> new IncomingMessageEnvelope(ssp, String.valueOf(offset.incrementAndGet()), null, object)).collect(Collectors.toList());
+ envelopes.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp));
+ envelopeMap.put(ssp, envelopes);
+ });
done = true;
return envelopeMap;
} else {
diff --git a/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java b/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java
index c735c74..493bce8 100644
--- a/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java
+++ b/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java
@@ -53,19 +53,19 @@
public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
return streamNames.stream()
.collect(Collectors.toMap(Function.identity(), streamName -> {
- int messageCount = isBootstrapStream(streamName) ? getMessageCount(streamName) : -1;
- String oldestOffset = messageCount < 0 ? null : "0";
- String newestOffset = messageCount < 0 ? null : String.valueOf(messageCount - 1);
- String upcomingOffset = messageCount < 0 ? null : String.valueOf(messageCount);
- Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> metadataMap = new HashMap<>();
- int partitionCount = config.getInt("streams." + streamName + ".partitionCount", 1);
- for (int i = 0; i < partitionCount; i++) {
- metadataMap.put(new Partition(i), new SystemStreamMetadata.SystemStreamPartitionMetadata(
- oldestOffset, newestOffset, upcomingOffset
- ));
- }
- return new SystemStreamMetadata(streamName, metadataMap);
- }));
+ int messageCount = isBootstrapStream(streamName) ? getMessageCount(streamName) : -1;
+ String oldestOffset = messageCount < 0 ? null : "0";
+ String newestOffset = messageCount < 0 ? null : String.valueOf(messageCount - 1);
+ String upcomingOffset = messageCount < 0 ? null : String.valueOf(messageCount);
+ Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> metadataMap = new HashMap<>();
+ int partitionCount = config.getInt("streams." + streamName + ".partitionCount", 1);
+ for (int i = 0; i < partitionCount; i++) {
+ metadataMap.put(new Partition(i), new SystemStreamMetadata.SystemStreamPartitionMetadata(
+ oldestOffset, newestOffset, upcomingOffset
+ ));
+ }
+ return new SystemStreamMetadata(streamName, metadataMap);
+ }));
}
@Override