Sling 9504 (#1)

diff --git a/pom.xml b/pom.xml
index da1728e..36e23ef 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,7 +34,7 @@
     <!-- P R O J E C T                                                           -->
     <!-- ======================================================================= -->
     <artifactId>org.apache.sling.distribution.journal.it</artifactId>
-    <version>0.1.3-SNAPSHOT</version>
+    <version>0.2.0-SNAPSHOT</version>
 
     <name>Apache Sling Distribution Journal - IT project</name>
     <description>
@@ -157,22 +157,23 @@
             <artifactId>org.apache.sling.commons.metrics</artifactId>
             <version>1.2.6</version>
         </dependency>
+
         <dependency>
             <groupId>org.apache.sling</groupId>
             <artifactId>org.apache.sling.distribution.journal.messages</artifactId>
-            <version>0.1.9-SNAPSHOT</version>
+            <version>0.2.0-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.sling</groupId>
             <artifactId>org.apache.sling.distribution.journal</artifactId>
-            <version>0.1.17-SNAPSHOT</version>
+            <version>0.2.0-SNAPSHOT</version>
         </dependency>
-        
         <dependency>
             <groupId>org.apache.sling</groupId>
             <artifactId>org.apache.sling.distribution.journal.kafka</artifactId>
-            <version>0.1.5-SNAPSHOT</version>
+            <version>0.2.0-SNAPSHOT</version>
         </dependency>
+
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.12</artifactId>
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
index 17e471e..f26f54f 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
@@ -125,7 +125,6 @@
                 mavenBundle().groupId("org.apache.sling").artifactId("org.apache.sling.commons.metrics").version(SlingOptions.versionResolver),
                 mvn("org.apache.felix", "org.apache.felix.rootcause"),
                 mvn("org.apache.felix", "org.apache.felix.systemready"),
-                mvn("com.google.protobuf", "protobuf-java"),
                 kafka(),
 
                 // The bundle built (org.apache.sling.distribution.journal)
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorDistributeTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorDistributeTest.java
index f57b35a..0ba9ea6 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorDistributeTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorDistributeTest.java
@@ -27,6 +27,7 @@
 import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
 
 import java.io.Closeable;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -56,11 +57,11 @@
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.it.DistributionTestSupport;
 import org.apache.sling.distribution.journal.it.kafka.PaxExamWithKafka;
-import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
-import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.SubscriberConfig;
+import org.apache.sling.distribution.journal.messages.SubscriberState;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.ops4j.pax.exam.Configuration;
@@ -149,30 +150,30 @@
         assertTrue(messageSem.tryAcquire(10, TimeUnit.SECONDS));
         PackageMessage pkg = recordedPackage.get();
         assertEquals(PackageMessage.ReqType.ADD, pkg.getReqType());
-        String path = pkg.getPathsList().iterator().next();
+        String path = pkg.getPaths().iterator().next();
         assertEquals("/", path);
     }
 
     private void simulateDiscoveryMessage(long offset) {
-        MessageSender<DiscoveryMessage> discSender = clientProvider.createSender();
+        MessageSender<DiscoveryMessage> discSender = clientProvider.createSender(TOPIC_DISCOVERY);
         DiscoveryMessage disc = createDiscoveryMessage(offset);
-        discSender.send(TOPIC_DISCOVERY, disc);
+        discSender.accept(disc);
     }
 
     private DiscoveryMessage createDiscoveryMessage(long offset) {
-        SubscriberState subState = SubscriberState.newBuilder()
-                .setOffset(offset)
-                .setPubAgentName(PUB1_AGENT)
+        SubscriberState subState = SubscriberState.builder()
+                .offset(offset)
+                .pubAgentName(PUB1_AGENT)
                 .build();
-        return DiscoveryMessage.newBuilder()
-                .setSubSlingId(SUB1_SLING_ID)
-                .setSubAgentName(SUB1_AGENT)
-                .setSubscriberConfiguration(SubscriberConfiguration
-                        .newBuilder()
-                        .setEditable(false)
-                        .setMaxRetries(-1)
+        return DiscoveryMessage.builder()
+                .subSlingId(SUB1_SLING_ID)
+                .subAgentName(SUB1_AGENT)
+                .subscriberConfiguration(SubscriberConfig
+                        .builder()
+                        .editable(false)
+                        .maxRetries(-1)
                         .build())
-                .addSubscriberState(subState)
+                .subscriberStates(Arrays.asList(subState))
                 .build();
     }
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorRestartTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorRestartTest.java
index 8983a4b..7b578b5 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorRestartTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorRestartTest.java
@@ -46,11 +46,11 @@
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.it.DistributionTestSupport;
 import org.apache.sling.distribution.journal.it.kafka.PaxExamWithKafka;
-import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
-import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.SubscriberConfig;
+import org.apache.sling.distribution.journal.messages.SubscriberState;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.ops4j.pax.exam.Configuration;
@@ -61,7 +61,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.protobuf.ByteString;
 
 /**
  * Starts an author instance, triggers a content distribution and checks that the package arrives
@@ -111,14 +110,14 @@
                 log.info("Sending message {}", c);
             }
             PackageMessage packageMessage = createPackageMessage(c);
-            clientProvider.createSender().send(TOPIC_PACKAGE, packageMessage);
+            clientProvider.createSender(TOPIC_PACKAGE).send(packageMessage);
         }
         try (Closeable packagePoller = createPoller()) {
             messageSem.tryAcquire(NUM_MESSAGES, 100, TimeUnit.SECONDS);
         }
         await().until(() -> toSet(agent.getQueueNames()), equalTo(Collections.emptySet()));
         DiscoveryMessage disc = createDiscoveryMessage(-1);
-        clientProvider.createSender().send(TOPIC_DISCOVERY, disc);
+        clientProvider.createSender(TOPIC_DISCOVERY).send(disc);
         await().until(() -> toSet(agent.getQueueNames()), equalTo(Collections.singleton(QUEUE_NAME)));
         
         log.info("Checking Items in queue");
@@ -136,19 +135,19 @@
     }
 
     private DiscoveryMessage createDiscoveryMessage(long offset) {
-        SubscriberState subState = SubscriberState.newBuilder()
-                .setOffset(offset)
-                .setPubAgentName(PUB1_AGENT)
+        SubscriberState subState = SubscriberState.builder()
+                .offset(offset)
+                .pubAgentName(PUB1_AGENT)
                 .build();
-        return DiscoveryMessage.newBuilder()
-                .setSubSlingId(SUB1_SLING_ID)
-                .setSubAgentName(SUB1_AGENT)
-                .setSubscriberConfiguration(SubscriberConfiguration
-                        .newBuilder()
-                        .setEditable(false)
-                        .setMaxRetries(-1)
+        return DiscoveryMessage.builder()
+                .subSlingId(SUB1_SLING_ID)
+                .subAgentName(SUB1_AGENT)
+                .subscriberConfiguration(SubscriberConfig
+                        .builder()
+                        .editable(false)
+                        .maxRetries(-1)
                         .build())
-                .addSubscriberState(subState)
+                .subscriberStates(Collections.singletonList(subState))
                 .build();
     }
 
@@ -158,14 +157,14 @@
     }
     
     private PackageMessage createPackageMessage(int num) throws IOException {
-        return PackageMessage.newBuilder()
-                .setPkgId("myid" + num)
-                .setPubSlingId("pub1sling")
-                .setPubAgentName(PUB1_AGENT)
-                .setPkgType("journal")
-                .setReqType(PackageMessage.ReqType.ADD)
-                .addAllPaths(Arrays.asList("/test"))
-                .setPkgBinary(ByteString.copyFrom(new byte[100]))
+        return PackageMessage.builder()
+                .pkgId("myid" + num)
+                .pubSlingId("pub1sling")
+                .pubAgentName(PUB1_AGENT)
+                .pkgType("journal")
+                .reqType(PackageMessage.ReqType.ADD)
+                .paths(Arrays.asList("/test"))
+                .pkgBinary(new byte[100])
                 .build();
     }
     
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/ClearQueueItemTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/ClearQueueItemTest.java
index a918caa..e6ae073 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/tests/ClearQueueItemTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/ClearQueueItemTest.java
@@ -38,8 +38,7 @@
 import org.apache.sling.distribution.journal.it.ext.BeforeOsgi;
 import org.apache.sling.distribution.journal.it.ext.ExtPaxExam;
 import org.apache.sling.distribution.journal.it.kafka.KafkaLocal;
-
-import com.google.protobuf.ByteString;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.agent.spi.DistributionAgent;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.junit.Assert;
@@ -56,7 +55,6 @@
 import org.ops4j.pax.exam.util.Filter;
 import org.ops4j.pax.exam.util.PathUtils;
 
-import static org.apache.sling.distribution.journal.messages.Messages.*;
 import static org.apache.commons.io.IOUtils.closeQuietly;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -247,9 +245,9 @@
 
     private void sendInvalidPackages(int nb)
             throws Exception {
-        MessageSender<PackageMessage> sender = clientProvider.createSender();
+        MessageSender<PackageMessage> sender = clientProvider.createSender(TOPIC_PACKAGE);
         for (int i = 0 ; i < nb ; i++) {
-            sender.send(TOPIC_PACKAGE, newInvalidPackage(PUB1_AGENT));
+            sender.send(newInvalidPackage(PUB1_AGENT));
         }
     }
 
@@ -261,16 +259,16 @@
         final List<String> deepPaths = Collections.emptyList();
         final String pkgId = String.format("package-%s", UUID.randomUUID().toString());
 
-        return PackageMessage.newBuilder()
-                .setPubSlingId("slingid")
-                .setPkgId(pkgId)
-                .setPubAgentName(agentId)
-                .setPkgBinary(ByteString.copyFrom(pkgBinary))
-                .setPkgType("journal")
-                .addAllPaths(paths)
-                .setReqType(PackageMessage.ReqType.ADD)
-                .addAllDeepPaths(deepPaths)
-                .setPkgLength(pkgBinary.length)
+        return PackageMessage.builder()
+                .pubSlingId("slingid")
+                .pkgId(pkgId)
+                .pubAgentName(agentId)
+                .pkgBinary(pkgBinary)
+                .pkgType("journal")
+                .paths(paths)
+                .reqType(PackageMessage.ReqType.ADD)
+                .deepPaths(deepPaths)
+                .pkgLength(pkgBinary.length)
                 .build();
     }
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java
index a013ed5..e1930c9 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java
@@ -46,8 +46,7 @@
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.it.DistributionTestSupport;
 import org.apache.sling.distribution.journal.it.kafka.PaxExamWithKafka;
-import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.packaging.DistributionPackage;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.distribution.packaging.DistributionPackageInfo;
@@ -62,8 +61,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.protobuf.ByteString;
-
 /**
  * Starts a publish instance and checks that it can receive and process a PackageMessage from the journal
  */
@@ -115,8 +112,8 @@
     	Arrays.asList(bundleContext.getBundles()).stream()
     	.forEach(bundle -> log.info(bundle.getSymbolicName() + ":" + bundle.getVersion()));
         DistributionPackage pkg = createDistPackage(RESOURCE_PATH);
-        Messages.PackageMessage pkgMsg = toPackageMessage(pkg, "agent1");
-        provider.createSender().send(TOPIC_PACKAGE, pkgMsg);
+        PackageMessage pkgMsg = toPackageMessage(pkg, "agent1");
+        provider.createSender(TOPIC_PACKAGE).send(pkgMsg);
         await().until(() -> getResource(RESOURCE_PATH), notNullValue());
     }
 
