METRON-2360 BRO-PLUGIN: does not build with Zeek 3.2.1 (ottobackwards) closes apache/metron-bro-plugin-kafka#48
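
Bumps the dockerized test environment from Zeek 3.1.3 to 3.2.1, adds
diffutils to the zeek container image (the 3.2 build expects a 'diff'
binary that the base image does not provide), and reformats the C++
sources: pointer placement and indentation are normalized, string/map
types are qualified as std::, and explicit <map> includes are added. To
rebuild the test environment, run e.g. 'docker-compose build zeek' from
the docker/ directory.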
diff --git a/docker/containers/zeek/Dockerfile b/docker/containers/zeek/Dockerfile
index dba31d7..d8eda31 100644
--- a/docker/containers/zeek/Dockerfile
+++ b/docker/containers/zeek/Dockerfile
@@ -57,7 +57,7 @@
 # install pip3 and zkg
 WORKDIR /root
 COPY requirements.txt requirements.txt
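+# NOTE: diffutils provides the 'diff' binary, which the Zeek 3.2 build
+# expects but the base image does not include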
-RUN dnf -y install python3-pip && \
+RUN dnf -y install python3-pip diffutils && \
     dnf clean all && \
     python3 -m pip install --upgrade pip && \
     python3 -m pip install -r requirements.txt && \
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 0579887..4f8ba10 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -29,7 +29,7 @@
     build:
       context: containers/zeek
       args:
-        ZEEK_VERSION: "3.1.3"
+        ZEEK_VERSION: "3.2.1"
         LIBRDKAFKA_VERSION: "1.4.2"
     image: metron-bro-plugin-kafka_zeek:latest
     depends_on:
diff --git a/src/KafkaWriter.cc b/src/KafkaWriter.cc
index deeea95..1e19b3b 100644
--- a/src/KafkaWriter.cc
+++ b/src/KafkaWriter.cc
@@ -22,12 +22,8 @@
 using namespace writer;
 
 // The Constructor is called once for each log filter that uses this log writer.
-KafkaWriter::KafkaWriter(WriterFrontend* frontend):
-    WriterBackend(frontend),
-    formatter(NULL),
-    producer(NULL),
-    topic(NULL)
-{
+KafkaWriter::KafkaWriter(WriterFrontend *frontend)
+    : WriterBackend(frontend), formatter(NULL), producer(NULL), topic(NULL) {
   /**
    * We need thread-local copies of all user-defined settings coming from zeek
    * scripting land.  accessing these is not thread-safe and 'DoInit' is
@@ -41,177 +37,170 @@
   // json_timestamps
   ODesc tsfmt;
   BifConst::Kafka::json_timestamps->Describe(&tsfmt);
-  json_timestamps.assign(
-      (const char*) tsfmt.Bytes(),
-      tsfmt.Len()
-    );
+  json_timestamps.assign((const char *)tsfmt.Bytes(), tsfmt.Len());
 
   // topic name - thread local copy
-  topic_name.assign(
-    (const char*)BifConst::Kafka::topic_name->Bytes(),
-    BifConst::Kafka::topic_name->Len());
+  topic_name.assign((const char *)BifConst::Kafka::topic_name->Bytes(),
+                    BifConst::Kafka::topic_name->Len());
 
   // kafka_conf - thread local copy
-  Val* val = BifConst::Kafka::kafka_conf->AsTableVal();
-  IterCookie* c = val->AsTable()->InitForIteration();
-  HashKey* k;
-  TableEntryVal* v;
+  Val *val = BifConst::Kafka::kafka_conf->AsTableVal();
+  IterCookie *c = val->AsTable()->InitForIteration();
+  HashKey *k;
+  TableEntryVal *v;
   while ((v = val->AsTable()->NextEntry(k, c))) {
-
-      // fetch the key and value
-      ListVal* index = val->AsTableVal()->RecoverIndex(k);
-      string key = index->Index(0)->AsString()->CheckString();
-      string val = v->Value()->AsString()->CheckString();
-      kafka_conf.insert (kafka_conf.begin(), pair<string, string> (key, val));
-
-      // cleanup
-      Unref(index);
-      delete k;
-  }
-
-  Val* mvals = BifConst::Kafka::additional_message_values->AsTableVal();
-  c = val->AsTable()->InitForIteration();
-  while ((v = mvals->AsTable()->NextEntry(k, c))) {
-
     // fetch the key and value
-    ListVal* index = mvals->AsTableVal()->RecoverIndex(k);
-    string key = index->Index(0)->AsString()->CheckString();
-    string val = v->Value()->AsString()->CheckString();
-    additional_message_values.insert (additional_message_values.begin(), pair<string, string> (key, val));
+    ListVal *index = val->AsTableVal()->RecoverIndex(k);
+    std::string key = index->Index(0)->AsString()->CheckString();
+    std::string val = v->Value()->AsString()->CheckString();
+    kafka_conf.insert(kafka_conf.begin(),
+                      std::pair<std::string, std::string>(key, val));
 
     // cleanup
     Unref(index);
     delete k;
   }
 
+  Val *mvals = BifConst::Kafka::additional_message_values->AsTableVal();
+  c = mvals->AsTable()->InitForIteration();
+  while ((v = mvals->AsTable()->NextEntry(k, c))) {
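+    // fetch the key and value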
+    ListVal *index = mvals->AsTableVal()->RecoverIndex(k);
+    std::string key = index->Index(0)->AsString()->CheckString();
+    std::string val = v->Value()->AsString()->CheckString();
+    additional_message_values.insert(additional_message_values.begin(),
+                                     std::pair<std::string, std::string>(key, val));
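+    // cleanup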
+    Unref(index);
+    delete k;
+  }
 }
 
-KafkaWriter::~KafkaWriter()
-{
+KafkaWriter::~KafkaWriter() {
   // Cleanup must happen in DoFinish, not in the destructor
 }
 
-string KafkaWriter::GetConfigValue(const WriterInfo& info, const string name) const
-{
-    map<const char*, const char*>::const_iterator it = info.config.find(name.c_str());
-    if (it == info.config.end())
-        return string();
-    else
-        return it->second;
+std::string KafkaWriter::GetConfigValue(const WriterInfo &info,
+                                        const std::string name) const {
+  std::map<const char *, const char *>::const_iterator it =
+      info.config.find(name.c_str());
+  if (it == info.config.end())
+    return std::string();
+  else
+    return it->second;
 }
 
 /**
  * DoInit is called once for each call to the constructor, but in a separate
  * thread
  */
-bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields)
-{
+bool KafkaWriter::DoInit(const WriterInfo &info, int num_fields,
+                         const threading::Field *const *fields) {
+  // TimeFormat object, defaulting to TS_EPOCH
+  threading::formatter::JSON::TimeFormat tf =
+      threading::formatter::JSON::TS_EPOCH;
 
-    // Timeformat object, default to TS_EPOCH
-    threading::formatter::JSON::TimeFormat tf = threading::formatter::JSON::TS_EPOCH;
+  // Allow overriding of the kafka topic via the Zeek script constant
+  // 'topic_name' which can be applied when adding a new Zeek log filter.
+  topic_name_override = GetConfigValue(info, "topic_name");
 
-    // Allow overriding of the kafka topic via the Zeek script constant
-    // 'topic_name' which can be applied when adding a new Zeek log filter.
-    topic_name_override = GetConfigValue(info, "topic_name");
+  if (!topic_name_override.empty()) {
+    // Override the topic name if 'topic_name' is specified in the log
+    // filter's $conf
+    topic_name = topic_name_override;
+  } else if (topic_name.empty()) {
+    // If no global 'topic_name' is defined, use the log stream's 'path'
+    topic_name = info.path;
+  }
 
-    if(!topic_name_override.empty()) {
-      // Override the topic name if 'topic_name' is specified in the log
-      // filter's $conf
-      topic_name = topic_name_override;
-    } else if(topic_name.empty()) {
-      // If no global 'topic_name' is defined, use the log stream's 'path'
-      topic_name = info.path;
-    }
+  if (mocking) {
+    raise_topic_resolved_event(topic_name);
+  }
 
-    if (mocking) {
-       raise_topic_resolved_event(topic_name);
-    }
+  /**
+   * Format the timestamps
+   * NOTE: This string comparison implementation is currently the necessary
+   * approach, as there is no way to map the Zeek-script enum onto a C++
+   * enum. This makes the user interface consistent with the existing Zeek
+   * logging configuration for the ASCII log output.
+   */
+  if (strcmp(json_timestamps.c_str(), "JSON::TS_EPOCH") == 0) {
+    tf = threading::formatter::JSON::TS_EPOCH;
+  } else if (strcmp(json_timestamps.c_str(), "JSON::TS_MILLIS") == 0) {
+    tf = threading::formatter::JSON::TS_MILLIS;
+  } else if (strcmp(json_timestamps.c_str(), "JSON::TS_ISO8601") == 0) {
+    tf = threading::formatter::JSON::TS_ISO8601;
+  } else {
+    Error(Fmt("KafkaWriter::DoInit: Invalid JSON timestamp format %s",
+              json_timestamps.c_str()));
+    return false;
+  }
 
-    /**
-     * Format the timestamps
-     * NOTE: This string comparision implementation is currently the necessary
-     * way to do it, as there isn't a way to pass the Zeek enum into C++ enum.
-     * This makes the user interface consistent with the existing Zeek Logging
-     * configuration for the ASCII log output.
-     */
-    if ( strcmp(json_timestamps.c_str(), "JSON::TS_EPOCH") == 0 ) {
-      tf = threading::formatter::JSON::TS_EPOCH;
+  // initialize the formatter
+  if (BifConst::Kafka::tag_json) {
+    formatter = new threading::formatter::TaggedJSON(info.path, this, tf);
+  } else {
+    formatter = new threading::formatter::JSON(this, tf);
+  }
+
+  // is debug enabled
+  std::string debug;
+  debug.assign((const char *)BifConst::Kafka::debug->Bytes(),
+               BifConst::Kafka::debug->Len());
+  bool is_debug(!debug.empty());
+  if (is_debug) {
+    MsgThread::Info(
+        Fmt("Debug is turned on and set to: %s.  Available debug context: %s.",
+            debug.c_str(), RdKafka::get_debug_contexts().c_str()));
+  }
+
+  // kafka global configuration
+  std::string err;
+  conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
+
+  // apply the user-defined settings to kafka
+  std::map<std::string, std::string>::iterator i;
+  for (i = kafka_conf.begin(); i != kafka_conf.end(); ++i) {
+    std::string key = i->first;
+    std::string val = i->second;
+
+    // apply setting to kafka
+    if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
+      Error(Fmt("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(),
+                err.c_str()));
+      return false;
     }
-    else if ( strcmp(json_timestamps.c_str(), "JSON::TS_MILLIS") == 0 ) {
-      tf = threading::formatter::JSON::TS_MILLIS;
+  }
+
+  if (is_debug) {
+    std::string key("debug");
+    std::string val(debug);
+    if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
+      Error(Fmt("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(),
+                err.c_str()));
+      return false;
     }
-    else if ( strcmp(json_timestamps.c_str(), "JSON::TS_ISO8601") == 0 ) {
-      tf = threading::formatter::JSON::TS_ISO8601;
-    }
-    else {
-      Error(Fmt("KafkaWriter::DoInit: Invalid JSON timestamp format %s",
-        json_timestamps.c_str()));
+  }
+
+  if (!mocking) {
+    // create kafka producer
+    producer = RdKafka::Producer::create(conf, err);
+    if (!producer) {
+      Error(Fmt("Failed to create producer: %s", err.c_str()));
       return false;
     }
 
-    // initialize the formatter
-    if(BifConst::Kafka::tag_json) {
-      formatter = new threading::formatter::TaggedJSON(info.path, this, tf);
-    }
-    else {
-      formatter = new threading::formatter::JSON(this, tf);
+    // create handle to topic
+    topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
+    topic = RdKafka::Topic::create(producer, topic_name, topic_conf, err);
+    if (!topic) {
+      Error(Fmt("Failed to create topic handle: %s", err.c_str()));
+      return false;
     }
 
-    // is debug enabled
-    string debug;
-    debug.assign((const char*)BifConst::Kafka::debug->Bytes(), BifConst::Kafka::debug->Len());
-    bool is_debug(!debug.empty());
-    if(is_debug) {
-      MsgThread::Info(Fmt("Debug is turned on and set to: %s.  Available debug context: %s.", debug.c_str(), RdKafka::get_debug_contexts().c_str()));
+    if (is_debug) {
+      MsgThread::Info(Fmt("Successfully created producer."));
     }
-
-    // kafka global configuration
-    string err;
-    conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
-
-    // apply the user-defined settings to kafka
-    map<string,string>::iterator i;
-    for (i = kafka_conf.begin(); i != kafka_conf.end(); ++i) {
-      string key = i->first;
-      string val = i->second;
-
-      // apply setting to kafka
-      if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
-          Error(Fmt("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str()));
-          return false;
-      }
-    }
-
-    if(is_debug) {
-        string key("debug");
-        string val(debug);
-        if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
-            Error(Fmt("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str()));
-            return false;
-        }
-    }
-
-    if (!mocking) {
-        // create kafka producer
-        producer = RdKafka::Producer::create(conf, err);
-        if (!producer) {
-            Error(Fmt("Failed to create producer: %s", err.c_str()));
-            return false;
-        }
-
-        // create handle to topic
-        topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
-        topic = RdKafka::Topic::create(producer, topic_name, topic_conf, err);
-        if (!topic) {
-            Error(Fmt("Failed to create topic handle: %s", err.c_str()));
-            return false;
-        }
-
-        if (is_debug) {
-            MsgThread::Info(Fmt("Successfully created producer."));
-        }
-    }
-    return true;
+  }
+  return true;
 }
 
 /**
@@ -220,69 +209,68 @@
  * the thread can be safely terminated. As such, all resources created must be
  * removed here.
  */
-bool KafkaWriter::DoFinish(double network_time)
-{
-    bool success = false;
-    int poll_interval = 1000;
-    int waited = 0;
-    int max_wait = BifConst::Kafka::max_wait_on_shutdown;
+bool KafkaWriter::DoFinish(double network_time) {
+  bool success = false;
+  int poll_interval = 1000;
+  int waited = 0;
+  int max_wait = BifConst::Kafka::max_wait_on_shutdown;
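+  // poll_interval, waited, and max_wait are all in milliseconds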
 
-    if (!mocking) {
-        // wait a bit for queued messages to be delivered
-        while (producer->outq_len() > 0 && waited <= max_wait) {
-            producer->poll(poll_interval);
-            waited += poll_interval;
-        }
-
-        // successful only if all messages delivered
-        if (producer->outq_len() == 0) {
-            success = true;
-        } else {
-            Error(Fmt("Unable to deliver %0d message(s)", producer->outq_len()));
-        }
-
-        delete topic;
-        delete producer;
-        delete topic_conf;
+  if (!mocking) {
+    // wait a bit for queued messages to be delivered
+    while (producer->outq_len() > 0 && waited <= max_wait) {
+      producer->poll(poll_interval);
+      waited += poll_interval;
     }
-    delete formatter;
-    delete conf;
 
-    return success;
+    // successful only if all messages delivered
+    if (producer->outq_len() == 0) {
+      success = true;
+    } else {
+      Error(Fmt("Unable to deliver %0d message(s)", producer->outq_len()));
+    }
+
+    delete topic;
+    delete producer;
+    delete topic_conf;
+  }
+  delete formatter;
+  delete conf;
+
+  return success;
 }
 
 /**
  * Writer-specific output method implementing recording of one log
  * entry.
  */
-bool KafkaWriter::DoWrite(int num_fields, const threading::Field* const* fields, threading::Value** vals)
-{
-    if (!mocking) {
-        ODesc buff;
-        buff.Clear();
+bool KafkaWriter::DoWrite(int num_fields, const threading::Field *const *fields,
+                          threading::Value **vals) {
+  if (!mocking) {
+    ODesc buff;
+    buff.Clear();
 
-        // format the log entry
-      if(BifConst::Kafka::tag_json) {
-          dynamic_cast<threading::formatter::TaggedJSON*>(formatter)->Describe(&buff, num_fields, fields, vals,
-                              additional_message_values);
-       } else {
-        formatter->Describe(&buff, num_fields, fields, vals);
-       }
-
-        // send the formatted log entry to kafka
-        const char *raw = (const char *) buff.Bytes();
-        RdKafka::ErrorCode resp = producer->produce(
-                topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
-                const_cast<char *>(raw), strlen(raw), NULL, NULL);
-
-        if (RdKafka::ERR_NO_ERROR == resp) {
-            producer->poll(0);
-        } else {
-            string err = RdKafka::err2str(resp);
-            Error(Fmt("Kafka send failed: %s", err.c_str()));
-        }
+    // format the log entry
+    if (BifConst::Kafka::tag_json) {
+      dynamic_cast<threading::formatter::TaggedJSON *>(formatter)->Describe(
+          &buff, num_fields, fields, vals, additional_message_values);
+    } else {
+      formatter->Describe(&buff, num_fields, fields, vals);
     }
-    return true;
+
+    // send the formatted log entry to kafka
+    const char *raw = (const char *)buff.Bytes();
+    RdKafka::ErrorCode resp = producer->produce(
+        topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
+        const_cast<char *>(raw), strlen(raw), NULL, NULL);
+
+    if (RdKafka::ERR_NO_ERROR == resp) {
+      producer->poll(0);
+    } else {
+      std::string err = RdKafka::err2str(resp);
+      Error(Fmt("Kafka send failed: %s", err.c_str()));
+    }
+  }
+  return true;
 }
 
 /**
@@ -294,10 +282,9 @@
  * optimized for performance. The current buffering state can be
  * queried via IsBuf().
  */
-bool KafkaWriter::DoSetBuf(bool enabled)
-{
-    // no change in behavior
-    return true;
+bool KafkaWriter::DoSetBuf(bool enabled) {
+  // no change in behavior
+  return true;
 }
 
 /**
@@ -305,12 +292,11 @@
  * implementation must override this method but it can just
  * ignore calls if flushing doesn't align with its semantics.
  */
-bool KafkaWriter::DoFlush(double network_time)
-{
-    if (!mocking) {
-        producer->flush(0);
-    }
-    return true;
+bool KafkaWriter::DoFlush(double network_time) {
+  if (!mocking) {
+    producer->flush(0);
+  }
+  return true;
 }
 
 /**
@@ -322,31 +308,31 @@
  * FinishedRotation() to signal the log manager that potential
  * postprocessors can now run.
  */
-bool KafkaWriter::DoRotate(const char* rotated_path, double open, double close, bool terminating)
-{
-    // no need to perform log rotation
-    return FinishedRotation();
+bool KafkaWriter::DoRotate(const char *rotated_path, double open, double close,
+                           bool terminating) {
+  // no need to perform log rotation
+  return FinishedRotation();
 }
 
 /**
  * Triggered by regular heartbeat messages from the main thread.
  */
-bool KafkaWriter::DoHeartbeat(double network_time, double current_time)
-{
-    if (!mocking) {
-        producer->poll(0);
-    }
-    return true;
+bool KafkaWriter::DoHeartbeat(double network_time, double current_time) {
+  if (!mocking) {
+    producer->poll(0);
+  }
+  return true;
 }
 
 /**
- * Triggered when the topic is resolved from the configuration, when mocking/testing
+ * Triggered when the topic is resolved from the configuration while
+ * mocking/testing.
  * @param topic
  */
-void KafkaWriter::raise_topic_resolved_event(const string topic) {
-    if (kafka_topic_resolved_event) {
-        val_list *vl = new val_list;
-        vl->append(new StringVal(topic.c_str()));
-        mgr.QueueEvent(kafka_topic_resolved_event, vl);
-    }
+void KafkaWriter::raise_topic_resolved_event(const std::string topic) {
+  if (kafka_topic_resolved_event) {
+    val_list *vl = new val_list;
+    vl->append(new StringVal(topic.c_str()));
+    mgr.QueueEvent(kafka_topic_resolved_event, vl);
+  }
 }
diff --git a/src/KafkaWriter.h b/src/KafkaWriter.h
index ad29ac7..ad5fc09 100644
--- a/src/KafkaWriter.h
+++ b/src/KafkaWriter.h
@@ -19,6 +19,7 @@
 #define ZEEK_PLUGIN_BRO_KAFKA_KAFKAWRITER_H
 
 #include <librdkafka/rdkafkacpp.h>
+#include <map>
 #include <string>
 #include <Desc.h>
 #include <logging/WriterBackend.h>
@@ -65,17 +66,17 @@
     virtual bool DoHeartbeat(double network_time, double current_time);
 
 private:
-    string GetConfigValue(const WriterInfo& info, const string name) const;
-    void raise_topic_resolved_event(const string topic);
-    static const string default_topic_key;
-    string stream_id;
+    std::string GetConfigValue(const WriterInfo& info, const std::string name) const;
+    void raise_topic_resolved_event(const std::string topic);
+    static const std::string default_topic_key;
+    std::string stream_id;
     bool tag_json;
     bool mocking;
-    string json_timestamps;
-    map<string, string> kafka_conf;
-    map<string, string> additional_message_values;
-    string topic_name;
-    string topic_name_override;
+    std::string json_timestamps;
+    std::map<std::string, std::string> kafka_conf;
+    std::map<std::string, std::string> additional_message_values;
+    std::string topic_name;
+    std::string topic_name_override;
     threading::formatter::Formatter *formatter;
     RdKafka::Producer* producer;
     RdKafka::Topic* topic;
diff --git a/src/TaggedJSON.cc b/src/TaggedJSON.cc
index f182d95..071dc30 100644
--- a/src/TaggedJSON.cc
+++ b/src/TaggedJSON.cc
@@ -19,13 +19,13 @@
 
 namespace threading { namespace formatter {
 
-TaggedJSON::TaggedJSON(string sn, MsgThread* t, JSON::TimeFormat tf): JSON(t, tf), stream_name(sn)
+TaggedJSON::TaggedJSON(std::string sn, MsgThread *t, JSON::TimeFormat tf): JSON(t, tf), stream_name(sn)
 {}
 
 TaggedJSON::~TaggedJSON()
 {}
 
-bool TaggedJSON::Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals, map<string,string> &const_vals) const
+bool TaggedJSON::Describe(ODesc *desc, int num_fields, const Field *const *fields, Value **vals, std::map<std::string, std::string> &const_vals) const
 {
     desc->AddRaw("{");
 
@@ -40,7 +40,7 @@
     JSON::Describe(desc, num_fields, fields, vals);
     if (const_vals.size() > 0) {
 
-      map<string, string>::iterator it = const_vals.begin();
+      std::map<std::string, std::string>::iterator it = const_vals.begin();
       while (it != const_vals.end()) {
         desc->AddRaw(",");
         desc->AddRaw("\"");
@@ -56,4 +56,5 @@
   desc->AddRaw("}");
     return true;
 }
-}}
+} // namespace formatter
+} // namespace threading
diff --git a/src/TaggedJSON.h b/src/TaggedJSON.h
index 9135bf2..f8d3005 100644
--- a/src/TaggedJSON.h
+++ b/src/TaggedJSON.h
@@ -18,17 +18,19 @@
 #ifndef ZEEK_PLUGIN_BRO_KAFKA_TAGGEDJSON_H
 #define ZEEK_PLUGIN_BRO_KAFKA_TAGGEDJSON_H
 
-#include <string>
 #include <Desc.h>
+#include <map>
+#include <string>
 #include <threading/Formatter.h>
 #include <threading/formatters/JSON.h>
 
-using threading::formatter::JSON;
+using threading::Field;
 using threading::MsgThread;
 using threading::Value;
-using threading::Field;
+using threading::formatter::JSON;
 
-namespace threading { namespace formatter {
+namespace threading {
+namespace formatter {
 
 /*
  * A JSON formatter that prepends or 'tags' the content with a log stream
@@ -37,15 +39,16 @@
  *   { 'http' : { ... }}
  */
 class TaggedJSON : public JSON {
-
 public:
-    TaggedJSON(string stream_name, MsgThread* t, JSON::TimeFormat tf);
-    virtual ~TaggedJSON();
-    virtual bool Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals, map<string,string> &const_vals) const;
+  TaggedJSON(std::string stream_name, MsgThread *t, JSON::TimeFormat tf);
+  virtual ~TaggedJSON();
+  virtual bool Describe(ODesc *desc, int num_fields, const Field *const *fields,
+                        Value **vals, std::map<std::string, std::string> &const_vals) const;
 
 private:
-    string stream_name;
+  std::string stream_name;
 };
 
-}}
+} // namespace formatter
+} // namespace threading
 #endif