SAMZA-2627: Make StreamAppender extensible for sending messages to SystemProducer (#1474)

Issue: StreamAppender sets both partition key and record key = container name for OutgoingMessageEnvelope sent to underlying SystemProducer. This restricts how the classes extending StreamAppender send to SystemProducer.

Change: Introduce method to decorate the serializedLogEvent that can be overridden by extenders of StreamAppender
diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
index cadf63c..2b03c31 100644
--- a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
+++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -439,10 +439,19 @@
    * Helper method to send a serialized log-event to the systemProducer, and increment respective methods.
    * @param serializedLogEvent
    */
-  protected void sendEventToSystemProducer(byte[] serializedLogEvent) {
+  private void sendEventToSystemProducer(byte[] serializedLogEvent) {
     metrics.logMessagesBytesSent.inc(serializedLogEvent.length);
     metrics.logMessagesCountSent.inc();
-    systemProducer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, keyBytes, serializedLogEvent));
+    systemProducer.send(SOURCE, decorateLogEvent(serializedLogEvent));
+  }
+
+  /**
+   * Helper method to create an OutgoingMessageEnvelope from the serialized log event.
+   * @param messageBytes message bytes
+   * @return OutgoingMessageEnvelope that contains the message bytes along with the system stream
+   */
+  protected OutgoingMessageEnvelope decorateLogEvent(byte[] messageBytes) {
+    return new OutgoingMessageEnvelope(systemStream, keyBytes, messageBytes);
   }
 
   protected String getStreamName(String jobName, String jobId) {