Merge pull request #847 from ahgittin/fix/kafka
partial fix for kafka 0.8.0-beta1 compatibility
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
index 96088f9..4aa86a9 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java
@@ -31,10 +31,12 @@
import brooklyn.util.ssh.CommonCommands;
import brooklyn.entity.drivers.downloads.DownloadResolver;
import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
+import brooklyn.entity.java.UsesJmx;
import brooklyn.location.basic.SshMachineLocation;
import brooklyn.util.NetworkUtils;
import brooklyn.util.ResourceUtils;
import brooklyn.util.collections.MutableMap;
+import brooklyn.util.collections.MutableMap.Builder;
import brooklyn.util.jmx.jmxrmi.JmxRmiAgent;
import com.google.common.collect.ImmutableList;
@@ -42,6 +44,7 @@
public abstract class AbstractfKafkaSshDriver extends JavaSoftwareProcessSshDriver {
+ @SuppressWarnings("unused")
private static final Logger log = LoggerFactory.getLogger(KafkaZookeeperSshDriver.class);
public AbstractfKafkaSshDriver(EntityLocal entity, SshMachineLocation machine) {
@@ -82,6 +85,10 @@
commands.add("cd "+expandedInstallDir);
commands.add("./sbt update");
commands.add("./sbt package");
+ if (isV08()) {
+ // target not known in v0.7.x but required in v0.8.0-beta1
+ commands.add("./sbt assembly-package-dependency");
+ }
newScript(INSTALLING)
.failOnNonZeroResultCode()
@@ -89,6 +96,12 @@
.execute();
}
+ protected boolean isV08() {
+ String v = getEntity().getConfig(Kafka.SUGGESTED_VERSION);
+ if (v.startsWith("0.7.")) return false;
+ return true;
+ }
+
@Override
public void customize() {
NetworkUtils.checkPortsValid(getPortMap());
@@ -100,14 +113,25 @@
String config = entity.getConfig(getConfigTemplateKey());
copyTemplate(config, getConfigFileName());
- // Copy JMX agent Jar to server
- getMachine().copyTo(new ResourceUtils(this).getResourceFromUrl(getJmxRmiAgentJarUrl()), getJmxRmiAgentJarDestinationFilePath());
+ if (isJmxEnabled()) {
+ // Copy JMX agent Jar to server
+ getMachine().copyTo(new ResourceUtils(this).getResourceFromUrl(getJmxRmiAgentJarUrl()), getJmxRmiAgentJarDestinationFilePath());
+ }
}
public String getJmxRmiAgentJarBasename() {
return "brooklyn-jmxrmi-agent-" + BrooklynVersion.get() + ".jar";
}
+ // KAFKA requires a JMX port specified even if it is disabled in brooklyn (we just don't consume it)
+ @Override
+ public Integer getJmxPort() {
+ return entity.getAttribute(UsesJmx.JMX_PORT);
+ }
+ public Integer getRmiServerPort() {
+ return entity.getAttribute(UsesJmx.RMI_SERVER_PORT);
+ }
+
public String getJmxRmiAgentJarUrl() {
return "classpath://" + getJmxRmiAgentJarBasename();
}
@@ -141,18 +165,24 @@
@Override
protected Map<String, ?> getJmxJavaSystemProperties() {
- return MutableMap.<String, Object> builder()
- .put(JmxRmiAgent.JMX_SERVER_PORT_PROPERTY, getJmxPort())
+ Builder<String, Object> result = MutableMap.<String, Object> builder();
+ if (isJmxEnabled()) {
+ // TODO JMX SSL ENABLED -- see superclass behaviour
+ result.put(JmxRmiAgent.JMX_SERVER_PORT_PROPERTY, getJmxPort())
.put(JmxRmiAgent.RMI_REGISTRY_PORT_PROPERTY, getRmiServerPort())
.put("com.sun.management.jmxremote.ssl", false)
.put("com.sun.management.jmxremote.authenticate", false)
- .put("java.rmi.server.hostname", getHostname())
- .build();
+ .put("java.rmi.server.hostname", getHostname());
+ }
+ return result.build();
}
@Override
protected List<String> getJmxJavaConfigOptions() {
- return ImmutableList.of("-javaagent:" + getJmxRmiAgentJarDestinationFilePath());
+ if (isJmxEnabled())
+ return ImmutableList.of("-javaagent:" + getJmxRmiAgentJarDestinationFilePath());
+ else
+ return ImmutableList.of();
}
/**
@@ -164,7 +194,7 @@
String kafkaJmxOpts = orig.remove("JAVA_OPTS");
return MutableMap.<String, String>builder()
.putAll(orig)
- .put("KAFKA_JMX_OPTS", kafkaJmxOpts)
+ .putIfNotNull("KAFKA_JMX_OPTS", kafkaJmxOpts)
.build();
}
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java
index c9caa03..2fd4fb0 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java
@@ -15,9 +15,9 @@
*/
package brooklyn.entity.messaging.kafka;
-import brooklyn.entity.basic.SoftwareProcessDriver;
+import brooklyn.entity.java.JavaSoftwareProcessDriver;
-public interface KafkaBrokerDriver extends SoftwareProcessDriver {
+public interface KafkaBrokerDriver extends JavaSoftwareProcessDriver {
Integer getKafkaPort();
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
index be330bd..99da3d6 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
@@ -80,12 +80,14 @@
public void waitForServiceUp(long duration, TimeUnit units) {
super.waitForServiceUp(duration, units);
- // Wait for the MBean to exist
- JmxHelper helper = new JmxHelper(this);
- try {
- helper.assertMBeanExistsEventually(SOCKET_SERVER_STATS_MBEAN, units.toMillis(duration));
- } finally {
- helper.disconnect();
+ if (((KafkaBrokerDriver)getDriver()).isJmxEnabled()) {
+ // Wait for the MBean to exist
+ JmxHelper helper = new JmxHelper(this);
+ try {
+ helper.assertMBeanExistsEventually(SOCKET_SERVER_STATS_MBEAN, units.toMillis(duration));
+ } finally {
+ helper.disconnect();
+ }
}
}
@@ -93,7 +95,8 @@
protected void connectSensors() {
connectServiceUpIsRunning();
- jmxFeed = JmxFeed.builder()
+ if (((KafkaBrokerDriver)getDriver()).isJmxEnabled()) {
+ jmxFeed = JmxFeed.builder()
.entity(this)
.period(500, TimeUnit.MILLISECONDS)
.pollAttribute(new JmxAttributePollConfig<Long>(FETCH_REQUEST_COUNT)
@@ -129,6 +132,7 @@
.attributeName("TotalBytesWritten")
.onError(Functions.constant(-1l)))
.build();
+ }
setBrokerUrl();
}
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
index 7341e6f..4a987bd 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
@@ -20,7 +20,6 @@
import brooklyn.config.ConfigKey;
import brooklyn.location.basic.SshMachineLocation;
import brooklyn.util.collections.MutableMap;
-import brooklyn.util.jmx.jmxrmi.JmxRmiAgent;
public class KafkaBrokerSshDriver extends AbstractfKafkaSshDriver implements KafkaBrokerDriver {
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperDriver.java
index ac1c8fe..535f08e 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperDriver.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperDriver.java
@@ -15,9 +15,9 @@
*/
package brooklyn.entity.messaging.kafka;
-import brooklyn.entity.basic.SoftwareProcessDriver;
+import brooklyn.entity.java.JavaSoftwareProcessDriver;
-public interface KafkaZookeeperDriver extends SoftwareProcessDriver {
+public interface KafkaZookeeperDriver extends JavaSoftwareProcessDriver {
Integer getZookeeperPort();
diff --git a/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZookeeperImpl.java b/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZookeeperImpl.java
index 64dccaf..17755ae 100644
--- a/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZookeeperImpl.java
+++ b/software/messaging/src/main/java/brooklyn/entity/zookeeper/AbstractZookeeperImpl.java
@@ -23,6 +23,7 @@
import org.slf4j.LoggerFactory;
import brooklyn.entity.basic.SoftwareProcessImpl;
+import brooklyn.entity.java.JavaSoftwareProcessDriver;
import brooklyn.event.feed.jmx.JmxAttributePollConfig;
import brooklyn.event.feed.jmx.JmxFeed;
import brooklyn.event.feed.jmx.JmxHelper;
@@ -53,12 +54,14 @@
public void waitForServiceUp(long duration, TimeUnit units) {
super.waitForServiceUp(duration, units);
- // Wait for the MBean to exist
- JmxHelper helper = new JmxHelper(this);
- try {
- helper.assertMBeanExistsEventually(ZOOKEEPER_MBEAN, units.toMillis(duration));
- } finally {
- helper.disconnect();
+ if (((JavaSoftwareProcessDriver)getDriver()).isJmxEnabled()) {
+ // Wait for the MBean to exist
+ JmxHelper helper = new JmxHelper(this);
+ try {
+ helper.assertMBeanExistsEventually(ZOOKEEPER_MBEAN, units.toMillis(duration));
+ } finally {
+ helper.disconnect();
+ }
}
}
@@ -66,7 +69,8 @@
protected void connectSensors() {
connectServiceUpIsRunning();
- jmxFeed = JmxFeed.builder()
+ if (((JavaSoftwareProcessDriver)getDriver()).isJmxEnabled()) {
+ jmxFeed = JmxFeed.builder()
.entity(this)
.period(500, TimeUnit.MILLISECONDS)
.pollAttribute(new JmxAttributePollConfig<Long>(OUTSTANDING_REQUESTS)
@@ -82,6 +86,7 @@
.attributeName("PacketsSent")
.onError(Functions.constant(-1l)))
.build();
+ }
}
@Override
diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
index b440076..2398937 100644
--- a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
+++ b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
@@ -22,12 +22,18 @@
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
brokerid=${entity.brokerId?c}
+# 0.7 syntax above, 0.8 syntax below
+broker.id=${entity.brokerId?c}
# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost
# may not be what you want.
hostname=${driver.hostname}
+# 0.7 syntax above, 0.8 syntax below
+host.name=${driver.hostname}
+# many of the settings below are for 0.7 only (but they are the default; i've updated the essential ones)
+# TODO should create a new kafka server.properties for 0.8
############################# Socket Server Settings #############################
@@ -115,6 +121,10 @@
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zk.connect=${entity.zookeeper.hostname}:${entity.zookeeper.zookeeperPort?c}
+# 0.7 syntax above, 0.8 syntax below
+zookeeper.connect=${entity.zookeeper.hostname}:${entity.zookeeper.zookeeperPort?c}
# Timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
+# 0.7 syntax above, 0.8 syntax below
+zookeeper.connection.timeout.ms=1000000