Merge pull request #847 from ahgittin/fix/kafka

partial fix for kafka 0.8.0-beta1 compatibility
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
index 96088f9..4aa86a9 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
@@ -31,10 +31,12 @@
 import brooklyn.util.ssh.CommonCommands;
 import brooklyn.entity.drivers.downloads.DownloadResolver;
 import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
+import brooklyn.entity.java.UsesJmx;
 import brooklyn.location.basic.SshMachineLocation;
 import brooklyn.util.NetworkUtils;
 import brooklyn.util.ResourceUtils;
 import brooklyn.util.collections.MutableMap;
+import brooklyn.util.collections.MutableMap.Builder;
 import brooklyn.util.jmx.jmxrmi.JmxRmiAgent;
 
 import com.google.common.collect.ImmutableList;
@@ -42,6 +44,7 @@
 
 public abstract class AbstractfKafkaSshDriver extends JavaSoftwareProcessSshDriver {
 
+    @SuppressWarnings("unused")
     private static final Logger log = LoggerFactory.getLogger(KafkaZookeeperSshDriver.class);
 
     public AbstractfKafkaSshDriver(EntityLocal entity, SshMachineLocation machine) {
@@ -82,6 +85,10 @@
         commands.add("cd "+expandedInstallDir);
         commands.add("./sbt update");
         commands.add("./sbt package");
+        if (isV08()) {
+            // target not known in v0.7.x but required in v0.8.0-beta1
+            commands.add("./sbt assembly-package-dependency");
+        }
 
         newScript(INSTALLING)
                 .failOnNonZeroResultCode()
@@ -89,6 +96,12 @@
                 .execute();
     }
 
+    /** Returns false only for a configured 0.7.x version; any other (or unset) version is treated as 0.8+. */
+    protected boolean isV08() {
+        String v = getEntity().getConfig(Kafka.SUGGESTED_VERSION);
+        return v == null || !v.startsWith("0.7.");
+    }
+    
     @Override
     public void customize() {
         NetworkUtils.checkPortsValid(getPortMap());
@@ -100,14 +113,25 @@
         String config = entity.getConfig(getConfigTemplateKey());
         copyTemplate(config, getConfigFileName());
 
-        // Copy JMX agent Jar to server
-        getMachine().copyTo(new ResourceUtils(this).getResourceFromUrl(getJmxRmiAgentJarUrl()), getJmxRmiAgentJarDestinationFilePath());
+        if (isJmxEnabled()) {
+            // Copy JMX agent Jar to server
+            getMachine().copyTo(new ResourceUtils(this).getResourceFromUrl(getJmxRmiAgentJarUrl()), getJmxRmiAgentJarDestinationFilePath());
+        }
     }
 
     public String getJmxRmiAgentJarBasename() {
         return "brooklyn-jmxrmi-agent-" + BrooklynVersion.get() + ".jar";
     }
 
+    // Kafka requires a JMX port to be specified even if JMX is disabled in Brooklyn (we just don't consume it)
+    @Override
+    public Integer getJmxPort() {
+        return entity.getAttribute(UsesJmx.JMX_PORT);
+    }
+    public Integer getRmiServerPort() {
+        return entity.getAttribute(UsesJmx.RMI_SERVER_PORT);
+    }
+
     public String getJmxRmiAgentJarUrl() {
         return "classpath://" + getJmxRmiAgentJarBasename();
     }
@@ -141,18 +165,24 @@
 
     @Override
     protected Map<String, ?> getJmxJavaSystemProperties() {
-        return MutableMap.<String, Object> builder()
-                .put(JmxRmiAgent.JMX_SERVER_PORT_PROPERTY, getJmxPort())
+        Builder<String, Object> result = MutableMap.<String, Object> builder();
+        if (isJmxEnabled()) {
+            // TODO JMX SSL ENABLED -- see superclass behaviour
+            result.put(JmxRmiAgent.JMX_SERVER_PORT_PROPERTY, getJmxPort())
                 .put(JmxRmiAgent.RMI_REGISTRY_PORT_PROPERTY, getRmiServerPort())
                 .put("com.sun.management.jmxremote.ssl", false)
                 .put("com.sun.management.jmxremote.authenticate", false)
-                .put("java.rmi.server.hostname", getHostname())
-                .build();
+                .put("java.rmi.server.hostname", getHostname());
+        }
+        return result.build();
     }
 
     @Override
     protected List<String> getJmxJavaConfigOptions() {
-        return ImmutableList.of("-javaagent:" + getJmxRmiAgentJarDestinationFilePath());
+        if (isJmxEnabled())
+            return ImmutableList.of("-javaagent:" + getJmxRmiAgentJarDestinationFilePath());
+        else
+            return ImmutableList.of();
     }
 
     /**
@@ -164,7 +194,7 @@
         String kafkaJmxOpts = orig.remove("JAVA_OPTS");
         return MutableMap.<String, String>builder()
                 .putAll(orig)
-                .put("KAFKA_JMX_OPTS", kafkaJmxOpts)
+                .putIfNotNull("KAFKA_JMX_OPTS", kafkaJmxOpts)
                 .build();
     }
 
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java
index c9caa03..2fd4fb0 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java
@@ -15,9 +15,9 @@
  */
 package brooklyn.entity.messaging.kafka;
 
-import brooklyn.entity.basic.SoftwareProcessDriver;
+import brooklyn.entity.java.JavaSoftwareProcessDriver;
 
-public interface KafkaBrokerDriver extends SoftwareProcessDriver {
+public interface KafkaBrokerDriver extends JavaSoftwareProcessDriver {
 
     Integer getKafkaPort();
 
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
index be330bd..99da3d6 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
@@ -80,12 +80,14 @@
     public void waitForServiceUp(long duration, TimeUnit units) {
         super.waitForServiceUp(duration, units);
 
-        // Wait for the MBean to exist
-        JmxHelper helper = new JmxHelper(this);
-        try {
-            helper.assertMBeanExistsEventually(SOCKET_SERVER_STATS_MBEAN, units.toMillis(duration));
-        } finally {
-            helper.disconnect();
+        if (((KafkaBrokerDriver)getDriver()).isJmxEnabled()) {
+            // Wait for the MBean to exist
+            JmxHelper helper = new JmxHelper(this);
+            try {
+                helper.assertMBeanExistsEventually(SOCKET_SERVER_STATS_MBEAN, units.toMillis(duration));
+            } finally {
+                helper.disconnect();
+            }
         }
     }
 
@@ -93,7 +95,8 @@
     protected void connectSensors() {
         connectServiceUpIsRunning();
 
-        jmxFeed = JmxFeed.builder()
+        if (((KafkaBrokerDriver)getDriver()).isJmxEnabled()) {
+            jmxFeed = JmxFeed.builder()
                 .entity(this)
                 .period(500, TimeUnit.MILLISECONDS)
                 .pollAttribute(new JmxAttributePollConfig<Long>(FETCH_REQUEST_COUNT)
@@ -129,6 +132,7 @@
                         .attributeName("TotalBytesWritten")
                         .onError(Functions.constant(-1l)))
                 .build();
+        }
 
         setBrokerUrl();
     }
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
index 7341e6f..4a987bd 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
@@ -20,7 +20,6 @@
 import brooklyn.config.ConfigKey;
 import brooklyn.location.basic.SshMachineLocation;
 import brooklyn.util.collections.MutableMap;
-import brooklyn.util.jmx.jmxrmi.JmxRmiAgent;
 
 public class KafkaBrokerSshDriver extends AbstractfKafkaSshDriver implements KafkaBrokerDriver {
 
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperDriver.java
index ac1c8fe..535f08e 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperDriver.java
@@ -15,9 +15,9 @@
  */
 package brooklyn.entity.messaging.kafka;
 
-import brooklyn.entity.basic.SoftwareProcessDriver;
+import brooklyn.entity.java.JavaSoftwareProcessDriver;
 
-public interface KafkaZookeeperDriver extends SoftwareProcessDriver {
+public interface KafkaZookeeperDriver extends JavaSoftwareProcessDriver {
 
     Integer getZookeeperPort();
 
diff --git a/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZookeeperImpl.java b/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZookeeperImpl.java
index 64dccaf..17755ae 100644
--- a/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZookeeperImpl.java
+++ b/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZookeeperImpl.java
@@ -23,6 +23,7 @@
 import org.slf4j.LoggerFactory;
 
 import brooklyn.entity.basic.SoftwareProcessImpl;
+import brooklyn.entity.java.JavaSoftwareProcessDriver;
 import brooklyn.event.feed.jmx.JmxAttributePollConfig;
 import brooklyn.event.feed.jmx.JmxFeed;
 import brooklyn.event.feed.jmx.JmxHelper;
@@ -53,12 +54,14 @@
     public void waitForServiceUp(long duration, TimeUnit units) {
         super.waitForServiceUp(duration, units);
 
-        // Wait for the MBean to exist
-        JmxHelper helper = new JmxHelper(this);
-        try {
-            helper.assertMBeanExistsEventually(ZOOKEEPER_MBEAN, units.toMillis(duration));
-        } finally {
-            helper.disconnect();
+        if (((JavaSoftwareProcessDriver)getDriver()).isJmxEnabled()) {
+            // Wait for the MBean to exist
+            JmxHelper helper = new JmxHelper(this);
+            try {
+                helper.assertMBeanExistsEventually(ZOOKEEPER_MBEAN, units.toMillis(duration));
+            } finally {
+                helper.disconnect();
+            }
         }
     }
 
@@ -66,7 +69,8 @@
     protected void connectSensors() {
         connectServiceUpIsRunning();
 
-        jmxFeed = JmxFeed.builder()
+        if (((JavaSoftwareProcessDriver)getDriver()).isJmxEnabled()) {
+            jmxFeed = JmxFeed.builder()
                 .entity(this)
                 .period(500, TimeUnit.MILLISECONDS)
                 .pollAttribute(new JmxAttributePollConfig<Long>(OUTSTANDING_REQUESTS)
@@ -82,6 +86,7 @@
                         .attributeName("PacketsSent")
                         .onError(Functions.constant(-1l)))
                 .build();
+        }
     }
 
     @Override
diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
index b440076..2398937 100644
--- a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
+++ b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
@@ -22,12 +22,18 @@
 ############################# Server Basics #############################
 # The id of the broker. This must be set to a unique integer for each broker.
 brokerid=${entity.brokerId?c}
+# 0.7 syntax above, 0.8 syntax below
+broker.id=${entity.brokerId?c}
 
 # Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
 # from InetAddress.getLocalHost().  If there are multiple interfaces getLocalHost
 # may not be what you want.
 hostname=${driver.hostname}
+# 0.7 syntax above, 0.8 syntax below
+host.name=${driver.hostname}
 
+# Many of the settings below are for 0.7 only (but they are the default; I've updated the essential ones).
+# TODO: create a separate Kafka server.properties for 0.8
 
 ############################# Socket Server Settings #############################
 
@@ -115,6 +121,10 @@
 # You can also append an optional chroot string to the urls to specify the
 # root directory for all kafka znodes.
 zk.connect=${entity.zookeeper.hostname}:${entity.zookeeper.zookeeperPort?c}
+# 0.7 syntax above, 0.8 syntax below
+zookeeper.connect=${entity.zookeeper.hostname}:${entity.zookeeper.zookeeperPort?c}
 
 # Timeout in ms for connecting to zookeeper
 zk.connectiontimeout.ms=1000000
+# 0.7 syntax above, 0.8 syntax below
+zookeeper.connection.timeout.ms=1000000