Sling 9504 (#1)
diff --git a/pom.xml b/pom.xml
index da1728e..36e23ef 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,7 +34,7 @@
<!-- P R O J E C T -->
<!-- ======================================================================= -->
<artifactId>org.apache.sling.distribution.journal.it</artifactId>
- <version>0.1.3-SNAPSHOT</version>
+ <version>0.2.0-SNAPSHOT</version>
<name>Apache Sling Distribution Journal - IT project</name>
<description>
@@ -157,22 +157,23 @@
<artifactId>org.apache.sling.commons.metrics</artifactId>
<version>1.2.6</version>
</dependency>
+
<dependency>
<groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.distribution.journal.messages</artifactId>
- <version>0.1.9-SNAPSHOT</version>
+ <version>0.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.distribution.journal</artifactId>
- <version>0.1.17-SNAPSHOT</version>
+ <version>0.2.0-SNAPSHOT</version>
</dependency>
-
<dependency>
<groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.distribution.journal.kafka</artifactId>
- <version>0.1.5-SNAPSHOT</version>
+ <version>0.2.0-SNAPSHOT</version>
</dependency>
+
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
index 17e471e..f26f54f 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
@@ -125,7 +125,6 @@
mavenBundle().groupId("org.apache.sling").artifactId("org.apache.sling.commons.metrics").version(SlingOptions.versionResolver),
mvn("org.apache.felix", "org.apache.felix.rootcause"),
mvn("org.apache.felix", "org.apache.felix.systemready"),
- mvn("com.google.protobuf", "protobuf-java"),
kafka(),
// The bundle built (org.apache.sling.distribution.journal)
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorDistributeTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorDistributeTest.java
index f57b35a..0ba9ea6 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorDistributeTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorDistributeTest.java
@@ -27,6 +27,7 @@
import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
import java.io.Closeable;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -56,11 +57,11 @@
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.it.DistributionTestSupport;
import org.apache.sling.distribution.journal.it.kafka.PaxExamWithKafka;
-import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
-import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.SubscriberConfig;
+import org.apache.sling.distribution.journal.messages.SubscriberState;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.Configuration;
@@ -149,30 +150,30 @@
assertTrue(messageSem.tryAcquire(10, TimeUnit.SECONDS));
PackageMessage pkg = recordedPackage.get();
assertEquals(PackageMessage.ReqType.ADD, pkg.getReqType());
- String path = pkg.getPathsList().iterator().next();
+ String path = pkg.getPaths().iterator().next();
assertEquals("/", path);
}
private void simulateDiscoveryMessage(long offset) {
- MessageSender<DiscoveryMessage> discSender = clientProvider.createSender();
+ MessageSender<DiscoveryMessage> discSender = clientProvider.createSender(TOPIC_DISCOVERY);
DiscoveryMessage disc = createDiscoveryMessage(offset);
- discSender.send(TOPIC_DISCOVERY, disc);
+ discSender.accept(disc);
}
private DiscoveryMessage createDiscoveryMessage(long offset) {
- SubscriberState subState = SubscriberState.newBuilder()
- .setOffset(offset)
- .setPubAgentName(PUB1_AGENT)
+ SubscriberState subState = SubscriberState.builder()
+ .offset(offset)
+ .pubAgentName(PUB1_AGENT)
.build();
- return DiscoveryMessage.newBuilder()
- .setSubSlingId(SUB1_SLING_ID)
- .setSubAgentName(SUB1_AGENT)
- .setSubscriberConfiguration(SubscriberConfiguration
- .newBuilder()
- .setEditable(false)
- .setMaxRetries(-1)
+ return DiscoveryMessage.builder()
+ .subSlingId(SUB1_SLING_ID)
+ .subAgentName(SUB1_AGENT)
+ .subscriberConfiguration(SubscriberConfig
+ .builder()
+ .editable(false)
+ .maxRetries(-1)
.build())
- .addSubscriberState(subState)
+ .subscriberStates(Arrays.asList(subState))
.build();
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorRestartTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorRestartTest.java
index 8983a4b..7b578b5 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorRestartTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorRestartTest.java
@@ -46,11 +46,11 @@
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.it.DistributionTestSupport;
import org.apache.sling.distribution.journal.it.kafka.PaxExamWithKafka;
-import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
-import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.SubscriberConfig;
+import org.apache.sling.distribution.journal.messages.SubscriberState;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.Configuration;
@@ -61,7 +61,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.protobuf.ByteString;
/**
* Starts an author instance, triggers a content distribution and checks that the package arrives
@@ -111,14 +110,14 @@
log.info("Sending message {}", c);
}
PackageMessage packageMessage = createPackageMessage(c);
- clientProvider.createSender().send(TOPIC_PACKAGE, packageMessage);
+ clientProvider.createSender(TOPIC_PACKAGE).send(packageMessage);
}
try (Closeable packagePoller = createPoller()) {
messageSem.tryAcquire(NUM_MESSAGES, 100, TimeUnit.SECONDS);
}
await().until(() -> toSet(agent.getQueueNames()), equalTo(Collections.emptySet()));
DiscoveryMessage disc = createDiscoveryMessage(-1);
- clientProvider.createSender().send(TOPIC_DISCOVERY, disc);
+ clientProvider.createSender(TOPIC_DISCOVERY).send(disc);
await().until(() -> toSet(agent.getQueueNames()), equalTo(Collections.singleton(QUEUE_NAME)));
log.info("Checking Items in queue");
@@ -136,19 +135,19 @@
}
private DiscoveryMessage createDiscoveryMessage(long offset) {
- SubscriberState subState = SubscriberState.newBuilder()
- .setOffset(offset)
- .setPubAgentName(PUB1_AGENT)
+ SubscriberState subState = SubscriberState.builder()
+ .offset(offset)
+ .pubAgentName(PUB1_AGENT)
.build();
- return DiscoveryMessage.newBuilder()
- .setSubSlingId(SUB1_SLING_ID)
- .setSubAgentName(SUB1_AGENT)
- .setSubscriberConfiguration(SubscriberConfiguration
- .newBuilder()
- .setEditable(false)
- .setMaxRetries(-1)
+ return DiscoveryMessage.builder()
+ .subSlingId(SUB1_SLING_ID)
+ .subAgentName(SUB1_AGENT)
+ .subscriberConfiguration(SubscriberConfig
+ .builder()
+ .editable(false)
+ .maxRetries(-1)
.build())
- .addSubscriberState(subState)
+ .subscriberStates(Collections.singletonList(subState))
.build();
}
@@ -158,14 +157,14 @@
}
private PackageMessage createPackageMessage(int num) throws IOException {
- return PackageMessage.newBuilder()
- .setPkgId("myid" + num)
- .setPubSlingId("pub1sling")
- .setPubAgentName(PUB1_AGENT)
- .setPkgType("journal")
- .setReqType(PackageMessage.ReqType.ADD)
- .addAllPaths(Arrays.asList("/test"))
- .setPkgBinary(ByteString.copyFrom(new byte[100]))
+ return PackageMessage.builder()
+ .pkgId("myid" + num)
+ .pubSlingId("pub1sling")
+ .pubAgentName(PUB1_AGENT)
+ .pkgType("journal")
+ .reqType(PackageMessage.ReqType.ADD)
+ .paths(Arrays.asList("/test"))
+ .pkgBinary(new byte[100])
.build();
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/ClearQueueItemTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/ClearQueueItemTest.java
index a918caa..e6ae073 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/tests/ClearQueueItemTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/ClearQueueItemTest.java
@@ -38,8 +38,7 @@
import org.apache.sling.distribution.journal.it.ext.BeforeOsgi;
import org.apache.sling.distribution.journal.it.ext.ExtPaxExam;
import org.apache.sling.distribution.journal.it.kafka.KafkaLocal;
-
-import com.google.protobuf.ByteString;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.junit.Assert;
@@ -56,7 +55,6 @@
import org.ops4j.pax.exam.util.Filter;
import org.ops4j.pax.exam.util.PathUtils;
-import static org.apache.sling.distribution.journal.messages.Messages.*;
import static org.apache.commons.io.IOUtils.closeQuietly;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -247,9 +245,9 @@
private void sendInvalidPackages(int nb)
throws Exception {
- MessageSender<PackageMessage> sender = clientProvider.createSender();
+ MessageSender<PackageMessage> sender = clientProvider.createSender(TOPIC_PACKAGE);
for (int i = 0 ; i < nb ; i++) {
- sender.send(TOPIC_PACKAGE, newInvalidPackage(PUB1_AGENT));
+ sender.send(newInvalidPackage(PUB1_AGENT));
}
}
@@ -261,16 +259,16 @@
final List<String> deepPaths = Collections.emptyList();
final String pkgId = String.format("package-%s", UUID.randomUUID().toString());
- return PackageMessage.newBuilder()
- .setPubSlingId("slingid")
- .setPkgId(pkgId)
- .setPubAgentName(agentId)
- .setPkgBinary(ByteString.copyFrom(pkgBinary))
- .setPkgType("journal")
- .addAllPaths(paths)
- .setReqType(PackageMessage.ReqType.ADD)
- .addAllDeepPaths(deepPaths)
- .setPkgLength(pkgBinary.length)
+ return PackageMessage.builder()
+ .pubSlingId("slingid")
+ .pkgId(pkgId)
+ .pubAgentName(agentId)
+ .pkgBinary(pkgBinary)
+ .pkgType("journal")
+ .paths(paths)
+ .reqType(PackageMessage.ReqType.ADD)
+ .deepPaths(deepPaths)
+ .pkgLength(pkgBinary.length)
.build();
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java
index a013ed5..e1930c9 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java
@@ -46,8 +46,7 @@
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.it.DistributionTestSupport;
import org.apache.sling.distribution.journal.it.kafka.PaxExamWithKafka;
-import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.packaging.DistributionPackage;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.packaging.DistributionPackageInfo;
@@ -62,8 +61,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.protobuf.ByteString;
-
/**
* Starts a publish instance and checks that it can receive and process a PackageMessage from the journal
*/
@@ -115,8 +112,8 @@
Arrays.asList(bundleContext.getBundles()).stream()
.forEach(bundle -> log.info(bundle.getSymbolicName() + ":" + bundle.getVersion()));
DistributionPackage pkg = createDistPackage(RESOURCE_PATH);
- Messages.PackageMessage pkgMsg = toPackageMessage(pkg, "agent1");
- provider.createSender().send(TOPIC_PACKAGE, pkgMsg);
+ PackageMessage pkgMsg = toPackageMessage(pkg, "agent1");
+ provider.createSender(TOPIC_PACKAGE).send(pkgMsg);
await().until(() -> getResource(RESOURCE_PATH), notNullValue());
}
@@ -159,16 +156,16 @@
final List<String> deepPaths = Arrays.asList(pkgInfo.get(PROPERTY_REQUEST_DEEP_PATHS, String[].class));
final String pkgId = pkg.getId();
- return PackageMessage.newBuilder()
- .setPubSlingId("slingid")
- .setPkgId(pkgId)
- .setPubAgentName(agentId)
- .setPkgBinary(ByteString.copyFrom(pkgBinary))
- .setPkgType(pkg.getType())
- .addAllPaths(paths)
- .setReqType(PackageMessage.ReqType.ADD)
- .addAllDeepPaths(deepPaths)
- .setPkgLength(pkgBinary.length)
+ return PackageMessage.builder()
+ .pubSlingId("slingid")
+ .pkgId(pkgId)
+ .pubAgentName(agentId)
+ .pkgBinary(pkgBinary)
+ .pkgType(pkg.getType())
+ .paths(paths)
+ .reqType(PackageMessage.ReqType.ADD)
+ .deepPaths(deepPaths)
+ .pkgLength(pkgBinary.length)
.build();
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/ScaleUpTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/ScaleUpTest.java
index 3dcb413..1513e0b 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/tests/ScaleUpTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/ScaleUpTest.java
@@ -63,9 +63,9 @@
import org.apache.sling.distribution.journal.it.DistributionTestSupport;
import org.apache.sling.distribution.journal.it.FileUtil;
import org.apache.sling.distribution.journal.it.kafka.KafkaLocal;
-import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.SubscriberState;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@@ -254,7 +254,7 @@
}
private void handleDiscovery(MessageInfo info, DiscoveryMessage message) {
- List<SubscriberState> stateList = message.getSubscriberStateList();
+ List<SubscriberState> stateList = message.getSubscriberStates();
String slingId = message.getSubSlingId();
OptionalLong minOffset = stateList.stream().mapToLong(state -> state.getOffset()).min();
LOG.info("DiscoveryMessage slingid {} received {} states {}", slingId, minOffset, stateList);
@@ -266,7 +266,7 @@
if (message.getReqType() == PackageMessage.ReqType.TEST) {
return;
}
- LOG.info("PackageMessage received {}, paths {}", info.getOffset(), message.getPathsList());
+ LOG.info("PackageMessage received {}, paths {}", info.getOffset(), message.getPaths());
this.lastPackageOffset = info.getOffset();
packageReceived.release();
}