@@ -159,16 +156,16 @@
         final List<String> deepPaths = Arrays.asList(pkgInfo.get(PROPERTY_REQUEST_DEEP_PATHS, String[].class));
         final String pkgId = pkg.getId();
 
-        return PackageMessage.newBuilder()
-                .setPubSlingId("slingid")
-                .setPkgId(pkgId)
-                .setPubAgentName(agentId)
-                .setPkgBinary(ByteString.copyFrom(pkgBinary))
-                .setPkgType(pkg.getType())
-                .addAllPaths(paths)
-                .setReqType(PackageMessage.ReqType.ADD)
-                .addAllDeepPaths(deepPaths)
-                .setPkgLength(pkgBinary.length)
+        return PackageMessage.builder()
+                .pubSlingId("slingid")
+                .pkgId(pkgId)
+                .pubAgentName(agentId)
+                .pkgBinary(pkgBinary)
+                .pkgType(pkg.getType())
+                .paths(paths)
+                .reqType(PackageMessage.ReqType.ADD)
+                .deepPaths(deepPaths)
+                .pkgLength(pkgBinary.length)
                 .build();
     }
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/ScaleUpTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/ScaleUpTest.java
index 3dcb413..1513e0b 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/tests/ScaleUpTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/ScaleUpTest.java
@@ -63,9 +63,9 @@
 import org.apache.sling.distribution.journal.it.DistributionTestSupport;
 import org.apache.sling.distribution.journal.it.FileUtil;
 import org.apache.sling.distribution.journal.it.kafka.KafkaLocal;
-import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.SubscriberState;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -254,7 +254,7 @@
     }
     
     private void handleDiscovery(MessageInfo info, DiscoveryMessage message) {
-        List<SubscriberState> stateList = message.getSubscriberStateList();
+        List<SubscriberState> stateList = message.getSubscriberStates();
         String slingId = message.getSubSlingId();
         OptionalLong minOffset = stateList.stream().mapToLong(state -> state.getOffset()).min();
         LOG.info("DiscoveryMessage slingid {} received {} states {}", slingId, minOffset, stateList);
@@ -266,7 +266,7 @@
         if (message.getReqType() == PackageMessage.ReqType.TEST) {
             return;
         }
-        LOG.info("PackageMessage received {}, paths {}", info.getOffset(), message.getPathsList());
+        LOG.info("PackageMessage received {}, paths {}", info.getOffset(), message.getPaths());
         this.lastPackageOffset = info.getOffset();
         packageReceived.release();
     }