SAMZA-2551: Upgrade all modules to automatically use checkstyle 6.11.2 (part 2: includes all modules with checkstyle currently enabled, excluding samza-core) (#1390)

API/Upgrade/Usage changes: N/A
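
The only functional change here is in the build: build.gradle previously hardcoded checkstyle's toolVersion to 6.11.2 for the modules that had already been upgraded, and now reads the shared checkstyleVersion property from gradle.properties, which this patch bumps from 6.11 to 6.11.2. Every remaining hunk is a re-indentation of multi-line lambda bodies, which 6.11.2 evidently validates more strictly than 6.11, judging by the hunks below: the body is indented one level in from the statement that opens the lambda, and the closing brace lines up with that statement, instead of both being pushed further right. A minimal sketch of the convention, using hypothetical names (LambdaIndentExample, items, handle) rather than code from this patch:

    import java.util.Arrays;
    import java.util.List;

    public class LambdaIndentExample {
      static void handle(String item) {
        System.out.println(item);
      }

      public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "b");
        // Pre-6.11.2 style, as seen on the removed lines below, now flagged:
        //   items.forEach(item -> {
        //       handle(item);
        //     });
        // 6.11.2-compliant style: body one indent level in from the
        // enclosing statement, closing brace aligned with it.
        items.forEach(item -> {
          handle(item);
        });
      }
    }
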
diff --git a/build.gradle b/build.gradle
index 05f4066..dfbe2e6 100644
--- a/build.gradle
+++ b/build.gradle
@@ -209,8 +209,7 @@
 
   checkstyle {
     configFile = new File(rootDir, "checkstyle/checkstyle.xml")
-    // temporarily hardcode 6.11.2 until all other modules are upgraded
-    toolVersion = "6.11.2"
+    toolVersion = "$checkstyleVersion"
   }
 
   test {
diff --git a/gradle.properties b/gradle.properties
index dda382c..cb5da70 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -24,4 +24,4 @@
 org.gradle.jvmargs="-XX:MaxPermSize=512m"
 
 systemProp.file.encoding=utf-8
-checkstyleVersion=6.11
+checkstyleVersion=6.11.2
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/SamzaHistogram.java b/samza-api/src/main/java/org/apache/samza/metrics/SamzaHistogram.java
index 85835f9..6ac7615 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/SamzaHistogram.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/SamzaHistogram.java
@@ -48,7 +48,7 @@
     this.gauges = this.percentiles.stream()
         .filter(x -> x > 0 && x <= 100)
         .collect(Collectors.toMap(Function.identity(),
-            x -> registry.newGauge(group, new HistogramGauge(x, name + "_" + String.valueOf(x), 0D))));
+          x -> registry.newGauge(group, new HistogramGauge(x, name + "_" + String.valueOf(x), 0D))));
   }
 
   public void update(long value) {
diff --git a/samza-api/src/test/java/org/apache/samza/serializers/TestJsonSerdeV2.java b/samza-api/src/test/java/org/apache/samza/serializers/TestJsonSerdeV2.java
index 2c087d2..f6ec8aa 100644
--- a/samza-api/src/test/java/org/apache/samza/serializers/TestJsonSerdeV2.java
+++ b/samza-api/src/test/java/org/apache/samza/serializers/TestJsonSerdeV2.java
@@ -37,11 +37,11 @@
     assertEquals(obj, serde.fromBytes(bytes));
     JsonSerdeV2<Map.Entry<String, Object>> serdeHashMapEntry = new JsonSerdeV2<>();
     obj.entrySet().forEach(entry -> {
-        try {
-          serdeHashMapEntry.toBytes(entry);
-        } catch (Exception e) {
-          fail("HashMap Entry serialization failed!");
-        }
-      });
+      try {
+        serdeHashMapEntry.toBytes(entry);
+      } catch (Exception e) {
+        fail("HashMap Entry serialization failed!");
+      }
+    });
   }
 }
diff --git a/samza-api/src/test/java/org/apache/samza/system/descriptors/examples/expanding/ExampleExpandingSystemDescriptor.java b/samza-api/src/test/java/org/apache/samza/system/descriptors/examples/expanding/ExampleExpandingSystemDescriptor.java
index 6ab1c8b..3f13e44 100644
--- a/samza-api/src/test/java/org/apache/samza/system/descriptors/examples/expanding/ExampleExpandingSystemDescriptor.java
+++ b/samza-api/src/test/java/org/apache/samza/system/descriptors/examples/expanding/ExampleExpandingSystemDescriptor.java
@@ -31,10 +31,8 @@
   private static final String FACTORY_CLASS_NAME = "org.apache.samza.GraphExpandingSystemFactory";
 
   public ExampleExpandingSystemDescriptor(String systemName) {
-    super(systemName, FACTORY_CLASS_NAME,
-        (InputTransformer<String>) IncomingMessageEnvelope::toString,
-        (streamGraph, inputDescriptor) -> (MessageStream<Long>) streamGraph.getInputStream(inputDescriptor)
-    );
+    super(systemName, FACTORY_CLASS_NAME, (InputTransformer<String>) IncomingMessageEnvelope::toString,
+      (streamGraph, inputDescriptor) -> (MessageStream<Long>) streamGraph.getInputStream(inputDescriptor));
   }
 
   @Override
diff --git a/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java b/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
index f6f11ff..7de447c 100644
--- a/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
+++ b/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
@@ -45,10 +45,10 @@
   public TableRateLimiter<String, String> getThrottler(String tag) {
     TableRateLimiter.CreditFunction<String, String> credFn =
         (TableRateLimiter.CreditFunction<String, String>) (key, value, args) -> {
-      int credits = key == null ? 0 : 3;
-      credits += value == null ? 0 : 3;
-      return credits;
-    };
+          int credits = key == null ? 0 : 3;
+          credits += value == null ? 0 : 3;
+          return credits;
+        };
     RateLimiter rateLimiter = mock(RateLimiter.class);
     doReturn(Collections.singleton(tag)).when(rateLimiter).getSupportedTags();
     TableRateLimiter<String, String> rateLimitHelper = new TableRateLimiter<>("foo", rateLimiter, credFn, tag);
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
index e0c9099..8e3fc7b 100644
--- a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
@@ -84,12 +84,12 @@
     // all properties should now start with stream name
     Set<String> streams = new HashSet<>();
     streamsConfig.keySet().forEach(key -> {
-        String[] parts = key.split("\\.", 2);
-        if (parts.length != 2) {
-          throw new IllegalArgumentException("Ill-formatted stream config: " + key);
-        }
-        streams.add(parts[0]);
-      });
+      String[] parts = key.split("\\.", 2);
+      if (parts.length != 2) {
+        throw new IllegalArgumentException("Ill-formatted stream config: " + key);
+      }
+      streams.add(parts[0]);
+    });
     return streams;
   }
 
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
index 2758022..60d3b37 100644
--- a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
@@ -76,11 +76,11 @@
     // Kinesis streams cannot be configured as bootstrap streams
     KinesisConfig kConfig = new KinesisConfig(config);
     kConfig.getKinesisStreams(system).forEach(stream -> {
-        StreamConfig streamConfig = new StreamConfig(kConfig);
-        SystemStream ss = new SystemStream(system, stream);
-        if (streamConfig.getBootstrapEnabled(ss)) {
-          throw new ConfigException("Kinesis streams cannot be configured as bootstrap streams.");
-        }
-      });
+      StreamConfig streamConfig = new StreamConfig(kConfig);
+      SystemStream ss = new SystemStream(system, stream);
+      if (streamConfig.getBootstrapEnabled(ss)) {
+        throw new ConfigException("Kinesis streams cannot be configured as bootstrap streams.");
+      }
+    });
   }
 }
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java
index 80d43de..16cd4d5 100644
--- a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java
@@ -215,19 +215,19 @@
   public void afterCheckpoint(Map<SystemStreamPartition, String> sspOffsets) {
     LOG.info("afterCheckpoint called with sspOffsets {}", sspOffsets);
     sspOffsets.forEach((ssp, offset) -> {
-        KinesisRecordProcessor processor = processors.get(ssp);
-        KinesisSystemConsumerOffset kinesisOffset = KinesisSystemConsumerOffset.parse(offset);
-        if (processor == null) {
-          LOG.info("Kinesis Processor is not alive for ssp {}. This could be the result of rebalance. Hence dropping the"
-              + " checkpoint {}.", ssp, offset);
-        } else if (!kinesisOffset.getShardId().equals(processor.getShardId())) {
-          LOG.info("KinesisProcessor for ssp {} currently owns shard {} while the checkpoint is for shard {}. This could"
-              + " be the result of rebalance. Hence dropping the checkpoint {}.", ssp, processor.getShardId(),
-              kinesisOffset.getShardId(), offset);
-        } else {
-          processor.checkpoint(kinesisOffset.getSeqNumber());
-        }
-      });
+      KinesisRecordProcessor processor = processors.get(ssp);
+      KinesisSystemConsumerOffset kinesisOffset = KinesisSystemConsumerOffset.parse(offset);
+      if (processor == null) {
+        LOG.info("Kinesis Processor is not alive for ssp {}. This could be the result of rebalance. Hence dropping the"
+            + " checkpoint {}.", ssp, offset);
+      } else if (!kinesisOffset.getShardId().equals(processor.getShardId())) {
+        LOG.info("KinesisProcessor for ssp {} currently owns shard {} while the checkpoint is for shard {}. This could"
+            + " be the result of rebalance. Hence dropping the checkpoint {}.", ssp, processor.getShardId(),
+            kinesisOffset.getShardId(), offset);
+      } else {
+        processor.checkpoint(kinesisOffset.getSeqNumber());
+      }
+    });
   }
 
   @Override
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java
index fcce49e..130a189 100644
--- a/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java
@@ -110,12 +110,11 @@
     String clientConfigPrefix =
         String.format(KinesisConfig.CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG, systemName, streamId);
 
-    region.ifPresent(
-        val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_REGION, systemName, streamId), val));
+    region.ifPresent(val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_REGION, systemName, streamId), val));
     accessKey.ifPresent(
-        val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_ACCESS_KEY, systemName, streamId), val));
+      val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_ACCESS_KEY, systemName, streamId), val));
     secretKey.ifPresent(
-        val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_SECRET_KEY, systemName, streamId), val));
+      val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_SECRET_KEY, systemName, streamId), val));
     kclConfig.forEach((k, v) -> config.put(clientConfigPrefix + k, v));
 
     return config;
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java
index 678dfe6..f5f27c9 100644
--- a/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java
@@ -121,12 +121,10 @@
     Map<String, String> config = new HashMap<>(super.toConfig());
     String systemName = getSystemName();
 
-    region.ifPresent(
-        val -> config.put(String.format(KinesisConfig.CONFIG_SYSTEM_REGION, systemName), val));
-    proxyHost.ifPresent(
-        val -> config.put(String.format(KinesisConfig.CONFIG_PROXY_HOST, systemName), val));
+    region.ifPresent(val -> config.put(String.format(KinesisConfig.CONFIG_SYSTEM_REGION, systemName), val));
+    proxyHost.ifPresent(val -> config.put(String.format(KinesisConfig.CONFIG_PROXY_HOST, systemName), val));
     proxyPort.ifPresent(
-        val -> config.put(String.format(KinesisConfig.CONFIG_PROXY_PORT, systemName), String.valueOf(val)));
+      val -> config.put(String.format(KinesisConfig.CONFIG_PROXY_PORT, systemName), String.valueOf(val)));
 
     String kclConfigPrefix = String.format(KinesisConfig.CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, systemName);
     kclConfig.forEach((k, v) -> config.put(kclConfigPrefix + k, v));
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java
index 2f42981..e3a9c55 100644
--- a/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java
@@ -73,7 +73,7 @@
         .collect(Collectors.toConcurrentMap(Function.identity(), x -> new SamzaHistogram(registry, x, READ_LATENCY)));
     millisBehindLatest = streamNames.stream()
         .collect(Collectors.toConcurrentMap(Function.identity(),
-            x -> new SamzaHistogram(registry, x, MILLIS_BEHIND_LATEST)));
+          x -> new SamzaHistogram(registry, x, MILLIS_BEHIND_LATEST)));
 
     // Locking to ensure that these aggregated metrics will be created only once across multiple system consumers.
     synchronized (LOCK) {
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
index 6f1f052..2551d07 100644
--- a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
@@ -233,22 +233,22 @@
       List<KinesisRecordProcessor> processors) {
     Map<KinesisRecordProcessor, List<Record>> processorRecordMap = new HashMap<>();
     processors.forEach(processor -> {
-        try {
-          // Create records and call process records
-          IRecordProcessorCheckpointer checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class);
-          doNothing().when(checkpointer).checkpoint(anyString());
-          doNothing().when(checkpointer).checkpoint();
-          ProcessRecordsInput processRecordsInput = Mockito.mock(ProcessRecordsInput.class);
-          when(processRecordsInput.getCheckpointer()).thenReturn(checkpointer);
-          when(processRecordsInput.getMillisBehindLatest()).thenReturn(1000L);
-          List<Record> inputRecords = createRecords(numRecordsPerShard);
-          processorRecordMap.put(processor, inputRecords);
-          when(processRecordsInput.getRecords()).thenReturn(inputRecords);
-          processor.processRecords(processRecordsInput);
-        } catch (ShutdownException | InvalidStateException ex) {
-          throw new RuntimeException(ex);
-        }
-      });
+      try {
+        // Create records and call process records
+        IRecordProcessorCheckpointer checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class);
+        doNothing().when(checkpointer).checkpoint(anyString());
+        doNothing().when(checkpointer).checkpoint();
+        ProcessRecordsInput processRecordsInput = Mockito.mock(ProcessRecordsInput.class);
+        when(processRecordsInput.getCheckpointer()).thenReturn(checkpointer);
+        when(processRecordsInput.getMillisBehindLatest()).thenReturn(1000L);
+        List<Record> inputRecords = createRecords(numRecordsPerShard);
+        processorRecordMap.put(processor, inputRecords);
+        when(processRecordsInput.getRecords()).thenReturn(inputRecords);
+        processor.processRecords(processRecordsInput);
+      } catch (ShutdownException | InvalidStateException ex) {
+        throw new RuntimeException(ex);
+      }
+    });
     return processorRecordMap;
   }
 
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
index fe7fa96..235c829 100644
--- a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
@@ -103,9 +103,9 @@
     List<SystemStreamPartition> ssps = new LinkedList<>();
     IntStream.range(0, numShards)
         .forEach(p -> {
-            SystemStreamPartition ssp = new SystemStreamPartition(system, stream, new Partition(p));
-            ssps.add(ssp);
-          });
+          SystemStreamPartition ssp = new SystemStreamPartition(system, stream, new Partition(p));
+          ssps.add(ssp);
+        });
     ssps.forEach(ssp -> consumer.register(ssp, SYSTEM_CONSUMER_REGISTER_OFFSET));
 
     // Create Kinesis record processor factory
@@ -133,47 +133,47 @@
 
     Map<SystemStreamPartition, KinesisRecordProcessor> sspToProcessorMap = getProcessorMap(consumer);
     ssps.forEach(ssp -> {
-        try {
-          KinesisRecordProcessor processor = sspToProcessorMap.get(ssp);
+      try {
+        KinesisRecordProcessor processor = sspToProcessorMap.get(ssp);
 
-          // Verify that the read messages are received in order and are the same as input records
-          Assert.assertEquals(messages.get(ssp).size(), numRecordsPerShard);
-          List<IncomingMessageEnvelope> envelopes = messages.get(ssp);
-          List<Record> inputRecords = inputRecordMap.get(processor);
-          verifyRecords(envelopes, inputRecords, processor.getShardId());
+        // Verify that the read messages are received in order and are the same as input records
+        Assert.assertEquals(messages.get(ssp).size(), numRecordsPerShard);
+        List<IncomingMessageEnvelope> envelopes = messages.get(ssp);
+        List<Record> inputRecords = inputRecordMap.get(processor);
+        verifyRecords(envelopes, inputRecords, processor.getShardId());
 
-          // Call checkpoint on consumer and verify that the checkpoint is called with the right offset
-          IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1);
-          consumer.afterCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset()));
-          ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
-          verify(getCheckpointer(processor)).checkpoint(argument.capture());
-          Assert.assertEquals(inputRecords.get(inputRecords.size() - 1).getSequenceNumber(), argument.getValue());
+        // Call checkpoint on consumer and verify that the checkpoint is called with the right offset
+        IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1);
+        consumer.afterCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset()));
+        ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+        verify(getCheckpointer(processor)).checkpoint(argument.capture());
+        Assert.assertEquals(inputRecords.get(inputRecords.size() - 1).getSequenceNumber(), argument.getValue());
 
-          // Call shutdown (with ZOMBIE reason) on processor and verify if shutdown freed the ssp mapping
-          shutDownProcessor(processor, ShutdownReason.ZOMBIE);
-          Assert.assertFalse(sspToProcessorMap.containsValue(processor));
-          Assert.assertTrue(isSspAvailable(consumer, ssp));
-        } catch (NoSuchFieldException | IllegalAccessException | InvalidStateException | ShutdownException ex) {
-          throw new RuntimeException(ex);
-        }
-      });
+        // Call shutdown (with ZOMBIE reason) on processor and verify if shutdown freed the ssp mapping
+        shutDownProcessor(processor, ShutdownReason.ZOMBIE);
+        Assert.assertFalse(sspToProcessorMap.containsValue(processor));
+        Assert.assertTrue(isSspAvailable(consumer, ssp));
+      } catch (NoSuchFieldException | IllegalAccessException | InvalidStateException | ShutdownException ex) {
+        throw new RuntimeException(ex);
+      }
+    });
   }
 
   private Map<String, KinesisRecordProcessor> createAndInitProcessors(IRecordProcessorFactory factory, int numShards) {
     Map<String, KinesisRecordProcessor> processorMap = new HashMap<>();
     IntStream.range(0, numShards)
         .forEach(p -> {
-            String shardId = String.format("shard-%05d", p);
-            // Create Kinesis processor
-            KinesisRecordProcessor processor = (KinesisRecordProcessor) factory.createProcessor();
+          String shardId = String.format("shard-%05d", p);
+          // Create Kinesis processor
+          KinesisRecordProcessor processor = (KinesisRecordProcessor) factory.createProcessor();
 
-            // Initialize the shard
-            ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000");
-            InitializationInput initializationInput =
-                new InitializationInput().withShardId(shardId).withExtendedSequenceNumber(seqNum);
-            processor.initialize(initializationInput);
-            processorMap.put(shardId, processor);
-          });
+          // Initialize the shard
+          ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000");
+          InitializationInput initializationInput =
+              new InitializationInput().withShardId(shardId).withExtendedSequenceNumber(seqNum);
+          processor.initialize(initializationInput);
+          processorMap.put(shardId, processor);
+        });
     return processorMap;
   }
 
@@ -186,12 +186,12 @@
       Map<SystemStreamPartition, List<IncomingMessageEnvelope>> receivedMessages =
           consumer.poll(ssps, Duration.ofSeconds(1).toMillis());
       receivedMessages.forEach((key, value) -> {
-          if (messages.containsKey(key)) {
-            messages.get(key).addAll(value);
-          } else {
-            messages.put(key, new ArrayList<>(value));
-          }
-        });
+        if (messages.containsKey(key)) {
+          messages.get(key).addAll(value);
+        } else {
+          messages.put(key, new ArrayList<>(value));
+        }
+      });
       totalEventsConsumed = messages.values().stream().mapToInt(List::size).sum();
     }
 
@@ -205,19 +205,19 @@
   private void verifyRecords(List<IncomingMessageEnvelope> outputRecords, List<Record> inputRecords, String shardId) {
     Iterator outputRecordsIter = outputRecords.iterator();
     inputRecords.forEach(record -> {
-        IncomingMessageEnvelope envelope = (IncomingMessageEnvelope) outputRecordsIter.next();
-        String outputKey = (String) envelope.getKey();
-        KinesisIncomingMessageEnvelope kinesisMessageEnvelope = (KinesisIncomingMessageEnvelope) envelope;
-        Assert.assertEquals(outputKey, record.getPartitionKey());
-        Assert.assertEquals(kinesisMessageEnvelope.getSequenceNumber(), record.getSequenceNumber());
-        Assert.assertEquals(kinesisMessageEnvelope.getApproximateArrivalTimestamp(),
-            record.getApproximateArrivalTimestamp());
-        Assert.assertEquals(kinesisMessageEnvelope.getShardId(), shardId);
-        ByteBuffer outputData = ByteBuffer.wrap((byte[]) kinesisMessageEnvelope.getMessage());
-        record.getData().rewind();
-        Assert.assertEquals(outputData, record.getData());
-        verifyOffset(envelope.getOffset(), record, shardId);
-      });
+      IncomingMessageEnvelope envelope = (IncomingMessageEnvelope) outputRecordsIter.next();
+      String outputKey = (String) envelope.getKey();
+      KinesisIncomingMessageEnvelope kinesisMessageEnvelope = (KinesisIncomingMessageEnvelope) envelope;
+      Assert.assertEquals(outputKey, record.getPartitionKey());
+      Assert.assertEquals(kinesisMessageEnvelope.getSequenceNumber(), record.getSequenceNumber());
+      Assert.assertEquals(kinesisMessageEnvelope.getApproximateArrivalTimestamp(),
+          record.getApproximateArrivalTimestamp());
+      Assert.assertEquals(kinesisMessageEnvelope.getShardId(), shardId);
+      ByteBuffer outputData = ByteBuffer.wrap((byte[]) kinesisMessageEnvelope.getMessage());
+      record.getData().rewind();
+      Assert.assertEquals(outputData, record.getData());
+      verifyOffset(envelope.getOffset(), record, shardId);
+    });
   }
 
   private void verifyOffset(String offset, Record inputRecord, String shardId) {
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
index 7fc1423..08e8124 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
@@ -485,12 +485,12 @@
       // Schedule a task to renew the lease after a fixed time interval
       LOG.info("Starting scheduler to keep renewing lease held by the leader.");
       renewLease = new RenewLeaseScheduler((errorMsg) -> {
-          LOG.error(errorMsg);
-          table.updateIsLeader(currentJMVersion.get(), processorId, false);
-          azureLeaderElector.resignLeadership();
-          renewLease.shutdown();
-          liveness.shutdown();
-        }, azureLeaderElector.getLeaseBlobManager(), azureLeaderElector.getLeaseId());
+        LOG.error(errorMsg);
+        table.updateIsLeader(currentJMVersion.get(), processorId, false);
+        azureLeaderElector.resignLeadership();
+        renewLease.shutdown();
+        liveness.shutdown();
+      }, azureLeaderElector.getLeaseBlobManager(), azureLeaderElector.getLeaseId());
       renewLease.scheduleTask();
 
       doOnProcessorChange(new ArrayList<>());
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java
index 2abb380..68d272a 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java
@@ -60,14 +60,14 @@
   @Override
   public ScheduledFuture scheduleTask() {
     return scheduler.scheduleWithFixedDelay(() -> {
-        try {
-          String currJVM = currentJMVersion.get();
-          LOG.info("Updating heartbeat for processor ID: " + processorId + " and job model version: " + currJVM);
-          table.updateHeartbeat(currJVM, processorId);
-        } catch (Exception e) {
-          errorHandler.accept("Exception in Heartbeat Scheduler. Stopping the processor...");
-        }
-      }, HEARTBEAT_DELAY_SEC, HEARTBEAT_DELAY_SEC, TimeUnit.SECONDS);
+      try {
+        String currJVM = currentJMVersion.get();
+        LOG.info("Updating heartbeat for processor ID: " + processorId + " and job model version: " + currJVM);
+        table.updateHeartbeat(currJVM, processorId);
+      } catch (Exception e) {
+        errorHandler.accept("Exception in Heartbeat Scheduler. Stopping the processor...");
+      }
+    }, HEARTBEAT_DELAY_SEC, HEARTBEAT_DELAY_SEC, TimeUnit.SECONDS);
   }
 
   @Override
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
index 235b1f8..03260a3 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
@@ -67,24 +67,24 @@
   @Override
   public ScheduledFuture scheduleTask() {
     return scheduler.scheduleWithFixedDelay(() -> {
-        try {
-          LOG.info("Checking for job model version upgrade");
-          // Read job model version from the blob.
-          String blobJMV = blob.getJobModelVersion();
-          LOG.info("Job Model Version seen on the blob: {}", blobJMV);
-          String blobBarrierState = blob.getBarrierState();
-          String currentJMV = currentJMVersion.get();
-          LOG.info("Current Job Model Version that the job coordinator is working on: {}", currentJMV);
-          String expectedBarrierState = BarrierState.START.toString() + " " + blobJMV;
-          List<String> processorList = blob.getLiveProcessorList();
-          // Check if the job model version on the blob is consistent with the job model version that the processor is operating on.
-          if (processorList != null && processorList.contains(processorId) && !currentJMV.equals(blobJMV) && blobBarrierState.equals(expectedBarrierState) && !versionUpgradeDetected.get()) {
-            listener.onStateChange();
-          }
-        } catch (Exception e) {
-          errorHandler.accept("Exception in Job Model Version Upgrade Scheduler. Stopping the processor...");
+      try {
+        LOG.info("Checking for job model version upgrade");
+        // Read job model version from the blob.
+        String blobJMV = blob.getJobModelVersion();
+        LOG.info("Job Model Version seen on the blob: {}", blobJMV);
+        String blobBarrierState = blob.getBarrierState();
+        String currentJMV = currentJMVersion.get();
+        LOG.info("Current Job Model Version that the job coordinator is working on: {}", currentJMV);
+        String expectedBarrierState = BarrierState.START.toString() + " " + blobJMV;
+        List<String> processorList = blob.getLiveProcessorList();
+        // Check if the job model version on the blob is consistent with the job model version that the processor is operating on.
+        if (processorList != null && processorList.contains(processorId) && !currentJMV.equals(blobJMV) && blobBarrierState.equals(expectedBarrierState) && !versionUpgradeDetected.get()) {
+          listener.onStateChange();
         }
-      }, JMV_UPGRADE_DELAY_SEC, JMV_UPGRADE_DELAY_SEC, TimeUnit.SECONDS);
+      } catch (Exception e) {
+        errorHandler.accept("Exception in Job Model Version Upgrade Scheduler. Stopping the processor...");
+      }
+    }, JMV_UPGRADE_DELAY_SEC, JMV_UPGRADE_DELAY_SEC, TimeUnit.SECONDS);
   }
 
   @Override
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java
index 7386fa9..f7d4d93 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java
@@ -77,32 +77,32 @@
   @Override
   public ScheduledFuture scheduleTask() {
     return scheduler.scheduleWithFixedDelay(() -> {
-        try {
-          if (!table.getEntity(currentJMVersion.get(), processorId).getIsLeader()) {
-            LOG.info("Not the leader anymore. Shutting down LeaderBarrierCompleteScheduler.");
+      try {
+        if (!table.getEntity(currentJMVersion.get(), processorId).getIsLeader()) {
+          LOG.info("Not the leader anymore. Shutting down LeaderBarrierCompleteScheduler.");
+          barrierTimeout.getAndSet(true);
+          listener.onStateChange();
+        } else {
+          LOG.info("Leader checking for barrier state");
+          // Get processor IDs listed in the table that have the new job model version.
+          Iterable<ProcessorEntity> tableList = table.getEntitiesWithPartition(nextJMVersion);
+          Set<String> tableProcessors = new HashSet<>();
+          for (ProcessorEntity entity : tableList) {
+            tableProcessors.add(entity.getRowKey());
+          }
+          LOG.info("List of live processors as seen on the blob = {}", blobProcessorSet);
+          LOG.info("List of live processors as seen in the table = {}", tableProcessors);
+          if ((System.currentTimeMillis() - startTime) > (BARRIER_TIMEOUT_SEC * 1000)) {
             barrierTimeout.getAndSet(true);
             listener.onStateChange();
-          } else {
-            LOG.info("Leader checking for barrier state");
-            // Get processor IDs listed in the table that have the new job model version.
-            Iterable<ProcessorEntity> tableList = table.getEntitiesWithPartition(nextJMVersion);
-            Set<String> tableProcessors = new HashSet<>();
-            for (ProcessorEntity entity : tableList) {
-              tableProcessors.add(entity.getRowKey());
-            }
-            LOG.info("List of live processors as seen on the blob = {}", blobProcessorSet);
-            LOG.info("List of live processors as seen in the table = {}", tableProcessors);
-            if ((System.currentTimeMillis() - startTime) > (BARRIER_TIMEOUT_SEC * 1000)) {
-              barrierTimeout.getAndSet(true);
-              listener.onStateChange();
-            } else if (blobProcessorSet.equals(tableProcessors)) {
-              listener.onStateChange();
-            }
+          } else if (blobProcessorSet.equals(tableProcessors)) {
+            listener.onStateChange();
           }
-        } catch (Exception e) {
-          errorHandler.accept("Exception in LeaderBarrierCompleteScheduler. Stopping the processor...");
         }
-      }, BARRIER_REACHED_DELAY_SEC, BARRIER_REACHED_DELAY_SEC, TimeUnit.SECONDS);
+      } catch (Exception e) {
+        errorHandler.accept("Exception in LeaderBarrierCompleteScheduler. Stopping the processor...");
+      }
+    }, BARRIER_REACHED_DELAY_SEC, BARRIER_REACHED_DELAY_SEC, TimeUnit.SECONDS);
   }
 
   @Override
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java
index 59a8123..c3502a3 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java
@@ -64,15 +64,15 @@
   @Override
   public ScheduledFuture scheduleTask() {
     return scheduler.scheduleWithFixedDelay(() -> {
-        try {
-          LOG.info("Checking for leader liveness");
-          if (!checkIfLeaderAlive()) {
-            listener.onStateChange();
-          }
-        } catch (Exception e) {
-          errorHandler.accept("Exception in Leader Liveness Check Scheduler. Stopping the processor...");
+      try {
+        LOG.info("Checking for leader liveness");
+        if (!checkIfLeaderAlive()) {
+          listener.onStateChange();
         }
-      }, LIVENESS_CHECK_DELAY_SEC, LIVENESS_CHECK_DELAY_SEC, TimeUnit.SECONDS);
+      } catch (Exception e) {
+        errorHandler.accept("Exception in Leader Liveness Check Scheduler. Stopping the processor...");
+      }
+    }, LIVENESS_CHECK_DELAY_SEC, LIVENESS_CHECK_DELAY_SEC, TimeUnit.SECONDS);
   }
 
   @Override
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java
index d4715f3..2ac52f3 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java
@@ -69,26 +69,26 @@
   @Override
   public ScheduledFuture scheduleTask() {
     return scheduler.scheduleWithFixedDelay(() -> {
-        try {
-          if (!table.getEntity(currentJMVersion.get(), processorId).getIsLeader()) {
-            LOG.info("Not the leader anymore. Shutting down LivenessCheckScheduler.");
-            scheduler.shutdownNow();
-            return;
-          }
-          LOG.info("Checking for list of live processors");
-          //Get the list of live processors published on the blob.
-          Set<String> currProcessors = new HashSet<>(blob.getLiveProcessorList());
-          //Get the list of live processors from the table. This is the current system state.
-          Set<String> liveProcessors = table.getActiveProcessorsList(currentJMVersion);
-          //Invoke listener if the table list is not consistent with the blob list.
-          if (!liveProcessors.equals(currProcessors)) {
-            liveProcessorsList.getAndSet(new ArrayList<>(liveProcessors));
-            listener.onStateChange();
-          }
-        } catch (Exception e) {
-          errorHandler.accept("Exception in Liveness Check Scheduler. Stopping the processor...");
+      try {
+        if (!table.getEntity(currentJMVersion.get(), processorId).getIsLeader()) {
+          LOG.info("Not the leader anymore. Shutting down LivenessCheckScheduler.");
+          scheduler.shutdownNow();
+          return;
         }
-      }, LIVENESS_CHECK_DELAY_SEC, LIVENESS_CHECK_DELAY_SEC, TimeUnit.SECONDS);
+        LOG.info("Checking for list of live processors");
+        //Get the list of live processors published on the blob.
+        Set<String> currProcessors = new HashSet<>(blob.getLiveProcessorList());
+        //Get the list of live processors from the table. This is the current system state.
+        Set<String> liveProcessors = table.getActiveProcessorsList(currentJMVersion);
+        //Invoke listener if the table list is not consistent with the blob list.
+        if (!liveProcessors.equals(currProcessors)) {
+          liveProcessorsList.getAndSet(new ArrayList<>(liveProcessors));
+          listener.onStateChange();
+        }
+      } catch (Exception e) {
+        errorHandler.accept("Exception in Liveness Check Scheduler. Stopping the processor...");
+      }
+    }, LIVENESS_CHECK_DELAY_SEC, LIVENESS_CHECK_DELAY_SEC, TimeUnit.SECONDS);
   }
 
   @Override
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java
index f158122..865e6e7 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java
@@ -56,16 +56,16 @@
   @Override
   public ScheduledFuture scheduleTask() {
     return scheduler.scheduleWithFixedDelay(() -> {
-        try {
-          LOG.info("Renewing lease");
-          boolean status = leaseBlobManager.renewLease(leaseId.get());
-          if (!status) {
-            errorHandler.accept("Unable to renew lease. Continuing as non-leader.");
-          }
-        } catch (Exception e) {
-          errorHandler.accept("Exception in Renew Lease Scheduler. Continuing as non-leader.");
+      try {
+        LOG.info("Renewing lease");
+        boolean status = leaseBlobManager.renewLease(leaseId.get());
+        if (!status) {
+          errorHandler.accept("Unable to renew lease. Continuing as non-leader.");
         }
-      }, RENEW_LEASE_DELAY_SEC, RENEW_LEASE_DELAY_SEC, TimeUnit.SECONDS);
+      } catch (Exception e) {
+        errorHandler.accept("Exception in Renew Lease Scheduler. Continuing as non-leader.");
+      }
+    }, RENEW_LEASE_DELAY_SEC, RENEW_LEASE_DELAY_SEC, TimeUnit.SECONDS);
   }
 
   @Override
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
index 32a8532..85c2b33 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
@@ -213,13 +213,13 @@
         throw new IllegalStateException("Attempting to close an already closed AzureBlobAvroWriter");
       }
       allBlobWriterComponents.forEach(blobWriterComponents -> {
-          try {
-            closeDataFileWriter(blobWriterComponents.dataFileWriter, blobWriterComponents.azureBlobOutputStream,
-                blobWriterComponents.blockBlobAsyncClient);
-          } catch (IOException e) {
-            throw new SamzaException(e);
-          }
-        });
+        try {
+          closeDataFileWriter(blobWriterComponents.dataFileWriter, blobWriterComponents.azureBlobOutputStream,
+              blobWriterComponents.blockBlobAsyncClient);
+        } catch (IOException e) {
+          throw new SamzaException(e);
+        }
+      });
       isClosed = true;
     }
   }
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
index d787351..e615808 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
@@ -351,15 +351,15 @@
     pendingUpload.add(future);
 
     future.handle((aVoid, throwable) -> {
-        if (throwable == null) {
-          LOG.info("Upload block for blob: {} with blockid: {} finished.", blobAsyncClient.getBlobUrl().toString(), blockId);
-          pendingUpload.remove(future);
-          return aVoid;
-        } else {
-          throw new AzureException("Blob upload failed for blob " + blobAsyncClient.getBlobUrl().toString()
-              + " and block with id: " + blockId, throwable);
-        }
-      });
+      if (throwable == null) {
+        LOG.info("Upload block for blob: {} with blockid: {} finished.", blobAsyncClient.getBlobUrl().toString(), blockId);
+        pendingUpload.remove(future);
+        return aVoid;
+      } else {
+        throw new AzureException("Blob upload failed for blob " + blobAsyncClient.getBlobUrl().toString()
+            + " and block with id: " + blockId, throwable);
+      }
+    });
 
     blockNum += 1;
     if (blockNum >= MAX_BLOCKS_IN_AZURE_BLOB) {
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
index d8205a5..5ecd528 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
@@ -471,41 +471,41 @@
 
   private void flushWriters(Map<String, AzureBlobWriter> sourceWriterMap) {
     sourceWriterMap.forEach((stream, writer) -> {
-        try {
-          LOG.info("Flushing topic:{}", stream);
-          writer.flush();
-        } catch (IOException e) {
-          throw new SystemProducerException("Close failed for topic " + stream, e);
-        }
-      });
+      try {
+        LOG.info("Flushing topic:{}", stream);
+        writer.flush();
+      } catch (IOException e) {
+        throw new SystemProducerException("Close failed for topic " + stream, e);
+      }
+    });
   }
 
   private void closeWriters(String source, Map<String, AzureBlobWriter> sourceWriterMap) throws Exception {
     Set<CompletableFuture<Void>> pendingClose = ConcurrentHashMap.newKeySet();
     try {
       sourceWriterMap.forEach((stream, writer) -> {
-          LOG.info("Closing topic:{}", stream);
-          CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
-            @Override
-            public void run() {
-              try {
-                writer.close();
-              } catch (IOException e) {
-                throw new SystemProducerException("Close failed for topic " + stream, e);
-              }
+        LOG.info("Closing topic:{}", stream);
+        CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              writer.close();
+            } catch (IOException e) {
+              throw new SystemProducerException("Close failed for topic " + stream, e);
             }
-          }, asyncBlobThreadPool);
-          pendingClose.add(future);
-          future.handle((aVoid, throwable) -> {
-              sourceWriterMap.remove(writer);
-              if (throwable != null) {
-                throw new SystemProducerException("Close failed for topic " + stream, throwable);
-              } else {
-                LOG.info("Blob close finished for stream " + stream);
-                return aVoid;
-              }
-            });
+          }
+        }, asyncBlobThreadPool);
+        pendingClose.add(future);
+        future.handle((aVoid, throwable) -> {
+          sourceWriterMap.remove(writer);
+          if (throwable != null) {
+            throw new SystemProducerException("Close failed for topic " + stream, throwable);
+          } else {
+            LOG.info("Blob close finished for stream " + stream);
+            return aVoid;
+          }
         });
+      });
       CompletableFuture<Void> future = CompletableFuture.allOf(pendingClose.toArray(new CompletableFuture[0]));
       LOG.info("Flush source: {} has pending closes: {} ", source, pendingClose.size());
       future.get((long) closeTimeout, TimeUnit.MILLISECONDS);
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
index 69a921e..e7e82ae 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
@@ -110,10 +110,10 @@
     LOG.info("Building mappings from physicalName to streamId");
     streamConfig.getStreamIds()
         .forEach((streamId) -> {
-            String physicalName = streamConfig.getPhysicalName(streamId);
-            LOG.info("Obtained physicalName: {} for streamId: {} ", physicalName, streamId);
-            physcialToId.put(physicalName, streamId);
-          });
+          String physicalName = streamConfig.getPhysicalName(streamId);
+          LOG.info("Obtained physicalName: {} for streamId: {} ", physicalName, streamId);
+          physcialToId.put(physicalName, streamId);
+        });
   }
 
   private String getFromStreamIdOrName(String configName, String streamName, String defaultString) {
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
index 2d22929..2103b1c 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
@@ -187,15 +187,15 @@
               .getPartitionRuntimeInformation(partition);
       futureList.add(partitionRuntimeInfo);
       partitionRuntimeInfo.thenAccept(ehPartitionInfo -> {
-          LOG.info(printPartitionRuntimeInfo(ehPartitionInfo));
-          // Set offsets
-          String startingOffset = EventHubSystemConsumer.START_OF_STREAM;
-          String newestOffset = ehPartitionInfo.getLastEnqueuedOffset();
-          String upcomingOffset = EventHubSystemConsumer.END_OF_STREAM;
-          SystemStreamPartitionMetadata sspMetadata = new SystemStreamPartitionMetadata(startingOffset, newestOffset,
-            upcomingOffset);
-          sspMetadataMap.put(new Partition(Integer.parseInt(partition)), sspMetadata);
-        });
+        LOG.info(printPartitionRuntimeInfo(ehPartitionInfo));
+        // Set offsets
+        String startingOffset = EventHubSystemConsumer.START_OF_STREAM;
+        String newestOffset = ehPartitionInfo.getLastEnqueuedOffset();
+        String upcomingOffset = EventHubSystemConsumer.END_OF_STREAM;
+        SystemStreamPartitionMetadata sspMetadata = new SystemStreamPartitionMetadata(startingOffset, newestOffset,
+          upcomingOffset);
+        sspMetadataMap.put(new Partition(Integer.parseInt(partition)), sspMetadata);
+      });
     }
 
     CompletableFuture<Void> futureGroup =
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
index 2f2873e..a6d975f 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
@@ -467,29 +467,29 @@
     public void onReceive(Iterable<EventData> events) {
       if (events != null) {
         events.forEach(event -> {
-            byte[] eventDataBody = event.getBytes();
-            if (interceptor != null) {
-              eventDataBody = interceptor.intercept(eventDataBody);
-            }
-            String offset = event.getSystemProperties().getOffset();
-            Object partitionKey = event.getSystemProperties().getPartitionKey();
-            if (partitionKey == null) {
-              partitionKey = event.getProperties().get(EventHubSystemProducer.KEY);
-            }
-            try {
-              updateMetrics(event);
+          byte[] eventDataBody = event.getBytes();
+          if (interceptor != null) {
+            eventDataBody = interceptor.intercept(eventDataBody);
+          }
+          String offset = event.getSystemProperties().getOffset();
+          Object partitionKey = event.getSystemProperties().getPartitionKey();
+          if (partitionKey == null) {
+            partitionKey = event.getProperties().get(EventHubSystemProducer.KEY);
+          }
+          try {
+            updateMetrics(event);
 
-              // note that the partition key can be null
-              put(ssp, new EventHubIncomingMessageEnvelope(ssp, offset, partitionKey, eventDataBody, event));
-            } catch (InterruptedException e) {
-              String msg = String.format("Interrupted while adding the event from ssp %s to dispatch queue.", ssp);
-              LOG.error(msg, e);
-              throw new SamzaException(msg, e);
-            }
+            // note that the partition key can be null
+            put(ssp, new EventHubIncomingMessageEnvelope(ssp, offset, partitionKey, eventDataBody, event));
+          } catch (InterruptedException e) {
+            String msg = String.format("Interrupted while adding the event from ssp %s to dispatch queue.", ssp);
+            LOG.error(msg, e);
+            throw new SamzaException(msg, e);
+          }
 
-            // Cache latest checkpoint
-            streamPartitionOffsets.put(ssp, offset);
-          });
+          // Cache latest checkpoint
+          streamPartitionOffsets.put(ssp, offset);
+        });
       }
     }
 
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
index 83d51ed..7b6a82e 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
@@ -140,25 +140,25 @@
 
     // Auto update the metrics and possible throwable when futures are complete.
     sendResult.handle((aVoid, throwable) -> {
-        long callbackLatencyMs = System.currentTimeMillis() - afterSendTimeMs;
-        sendCallbackLatency.get(streamId).update(callbackLatencyMs);
-        aggSendCallbackLatency.update(callbackLatencyMs);
-        if (throwable != null) {
-          sendErrors.get(streamId).inc();
-          aggSendErrors.inc();
-          LOG.error("Send message to event hub: {} failed with exception: ", streamId, throwable);
-          sendExceptionOnCallback.compareAndSet(null, throwable);
-        }
-        return aVoid;
-      });
+      long callbackLatencyMs = System.currentTimeMillis() - afterSendTimeMs;
+      sendCallbackLatency.get(streamId).update(callbackLatencyMs);
+      aggSendCallbackLatency.update(callbackLatencyMs);
+      if (throwable != null) {
+        sendErrors.get(streamId).inc();
+        aggSendErrors.inc();
+        LOG.error("Send message to event hub: {} failed with exception: ", streamId, throwable);
+        sendExceptionOnCallback.compareAndSet(null, throwable);
+      }
+      return aVoid;
+    });
   }
 
   public void start() {
     streamIds.forEach(streamId -> {
-        sendCallbackLatency.put(streamId, new SamzaHistogram(metricsRegistry, streamId, SEND_CALLBACK_LATENCY));
-        sendLatency.put(streamId, new SamzaHistogram(metricsRegistry, streamId, SEND_LATENCY));
-        sendErrors.put(streamId, metricsRegistry.newCounter(streamId, SEND_ERRORS));
-      });
+      sendCallbackLatency.put(streamId, new SamzaHistogram(metricsRegistry, streamId, SEND_CALLBACK_LATENCY));
+      sendLatency.put(streamId, new SamzaHistogram(metricsRegistry, streamId, SEND_LATENCY));
+      sendErrors.put(streamId, metricsRegistry.newCounter(streamId, SEND_ERRORS));
+    });
 
     if (aggSendLatency == null) {
       aggSendLatency = new SamzaHistogram(metricsRegistry, AGGREGATE, SEND_LATENCY);
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
index f021f36..3912bcf 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
@@ -152,32 +152,32 @@
     if (PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(partitioningMethod)) {
       // Create all partition senders
       perStreamEventHubClientManagers.forEach((streamId, samzaEventHubClient) -> {
-          EventHubClient ehClient = samzaEventHubClient.getEventHubClient();
+        EventHubClient ehClient = samzaEventHubClient.getEventHubClient();
 
-          try {
-            Map<Integer, PartitionSender> partitionSenders = new HashMap<>();
-            long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName);
-            Integer numPartitions =
-                ehClient.getRuntimeInformation().get(timeoutMs, TimeUnit.MILLISECONDS).getPartitionCount();
+        try {
+          Map<Integer, PartitionSender> partitionSenders = new HashMap<>();
+          long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName);
+          Integer numPartitions =
+              ehClient.getRuntimeInformation().get(timeoutMs, TimeUnit.MILLISECONDS).getPartitionCount();
 
-            for (int i = 0; i < numPartitions; i++) {
-              String partitionId = String.valueOf(i);
-              EventHubClientManager perPartitionClientManager =
-                  createOrGetEventHubClientManagerForPartition(streamId, i);
-              PartitionSender partitionSender =
-                  perPartitionClientManager.getEventHubClient().createPartitionSender(partitionId).get(DEFAULT_CREATE_PARTITION_SENDER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
-              partitionSenders.put(i, partitionSender);
-            }
-
-            streamPartitionSenders.put(streamId, partitionSenders);
-          } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            String msg = "Failed to fetch number of Event Hub partitions for partition sender creation";
-            throw new SamzaException(msg, e);
-          } catch (EventHubException | IllegalArgumentException e) {
-            String msg = "Creation of partition sender failed with exception";
-            throw new SamzaException(msg, e);
+          for (int i = 0; i < numPartitions; i++) {
+            String partitionId = String.valueOf(i);
+            EventHubClientManager perPartitionClientManager =
+                createOrGetEventHubClientManagerForPartition(streamId, i);
+            PartitionSender partitionSender =
+                perPartitionClientManager.getEventHubClient().createPartitionSender(partitionId).get(DEFAULT_CREATE_PARTITION_SENDER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+            partitionSenders.put(i, partitionSender);
           }
-        });
+
+          streamPartitionSenders.put(streamId, partitionSenders);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+          String msg = "Failed to fetch number of Event Hub partitions for partition sender creation";
+          throw new SamzaException(msg, e);
+        } catch (EventHubException | IllegalArgumentException e) {
+          String msg = "Creation of partition sender failed with exception";
+          throw new SamzaException(msg, e);
+        }
+      });
     }
     isInitialized = true;
     LOG.info("EventHubSystemProducer initialized.");
@@ -227,10 +227,10 @@
 
     // Initiate metrics
     streamIds.forEach((streamId) -> {
-        eventSkipRate.put(streamId, metricsRegistry.newCounter(streamId, EVENT_SKIP_RATE));
-        eventWriteRate.put(streamId, metricsRegistry.newCounter(streamId, EVENT_WRITE_RATE));
-        eventByteWriteRate.put(streamId, metricsRegistry.newCounter(streamId, EVENT_BYTE_WRITE_RATE));
-      });
+      eventSkipRate.put(streamId, metricsRegistry.newCounter(streamId, EVENT_SKIP_RATE));
+      eventWriteRate.put(streamId, metricsRegistry.newCounter(streamId, EVENT_WRITE_RATE));
+      eventByteWriteRate.put(streamId, metricsRegistry.newCounter(streamId, EVENT_BYTE_WRITE_RATE));
+    });
 
     // Locking to ensure that these aggregated metrics will be created only once across multiple system producers.
     synchronized (AGGREGATE_METRICS_LOCK) {
@@ -365,15 +365,15 @@
   public synchronized void stop() {
     LOG.info("Stopping producer.");
     streamPartitionSenders.values().forEach((streamPartitionSender) -> {
-        List<CompletableFuture<Void>> futures = new ArrayList<>();
-        streamPartitionSender.forEach((key, value) -> futures.add(value.close()));
-        CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
-        try {
-          future.get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException | InterruptedException | TimeoutException e) {
-          LOG.error("Closing the partition sender failed ", e);
-        }
-      });
+      List<CompletableFuture<Void>> futures = new ArrayList<>();
+      streamPartitionSender.forEach((key, value) -> futures.add(value.close()));
+      CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
+      try {
+        future.get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+      } catch (ExecutionException | InterruptedException | TimeoutException e) {
+        LOG.error("Closing the partition sender failed ", e);
+      }
+    });
     perStreamEventHubClientManagers.values()
         .parallelStream()
         .forEach(ehClient -> ehClient.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
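
Every hunk in this commit applies the same mechanical change: under the old checkstyle 6.11 configuration the project indented lambda bodies 4 spaces past the statement that opens the lambda, with the closing "});" 2 spaces past it; the 6.11.2 Indentation check evidently expects the body at +2 and the closer aligned under the opening statement, which is exactly what the added lines do. A minimal, self-contained sketch of the convention (Example, items, and process are hypothetical names, not from this patch):

import java.util.Arrays;
import java.util.List;

public class Example {
  public static void main(String[] args) {
    List<String> items = Arrays.asList("a", "b");
    // Old style, now flagged: body at +4 from the statement, "});" at +2.
    //   items.forEach(item -> {
    //       process(item);
    //     });
    // New style, accepted by 6.11.2: body at +2, "});" aligned with the statement.
    items.forEach(item -> {
      process(item);
    });
  }

  private static void process(String item) {
    System.out.println(item);
  }
}
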
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
index 68544db..51d2cdc 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
@@ -294,8 +294,8 @@
     ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
     verify(mockContainerClient, times(2)).getBlobAsyncClient(argument.capture());
     argument.getAllValues().forEach(blobName -> {
-        Assert.assertTrue(blobName.contains(blobUrlPrefix));
-      });
+      Assert.assertTrue(blobName.contains(blobUrlPrefix));
+    });
     List<String> allBlobNames = argument.getAllValues();
     Assert.assertNotEquals(allBlobNames.get(0), allBlobNames.get(1));
 
@@ -359,8 +359,8 @@
     ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
     verify(mockContainerClient, times(2)).getBlobAsyncClient(argument.capture());
     argument.getAllValues().forEach(blobName -> {
-        Assert.assertTrue(blobName.contains(blobUrlPrefix));
-      });
+      Assert.assertTrue(blobName.contains(blobUrlPrefix));
+    });
     List<String> allBlobNames = argument.getAllValues();
     Assert.assertNotEquals(allBlobNames.get(0), allBlobNames.get(1));
 
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
index 1614384..b713ec7 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
@@ -105,16 +105,16 @@
 
     BlobMetadataGenerator mockBlobMetadataGenerator = mock(BlobMetadataGenerator.class);
     doAnswer(invocation -> {
-        BlobMetadataContext blobMetadataContext = invocation.getArgumentAt(0, BlobMetadataContext.class);
-        String streamName = blobMetadataContext.getStreamName();
-        Long blobSize = blobMetadataContext.getBlobSize();
-        Long numberOfRecords = blobMetadataContext.getNumberOfMessagesInBlob();
-        Map<String, String> metadataProperties = new HashMap<>();
-        metadataProperties.put(BLOB_STREAM_NAME_METADATA, streamName);
-        metadataProperties.put(BLOB_RAW_SIZE_BYTES_METADATA, Long.toString(blobSize));
-        metadataProperties.put(BLOB_RECORD_NUMBER_METADATA, Long.toString(numberOfRecords));
-        return metadataProperties;
-      }).when(mockBlobMetadataGenerator).getBlobMetadata(anyObject());
+      BlobMetadataContext blobMetadataContext = invocation.getArgumentAt(0, BlobMetadataContext.class);
+      String streamName = blobMetadataContext.getStreamName();
+      Long blobSize = blobMetadataContext.getBlobSize();
+      Long numberOfRecords = blobMetadataContext.getNumberOfMessagesInBlob();
+      Map<String, String> metadataProperties = new HashMap<>();
+      metadataProperties.put(BLOB_STREAM_NAME_METADATA, streamName);
+      metadataProperties.put(BLOB_RAW_SIZE_BYTES_METADATA, Long.toString(blobSize));
+      metadataProperties.put(BLOB_RECORD_NUMBER_METADATA, Long.toString(numberOfRecords));
+      return metadataProperties;
+    }).when(mockBlobMetadataGenerator).getBlobMetadata(anyObject());
 
     azureBlobOutputStream = spy(new AzureBlobOutputStream(mockBlobAsyncClient, threadPool, mockMetrics,
         blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, FAKE_STREAM,
@@ -196,8 +196,8 @@
     verify(azureBlobOutputStream).stageBlock(eq(blockIdEncoded(1)), argument.capture(), eq((int) fullBlockCompressedByte.length));
     verify(azureBlobOutputStream).stageBlock(eq(blockIdEncoded(2)), argument2.capture(), eq((int) halfBlockCompressedByte.length));
     argument.getAllValues().forEach(byteBuffer -> {
-        Assert.assertEquals(ByteBuffer.wrap(fullBlockCompressedByte), byteBuffer);
-      });
+      Assert.assertEquals(ByteBuffer.wrap(fullBlockCompressedByte), byteBuffer);
+    });
     Assert.assertEquals(ByteBuffer.wrap(halfBlockCompressedByte), argument2.getAllValues().get(0));
     verify(mockMetrics, times(3)).updateAzureUploadMetrics();
   }
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java
index 4ad22ae..acf4cfb 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java
@@ -596,11 +596,11 @@
   private void setupWriterForProducer(AzureBlobSystemProducer azureBlobSystemProducer,
       AzureBlobWriter mockAzureBlobWriter, String stream) {
     doAnswer(invocation -> {
-        String blobUrl = invocation.getArgumentAt(0, String.class);
-        String streamName = invocation.getArgumentAt(2, String.class);
-        Assert.assertEquals(stream, streamName);
-        Assert.assertEquals(stream, blobUrl);
-        return mockAzureBlobWriter;
-      }).when(azureBlobSystemProducer).createNewWriter(anyString(), any(), anyString());
+      String blobUrl = invocation.getArgumentAt(0, String.class);
+      String streamName = invocation.getArgumentAt(2, String.class);
+      Assert.assertEquals(stream, streamName);
+      Assert.assertEquals(stream, blobUrl);
+      return mockAzureBlobWriter;
+    }).when(azureBlobSystemProducer).createNewWriter(anyString(), any(), anyString());
   }
 }
\ No newline at end of file
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
index 00bffc3..2d07151 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
@@ -98,12 +98,12 @@
       // Consumer mocks
       PartitionReceiver mockPartitionReceiver = PowerMockito.mock(PartitionReceiver.class);
       PowerMockito.when(mockPartitionReceiver.setReceiveHandler(any())).then((Answer<Void>) invocationOnMock -> {
-          PartitionReceiveHandler handler = invocationOnMock.getArgumentAt(0, PartitionReceiveHandler.class);
-          if (handler == null) {
-            Assert.fail("Handler for setReceiverHandler was null");
-          }
-          return null;
-        });
+        PartitionReceiveHandler handler = invocationOnMock.getArgumentAt(0, PartitionReceiveHandler.class);
+        if (handler == null) {
+          Assert.fail("Handler for setReceiverHandler was null");
+        }
+        return null;
+      });
       PartitionRuntimeInformation mockPartitionRuntimeInfo = PowerMockito.mock(PartitionRuntimeInformation.class);
       PowerMockito.when(mockPartitionRuntimeInfo.getLastEnqueuedOffset())
               .thenReturn(EventHubSystemConsumer.START_OF_STREAM);
@@ -114,16 +114,16 @@
       PartitionSender mockPartitionSender1 = PowerMockito.mock(PartitionSender.class);
       PowerMockito.when(mockPartitionSender0.send(any(EventData.class)))
               .then((Answer<CompletableFuture<Void>>) invocationOnMock -> {
-                  EventData data = invocationOnMock.getArgumentAt(0, EventData.class);
-                  receivedData.get(systemName).get(streamName).get(0).add(data);
-                  return new CompletableFuture<>();
-                });
+                EventData data = invocationOnMock.getArgumentAt(0, EventData.class);
+                receivedData.get(systemName).get(streamName).get(0).add(data);
+                return new CompletableFuture<>();
+              });
       PowerMockito.when(mockPartitionSender1.send(any(EventData.class)))
               .then((Answer<CompletableFuture<Void>>) invocationOnMock -> {
-                  EventData data = invocationOnMock.getArgumentAt(0, EventData.class);
-                  receivedData.get(systemName).get(streamName).get(1).add(data);
-                  return new CompletableFuture<>();
-                });
+                EventData data = invocationOnMock.getArgumentAt(0, EventData.class);
+                receivedData.get(systemName).get(streamName).get(1).add(data);
+                return new CompletableFuture<>();
+              });
 
       EventHubRuntimeInformation mockRuntimeInfo = PowerMockito.mock(EventHubRuntimeInformation.class);
       CompletableFuture<EventHubRuntimeInformation> future =  new MockFuture(mockRuntimeInfo);
@@ -133,18 +133,18 @@
         // Consumer calls
         PowerMockito.when(mockEventHubClient.createReceiver(anyString(), anyString(), anyObject()))
               .then((Answer<CompletableFuture<PartitionReceiver>>) invocationOnMock -> {
-                  String partitionId = invocationOnMock.getArgumentAt(1, String.class);
-                  startingOffsets.put(partitionId, EventPosition.fromEndOfStream());
-                  return CompletableFuture.completedFuture(mockPartitionReceiver);
-                });
+                String partitionId = invocationOnMock.getArgumentAt(1, String.class);
+                startingOffsets.put(partitionId, EventPosition.fromEndOfStream());
+                return CompletableFuture.completedFuture(mockPartitionReceiver);
+              });
 
         PowerMockito.when(mockEventHubClient.createReceiver(anyString(), anyString(), anyObject()))
               .then((Answer<CompletableFuture<PartitionReceiver>>) invocationOnMock -> {
-                  String partitionId = invocationOnMock.getArgumentAt(1, String.class);
-                  EventPosition offset = invocationOnMock.getArgumentAt(2, EventPosition.class);
-                  startingOffsets.put(partitionId, offset);
-                  return CompletableFuture.completedFuture(mockPartitionReceiver);
-                });
+                String partitionId = invocationOnMock.getArgumentAt(1, String.class);
+                EventPosition offset = invocationOnMock.getArgumentAt(2, EventPosition.class);
+                startingOffsets.put(partitionId, offset);
+                return CompletableFuture.completedFuture(mockPartitionReceiver);
+              });
 
         PowerMockito.when(mockEventHubClient.getPartitionRuntimeInformation(anyString())).thenReturn(partitionFuture);
 
@@ -156,12 +156,12 @@
 
         PowerMockito.when(mockEventHubClient.send(any(EventData.class), anyString()))
                 .then((Answer<CompletableFuture<Void>>) invocationOnMock -> {
-                    EventData data = invocationOnMock.getArgumentAt(0, EventData.class);
-                    String key = invocationOnMock.getArgumentAt(1, String.class);
-                    Integer intKey = Integer.valueOf(key);
-                    receivedData.get(systemName).get(streamName).get(intKey % 2).add(data);
-                    return new CompletableFuture<>();
-                  });
+                  EventData data = invocationOnMock.getArgumentAt(0, EventData.class);
+                  String key = invocationOnMock.getArgumentAt(1, String.class);
+                  Integer intKey = Integer.valueOf(key);
+                  receivedData.get(systemName).get(streamName).get(intKey % 2).add(data);
+                  return new CompletableFuture<>();
+                });
       } catch (Exception e) {
         Assert.fail("Failed to create create mock methods for EventHubClient");
       }
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
index befbf3a..f0b3a9c 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java
@@ -86,11 +86,11 @@
       Assert.assertTrue(partitionMetadataMap.size() >= MIN_EVENTHUB_ENTITY_PARTITION);
       Assert.assertTrue(partitionMetadataMap.size() <= MAX_EVENTHUB_ENTITY_PARTITION);
       partitionMetadataMap.forEach((partition, metadata) -> {
-          Assert.assertEquals(EventHubSystemConsumer.START_OF_STREAM, metadata.getOldestOffset());
-          Assert.assertNotSame(EventHubSystemConsumer.END_OF_STREAM, metadata.getNewestOffset());
-          String expectedUpcomingOffset = String.valueOf(Long.parseLong(metadata.getNewestOffset()) + 1);
-          Assert.assertEquals(expectedUpcomingOffset, metadata.getUpcomingOffset());
-        });
+        Assert.assertEquals(EventHubSystemConsumer.START_OF_STREAM, metadata.getOldestOffset());
+        Assert.assertNotSame(EventHubSystemConsumer.END_OF_STREAM, metadata.getNewestOffset());
+        String expectedUpcomingOffset = String.valueOf(Long.parseLong(metadata.getNewestOffset()) + 1);
+        Assert.assertEquals(expectedUpcomingOffset, metadata.getUpcomingOffset());
+      });
     }
   }
 
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
index 683cb52..383ae42 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
@@ -86,8 +86,8 @@
   @Override
   public void putAll(List<Entry<byte[], byte[]>> entries) {
     entries.forEach(entry -> {
-        validateMessageSize(entry.getValue());
-      });
+      validateMessageSize(entry.getValue());
+    });
     List<Entry<byte[], byte[]>> largeMessageSafeEntries = removeLargeMessages(entries);
     store.putAll(largeMessageSafeEntries);
   }
@@ -146,14 +146,14 @@
   private List<Entry<byte[], byte[]>> removeLargeMessages(List<Entry<byte[], byte[]>> entries) {
     List<Entry<byte[], byte[]>> largeMessageSafeEntries = new ArrayList<>();
     entries.forEach(entry -> {
-        if (!isLargeMessage(entry.getValue())) {
-          largeMessageSafeEntries.add(entry);
-        } else {
-          LOG.info("Ignoring a large message with size " + entry.getValue().length + " since it is greater than "
-              + "the maximum allowed value of " + maxMessageSize);
-          largeMessageSafeStoreMetrics.ignoredLargeMessages().inc();
-        }
-      });
+      if (!isLargeMessage(entry.getValue())) {
+        largeMessageSafeEntries.add(entry);
+      } else {
+        LOG.info("Ignoring a large message with size " + entry.getValue().length + " since it is greater than "
+            + "the maximum allowed value of " + maxMessageSize);
+        largeMessageSafeStoreMetrics.ignoredLargeMessages().inc();
+      }
+    });
     return largeMessageSafeEntries;
   }
 
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java
index 89c2794..c94b3f1 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java
@@ -118,12 +118,12 @@
     List<Entry<K, V>> toPut = new LinkedList<>();
     List<K> toDelete = new LinkedList<>();
     entries.forEach(e -> {
-        if (e.getValue() != null) {
-          toPut.add(e);
-        } else {
-          toDelete.add(e.getKey());
-        }
-      });
+      if (e.getValue() != null) {
+        toPut.add(e);
+      } else {
+        toDelete.add(e.getKey());
+      }
+    });
 
     if (!toPut.isEmpty()) {
       instrument(metrics.numPutAlls, metrics.putAllNs, () -> kvStore.putAll(toPut));
diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
index 3d3c39b..e5c1e97 100644
--- a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
+++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
@@ -190,11 +190,11 @@
     // Set up latch
     final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
     MockSystemProducer.listeners.add((source, envelope) -> {
-        allMessagesSent.countDown();
-        if (allMessagesSent.getCount() == messages.size() - 1) {
-          throw new RuntimeException(); // Throw on the first message
-        }
-      });
+      allMessagesSent.countDown();
+      if (allMessagesSent.getCount() == messages.size() - 1) {
+        throw new RuntimeException(); // Throw on the first message
+      }
+    });
 
     // Log the messages
     messages.forEach((message) -> log.info(message));
@@ -227,13 +227,13 @@
     final CountDownLatch allMessagesSent = new CountDownLatch(expectedMessagesSent); // We expect to drop all but the extra messages
     final CountDownLatch waitForTimeout = new CountDownLatch(1);
     MockSystemProducer.listeners.add((source, envelope) -> {
-        allMessagesSent.countDown();
-        try {
-          waitForTimeout.await();
-        } catch (InterruptedException e) {
-          fail("Test could not run properly because of a thread interrupt.");
-        }
-      });
+      allMessagesSent.countDown();
+      try {
+        waitForTimeout.await();
+      } catch (InterruptedException e) {
+        fail("Test could not run properly because of a thread interrupt.");
+      }
+    });
 
     // Log the messages. This is where the timeout will happen!
     messages.forEach((message) -> log.info(message));
diff --git a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
index 280f54e..0248343 100644
--- a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
+++ b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
@@ -204,11 +204,11 @@
     // Set up latch
     final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
     MockSystemProducer.listeners.add((source, envelope) -> {
-        allMessagesSent.countDown();
-        if (allMessagesSent.getCount() == messages.size() - 1) {
-          throw new RuntimeException(); // Throw on the first message
-        }
-      });
+      allMessagesSent.countDown();
+      if (allMessagesSent.getCount() == messages.size() - 1) {
+        throw new RuntimeException(); // Throw on the first message
+      }
+    });
 
     // Log the messages
     messages.forEach((message) -> log.info(message));
@@ -241,13 +241,13 @@
     final CountDownLatch allMessagesSent = new CountDownLatch(expectedMessagesSent); // We expect to drop all but the extra messages
     final CountDownLatch waitForTimeout = new CountDownLatch(1);
     MockSystemProducer.listeners.add((source, envelope) -> {
-        allMessagesSent.countDown();
-        try {
-          waitForTimeout.await();
-        } catch (InterruptedException e) {
-          fail("Test could not run properly because of a thread interrupt.");
-        }
-      });
+      allMessagesSent.countDown();
+      try {
+        waitForTimeout.await();
+      } catch (InterruptedException e) {
+        fail("Test could not run properly because of a thread interrupt.");
+      }
+    });
 
     // Log the messages. This is where the timeout will happen!
     messages.forEach((message) -> log.info(message));
diff --git a/samza-test/src/main/java/org/apache/samza/example/AsyncApplicationExample.java b/samza-test/src/main/java/org/apache/samza/example/AsyncApplicationExample.java
index 9ec1dca..349bb20 100644
--- a/samza-test/src/main/java/org/apache/samza/example/AsyncApplicationExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/AsyncApplicationExample.java
@@ -108,18 +108,18 @@
 
     static CompletionStage<Member> decorateMember(int memberId) {
       return CompletableFuture.supplyAsync(() -> {
-          /*
-           * Introduce some lag to mimic remote call. In real use cases, this typically translates to over the wire
-           * network call to some rest service.
-           */
-          try {
-            Thread.sleep((long) (Math.random() * 10000));
-          } catch (InterruptedException ec) {
-            System.out.println("Interrupted during sleep");
-          }
+        /*
+         * Introduce some lag to mimic remote call. In real use cases, this typically translates to over the wire
+         * network call to some rest service.
+         */
+        try {
+          Thread.sleep((long) (Math.random() * 10000));
+        } catch (InterruptedException ec) {
+          System.out.println("Interrupted during sleep");
+        }
 
-          return new Member(memberId, getRandomGender(), getRandomCountry());
-        });
+        return new Member(memberId, getRandomGender(), getRandomCountry());
+      });
     }
 
     static String getRandomGender() {
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index e2db379..fb65eea 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -320,10 +320,10 @@
     SystemConsumer consumer = factory.getConsumer(systemName, config, null);
     String name = (String) outputDescriptor.getPhysicalName().orElse(streamId);
     metadata.get(name).getSystemStreamPartitionMetadata().keySet().forEach(partition -> {
-        SystemStreamPartition temp = new SystemStreamPartition(systemName, streamId, partition);
-        ssps.add(temp);
-        consumer.register(temp, "0");
-      });
+      SystemStreamPartition temp = new SystemStreamPartition(systemName, streamId, partition);
+      ssps.add(temp);
+      consumer.register(temp, "0");
+    });
 
     long t = System.currentTimeMillis();
     Map<SystemStreamPartition, List<IncomingMessageEnvelope>> output = new HashMap<>();
@@ -361,7 +361,7 @@
     return output.entrySet()
         .stream()
         .collect(Collectors.toMap(entry -> entry.getKey().getPartition().getPartitionId(),
-            entry -> entry.getValue().stream().map(e -> (StreamMessageType) e.getMessage()).collect(Collectors.toList())));
+          entry -> entry.getValue().stream().map(e -> (StreamMessageType) e.getMessage()).collect(Collectors.toList())));
   }
 
   /**
@@ -395,18 +395,18 @@
     InMemorySystemProducer producer = (InMemorySystemProducer) factory.getProducer(systemName, config, null);
     SystemStream sysStream = new SystemStream(systemName, streamName);
     partitionData.forEach((partitionId, partition) -> {
-        partition.forEach(e -> {
-            Object key = e instanceof KV ? ((KV) e).getKey() : null;
-            Object value = e instanceof KV ? ((KV) e).getValue() : e;
-            if (value instanceof IncomingMessageEnvelope) {
-              producer.send((IncomingMessageEnvelope) value);
-            } else {
-              producer.send(systemName, new OutgoingMessageEnvelope(sysStream, Integer.valueOf(partitionId), key, value));
-            }
-          });
-        producer.send(systemName, new OutgoingMessageEnvelope(sysStream, Integer.valueOf(partitionId), null,
-            new EndOfStreamMessage(null)));
+      partition.forEach(e -> {
+        Object key = e instanceof KV ? ((KV) e).getKey() : null;
+        Object value = e instanceof KV ? ((KV) e).getValue() : e;
+        if (value instanceof IncomingMessageEnvelope) {
+          producer.send((IncomingMessageEnvelope) value);
+        } else {
+          producer.send(systemName, new OutgoingMessageEnvelope(sysStream, Integer.valueOf(partitionId), key, value));
+        }
       });
+      producer.send(systemName, new OutgoingMessageEnvelope(sysStream, Integer.valueOf(partitionId), null,
+          new EndOfStreamMessage(null)));
+    });
   }
 
   private void deleteStoreDirectories() {
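
The same convention applies when a single-expression lambda is wrapped onto its own line as a method argument, as in the Collectors.toMap hunk in TestRunner above (and again in SimpleSystemAdmin later in this patch): the wrapped lambda now sits 2 spaces past the line that opens the call instead of 4. A self-contained sketch under that assumption (WrapExample and the sample strings are hypothetical):

import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WrapExample {
  public static void main(String[] args) {
    // The wrapped lambda argument is indented 2 spaces past the ".collect(" line,
    // matching the post-upgrade indentation shown in TestRunner above.
    Map<String, Integer> lengths = Stream.of("samza", "checkstyle")
        .collect(Collectors.toMap(Function.identity(),
          word -> word.length()));
    System.out.println(lengths);
  }
}
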
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/TestKeyValueSizeHistogramMetric.java b/samza-test/src/test/java/org/apache/samza/storage/kv/TestKeyValueSizeHistogramMetric.java
index f7545dc..2a579bf 100644
--- a/samza-test/src/test/java/org/apache/samza/storage/kv/TestKeyValueSizeHistogramMetric.java
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/TestKeyValueSizeHistogramMetric.java
@@ -105,26 +105,26 @@
     }
 
     metricsRegistry.getGroups().forEach(group -> metricsRegistry.getGroup(group.toString()).forEach((name, metric) -> {
-        if (names.contains(name)) {
-          metric.visit(new MetricsVisitor() {
-            @Override
-            public void counter(Counter counter) {
+      if (names.contains(name)) {
+        metric.visit(new MetricsVisitor() {
+          @Override
+          public void counter(Counter counter) {
 
-            }
+          }
 
-            @Override
-            public <T> void gauge(Gauge<T> gauge) {
-              Double num = (Double) gauge.getValue();
-              Assert.assertNotEquals(0D, (Double) gauge.getValue(), 0.0001);
-            }
+          @Override
+          public <T> void gauge(Gauge<T> gauge) {
+            Double num = (Double) gauge.getValue();
+            Assert.assertNotEquals(0D, (Double) gauge.getValue(), 0.0001);
+          }
 
-            @Override
-            public void timer(Timer timer) {
+          @Override
+          public void timer(Timer timer) {
 
-            }
-          });
-        }
-      }));
+          }
+        });
+      }
+    }));
   }
 
   private String getRandomString() {
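
Where a lambda body itself contains an anonymous class (as in TestKeyValueSizeHistogramMetric above) or another lambda, the entire nested subtree shifts left with it; each level still indents 2 past the construct that opens it. A compact sketch with hypothetical names (NestedExample, rows, printer):

import java.util.Arrays;
import java.util.List;

public class NestedExample {
  public static void main(String[] args) {
    List<String> rows = Arrays.asList("a", "b");
    rows.forEach(row -> {
      // The anonymous class tracks the lambda body's new base indent, so the
      // subtree moves 2 spaces left per enclosing lambda; nothing else changes.
      Runnable printer = new Runnable() {
        @Override
        public void run() {
          System.out.println(row);
        }
      };
      printer.run();
    });
  }
}
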
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index 53bc234..b0630b2 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -105,8 +105,8 @@
             .map(KV::getValue)
             .partitionBy(pv -> pv.getMemberId(), pv -> pv, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
             .sink((m, collector, coordinator) -> {
-                received.add(m.getValue());
-              });
+              received.add(m.getValue());
+            });
       }
     }
 
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index 4691e87..25d31ea 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -157,8 +157,8 @@
             .map(KV::getValue)
             .partitionBy(pv -> pv.getMemberId(), pv -> pv, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
             .sink((m, collector, coordinator) -> {
-                received.add(m.getValue());
-              });
+              received.add(m.getValue());
+            });
       }
     }
 
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
index 5e25817..6afc77c 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
@@ -167,8 +167,8 @@
           .map(m -> new KV(m.getValue().getMemberId(), m.getValue()))
           .sendTo(table)
           .sink((kv, collector, coordinator) -> {
-              LOG.info("Inserted Profile with Key: {} in profile-view-store", kv.getKey());
-            });
+            LOG.info("Inserted Profile with Key: {} in profile-view-store", kv.getKey());
+          });
 
       OutputStream<TestTableData.EnrichedPageView> outputStream = appDescriptor.getOutputStream(enrichedPageViewOSD);
       appDescriptor.getInputStream(pageViewISD)
@@ -177,8 +177,8 @@
           .sendTo(outputStream)
           .map(TestTableData.EnrichedPageView::getPageKey)
           .sink((joinPageKey, collector, coordinator) -> {
-              collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "JoinPageKeys"), null, null, joinPageKey));
-            });
+            collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "JoinPageKeys"), null, null, joinPageKey));
+          });
 
     }
   }
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
index 24726f8..dbdacf1 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
@@ -94,11 +94,11 @@
             new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class)), "userAdClickWindow")
         .map(windowPane -> KV.of(windowPane.getKey().getKey(), String.valueOf(windowPane.getMessage().size())))
         .sink((message, messageCollector, taskCoordinator) -> {
-            taskCoordinator.commit(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
-            messageCollector.send(
-                new OutgoingMessageEnvelope(
-                    new SystemStream("kafka", outputTopic), null, message.getKey(), message.getValue()));
-          });
+          taskCoordinator.commit(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
+          messageCollector.send(
+              new OutgoingMessageEnvelope(
+                  new SystemStream("kafka", outputTopic), null, message.getKey(), message.getValue()));
+        });
 
 
     intermediateStreamIds.add(((IntermediateMessageStreamImpl) pageViewsRepartitionedByViewId).getStreamId());
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
index 275be34..a60aea7 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
@@ -152,14 +152,14 @@
     private static CompletionStage<Collection<PageView>> filterGuestPageViews(PageView pageView,
         Predicate<PageView> shouldFailProcess, Supplier<Long> processJitter) {
       CompletableFuture<Collection<PageView>> filteredPageViews = CompletableFuture.supplyAsync(() -> {
-          try {
-            Thread.sleep(processJitter.get());
-          } catch (InterruptedException ex) {
-            System.out.println("Interrupted during sleep.");
-          }
+        try {
+          Thread.sleep(processJitter.get());
+        } catch (InterruptedException ex) {
+          System.out.println("Interrupted during sleep.");
+        }
 
-          return Long.valueOf(pageView.getUserId()) < 1 ? Collections.emptyList() : Collections.singleton(pageView);
-        });
+        return Long.valueOf(pageView.getUserId()) < 1 ? Collections.emptyList() : Collections.singleton(pageView);
+      });
 
       if (shouldFailProcess.test(pageView)) {
         filteredPageViews.completeExceptionally(new RuntimeException("Remote service threw an exception"));
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
index 0c8cea4..1e5e24c 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
@@ -278,10 +278,10 @@
       doNothing().when(listener).afterStart();
       doNothing().when(listener).afterFailure(any());
       doAnswer(invocation -> {
-          // stopped successfully
-          shutdownLatch.countDown();
-          return null;
-        }).when(listener).afterStop();
+        // stopped successfully
+        shutdownLatch.countDown();
+        return null;
+      }).when(listener).afterStop();
     }
 
     private void initProducer(String bootstrapServer) {
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index a2a1e5c..fef4836 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -236,10 +236,10 @@
       config.put(ClusterManagerConfig.HOST_AFFINITY_ENABLED, "false");
     }
     storeName.ifPresent(s -> {
-        config.put(String.format(StorageConfig.FACTORY, s), MockStoreFactory.class.getName());
-        config.put(String.format(StorageConfig.KEY_SERDE, s), "string");
-        config.put(String.format(StorageConfig.MSG_SERDE, s), "string");
-      });
+      config.put(String.format(StorageConfig.FACTORY, s), MockStoreFactory.class.getName());
+      config.put(String.format(StorageConfig.KEY_SERDE, s), "string");
+      config.put(String.format(StorageConfig.MSG_SERDE, s), "string");
+    });
     Map<String, String> samzaContainerConfig = ImmutableMap.<String, String>builder().putAll(config).build();
     Map<String, String> applicationConfig = Maps.newHashMap(samzaContainerConfig);
     applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(coordinatorSystemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));
@@ -274,11 +274,11 @@
     final CountDownLatch secondProcessorRegistered = new CountDownLatch(1);
 
     zkUtils.subscribeToProcessorChange((parentPath, currentChilds) -> {
-        // When appRunner2 with id: PROCESSOR_IDS[1] is registered, run processing message in appRunner1.
-        if (currentChilds.contains(PROCESSOR_IDS[1])) {
-          secondProcessorRegistered.countDown();
-        }
-      });
+      // When appRunner2 with id: PROCESSOR_IDS[1] is registered, run processing message in appRunner1.
+      if (currentChilds.contains(PROCESSOR_IDS[1])) {
+        secondProcessorRegistered.countDown();
+      }
+    });
 
     // Set up stream app appRunner2.
     CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
@@ -356,11 +356,11 @@
     final CountDownLatch secondProcessorRegistered = new CountDownLatch(1);
 
     zkUtils.subscribeToProcessorChange((parentPath, currentChilds) -> {
-        // When appRunner2 with id: PROCESSOR_IDS[1] is registered, start processing message in appRunner1.
-        if (currentChilds.contains(PROCESSOR_IDS[1])) {
-          secondProcessorRegistered.countDown();
-        }
-      });
+      // When appRunner2 with id: PROCESSOR_IDS[1] is registered, start processing message in appRunner1.
+      if (currentChilds.contains(PROCESSOR_IDS[1])) {
+        secondProcessorRegistered.countDown();
+      }
+    });
 
     // Set up appRunner2.
     CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2);
@@ -755,10 +755,10 @@
     Map<String, String> configMap = new HashMap<>();
     CoordinatorStreamValueSerde jsonSerde = new CoordinatorStreamValueSerde("set-config");
     metadataStore.all().forEach((key, value) -> {
-        CoordinatorStreamStore.CoordinatorMessageKey coordinatorMessageKey = CoordinatorStreamStore.deserializeCoordinatorMessageKeyFromJson(key);
-        String deserializedValue = jsonSerde.fromBytes(value);
-        configMap.put(coordinatorMessageKey.getKey(), deserializedValue);
-      });
+      CoordinatorStreamStore.CoordinatorMessageKey coordinatorMessageKey = CoordinatorStreamStore.deserializeCoordinatorMessageKeyFromJson(key);
+      String deserializedValue = jsonSerde.fromBytes(value);
+      configMap.put(coordinatorMessageKey.getKey(), deserializedValue);
+    });
     return new MapConfig(configMap);
   }
 
@@ -1279,8 +1279,8 @@
   private static List<SystemStreamPartition> getSystemStreamPartitions(JobModel jobModel) {
     List<SystemStreamPartition> ssps = new ArrayList<>();
     jobModel.getContainers().forEach((containerName, containerModel) -> {
-        containerModel.getTasks().forEach((taskName, taskModel) -> ssps.addAll(taskModel.getSystemStreamPartitions()));
-      });
+      containerModel.getTasks().forEach((taskName, taskModel) -> ssps.addAll(taskModel.getSystemStreamPartitions()));
+    });
     return ssps;
   }
 
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index b70eb2c..8b5d3c5 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -864,13 +864,13 @@
 
     List<String> outMessages = TestAvroSystemFactory.messages.stream()
         .map(x -> {
-            GenericRecord profileAddr = (GenericRecord) ((GenericRecord) x.getMessage()).get("profileAddress");
-            GenericRecord streetNum = (GenericRecord) (profileAddr.get("streetnum"));
-            return ((GenericRecord) x.getMessage()).get("pageKey").toString() + ","
-                + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" :
-                ((GenericRecord) x.getMessage()).get("profileName").toString()) + ","
-                + profileAddr.get("zip") + "," + streetNum.get("number");
-          })
+          GenericRecord profileAddr = (GenericRecord) ((GenericRecord) x.getMessage()).get("profileAddress");
+          GenericRecord streetNum = (GenericRecord) (profileAddr.get("streetnum"));
+          return ((GenericRecord) x.getMessage()).get("pageKey").toString() + ","
+              + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" :
+              ((GenericRecord) x.getMessage()).get("profileName").toString()) + ","
+              + profileAddr.get("zip") + "," + streetNum.get("number");
+        })
         .collect(Collectors.toList());
     Assert.assertEquals(numMessages, outMessages.size());
     List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameAddressJoin(numMessages);
@@ -1185,19 +1185,19 @@
     HashMap<String, List<String>> pageKeyCountListMap = new HashMap<>();
     TestAvroSystemFactory.messages.stream()
         .map(x -> {
-            String pageKey = ((GenericRecord) x.getMessage()).get("pageKey").toString();
-            String count = ((GenericRecord) x.getMessage()).get("count").toString();
-            pageKeyCountListMap.computeIfAbsent(pageKey, k -> new ArrayList<>()).add(count);
-            return pageKeyCountListMap;
-          });
+          String pageKey = ((GenericRecord) x.getMessage()).get("pageKey").toString();
+          String count = ((GenericRecord) x.getMessage()).get("count").toString();
+          pageKeyCountListMap.computeIfAbsent(pageKey, k -> new ArrayList<>()).add(count);
+          return pageKeyCountListMap;
+        });
 
     HashMap<String, Integer> pageKeyCountMap = new HashMap<>();
     pageKeyCountListMap.forEach((key, list) -> {
-        // Check that the number of windows per key is non-zero but less than the number of input messages per key.
-        Assert.assertTrue(list.size() > 1 && list.size() < numMessages / TestAvroSystemFactory.pageKeys.length);
-        // Collapse the count of messages per key
-        pageKeyCountMap.put(key, list.stream().mapToInt(Integer::parseInt).sum());
-      });
+      // Check that the number of windows per key is non-zero but less than the number of input messages per key.
+      Assert.assertTrue(list.size() > 1 && list.size() < numMessages / TestAvroSystemFactory.pageKeys.length);
+      // Collapse the count of messages per key
+      pageKeyCountMap.put(key, list.stream().mapToInt(Integer::parseInt).sum());
+    });
 
     Set<String> pageKeys = new HashSet<>(Arrays.asList("job", "inbox"));
     HashMap<String, Integer> expectedPageKeyCountMap =
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
index a55b8c1..78fc7b5 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
@@ -126,9 +126,9 @@
       GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
       appDesc.getInputStream(pageViewISD)
           .map(pv -> {
-              received.add(pv);
-              return pv;
-            })
+            received.add(pv);
+            return pv;
+          })
           .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
           .join(table, new PageViewToProfileJoinFunction())
           .sink((m, collector, coordinator) -> joined.add(m));
@@ -188,15 +188,15 @@
 
       profileStream1
           .map(m -> {
-              sentToProfileTable1.add(m);
-              return new KV(m.getMemberId(), m);
-            })
+            sentToProfileTable1.add(m);
+            return new KV(m.getMemberId(), m);
+          })
           .sendTo(profileTable);
       profileStream2
           .map(m -> {
-              sentToProfileTable2.add(m);
-              return new KV(m.getMemberId(), m);
-            })
+            sentToProfileTable2.add(m);
+            return new KV(m.getMemberId(), m);
+          })
           .sendTo(profileTable);
 
       GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>());
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java
index eecc6b4..071f65e 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java
@@ -143,10 +143,10 @@
       return new InMemoryTableDescriptor(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
           .withSideInputs(ImmutableList.of(PROFILE_STREAM))
           .withSideInputsProcessor((msg, store) -> {
-              Profile profile = (Profile) msg.getMessage();
-              int key = profile.getMemberId();
-              return ImmutableList.of(new Entry<>(key, profile));
-            });
+            Profile profile = (Profile) msg.getMessage();
+            int key = profile.getMemberId();
+            return ImmutableList.of(new Entry<>(key, profile));
+          });
     }
   }
 
@@ -156,10 +156,10 @@
       return new RocksDbTableDescriptor(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
           .withSideInputs(ImmutableList.of(PROFILE_STREAM))
           .withSideInputsProcessor((msg, store) -> {
-              TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage();
-              int key = profile.getMemberId();
-              return ImmutableList.of(new Entry<>(key, profile));
-            });
+            TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage();
+            int key = profile.getMemberId();
+            return ImmutableList.of(new Entry<>(key, profile));
+          });
     }
   }
 }
\ No newline at end of file
diff --git a/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java b/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
index 6ba28ae..268cd23 100644
--- a/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
+++ b/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
@@ -61,11 +61,11 @@
       Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopeMap = new HashMap<>();
       final AtomicInteger offset = new AtomicInteger(0);
       set.forEach(ssp -> {
-          List<IncomingMessageEnvelope> envelopes = Arrays.stream(getArrayObjects(ssp.getSystemStream().getStream(), config))
-              .map(object -> new IncomingMessageEnvelope(ssp, String.valueOf(offset.incrementAndGet()), null, object)).collect(Collectors.toList());
-          envelopes.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp));
-          envelopeMap.put(ssp, envelopes);
-        });
+        List<IncomingMessageEnvelope> envelopes = Arrays.stream(getArrayObjects(ssp.getSystemStream().getStream(), config))
+            .map(object -> new IncomingMessageEnvelope(ssp, String.valueOf(offset.incrementAndGet()), null, object)).collect(Collectors.toList());
+        envelopes.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp));
+        envelopeMap.put(ssp, envelopes);
+      });
       done = true;
       return envelopeMap;
     } else {
diff --git a/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java b/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java
index c735c74..493bce8 100644
--- a/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java
+++ b/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java
@@ -53,19 +53,19 @@
   public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
     return streamNames.stream()
         .collect(Collectors.toMap(Function.identity(), streamName -> {
-            int messageCount = isBootstrapStream(streamName) ? getMessageCount(streamName) : -1;
-            String oldestOffset = messageCount < 0 ? null : "0";
-            String newestOffset = messageCount < 0 ? null : String.valueOf(messageCount - 1);
-            String upcomingOffset = messageCount < 0 ? null : String.valueOf(messageCount);
-            Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> metadataMap = new HashMap<>();
-            int partitionCount = config.getInt("streams." + streamName + ".partitionCount", 1);
-            for (int i = 0; i < partitionCount; i++) {
-              metadataMap.put(new Partition(i), new SystemStreamMetadata.SystemStreamPartitionMetadata(
-                  oldestOffset, newestOffset, upcomingOffset
-              ));
-            }
-            return new SystemStreamMetadata(streamName, metadataMap);
-          }));
+          int messageCount = isBootstrapStream(streamName) ? getMessageCount(streamName) : -1;
+          String oldestOffset = messageCount < 0 ? null : "0";
+          String newestOffset = messageCount < 0 ? null : String.valueOf(messageCount - 1);
+          String upcomingOffset = messageCount < 0 ? null : String.valueOf(messageCount);
+          Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> metadataMap = new HashMap<>();
+          int partitionCount = config.getInt("streams." + streamName + ".partitionCount", 1);
+          for (int i = 0; i < partitionCount; i++) {
+            metadataMap.put(new Partition(i), new SystemStreamMetadata.SystemStreamPartitionMetadata(
+                oldestOffset, newestOffset, upcomingOffset
+            ));
+          }
+          return new SystemStreamMetadata(streamName, metadataMap);
+        }));
   }
 
   @Override