SAMZA-2627: Make StreamAppender extensible for sending messages to SystemProducer (#1474)
Issue: StreamAppender sets both partition key and record key = container name for OutgoingMessageEnvelope sent to underlying SystemProducer. This restricts how the classes extending StreamAppender send to SystemProducer.
Change: Introduce method to decorate the serializedLogEvent that can be overridden by extenders of StreamAppender
diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
index cadf63c..2b03c31 100644
--- a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
+++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -439,10 +439,19 @@
* Helper method to send a serialized log-event to the systemProducer, and increment respective methods.
* @param serializedLogEvent
*/
- protected void sendEventToSystemProducer(byte[] serializedLogEvent) {
+ private void sendEventToSystemProducer(byte[] serializedLogEvent) {
metrics.logMessagesBytesSent.inc(serializedLogEvent.length);
metrics.logMessagesCountSent.inc();
- systemProducer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, keyBytes, serializedLogEvent));
+ systemProducer.send(SOURCE, decorateLogEvent(serializedLogEvent));
+ }
+
+ /**
+ * Helper method to create an OutgoingMessageEnvelope from the serialized log event.
+ * @param messageBytes message bytes
+ * @return OutgoingMessageEnvelope that contains the message bytes along with the system stream
+ */
+ protected OutgoingMessageEnvelope decorateLogEvent(byte[] messageBytes) {
+ return new OutgoingMessageEnvelope(systemStream, keyBytes, messageBytes);
}
protected String getStreamName(String jobName, String jobId) {