Get a set of messages by id.
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/binders/BindIdsToQueryParam.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/binders/BindIdsToQueryParam.java
new file mode 100644
index 0000000..48c2fae
--- /dev/null
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/binders/BindIdsToQueryParam.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.openstack.marconi.v1.binders;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import org.jclouds.http.HttpRequest;
+import org.jclouds.rest.Binder;
+
+import javax.inject.Singleton;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * @author Everett Toews
+ */
+@Singleton
+public class BindIdsToQueryParam implements Binder {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <R extends HttpRequest> R bindToRequest(R request, Object input) {
+ checkArgument(input instanceof Iterable<?>, "This binder is only valid for Iterable");
+ Iterable<String> ids = (Iterable<String>) checkNotNull(input, "Iterable of Strings");
+ checkArgument(Iterables.size(ids) > 0, "You must specify at least one id");
+
+ return (R) request.toBuilder().replaceQueryParam("ids", Joiner.on(',').join(ids)).build();
+ }
+}
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessageStream.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessageStream.java
index 84b67b5..4a77a1b 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessageStream.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessageStream.java
@@ -21,7 +21,7 @@
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
-import org.jclouds.openstack.marconi.v1.options.StreamOptions;
+import org.jclouds.openstack.marconi.v1.options.StreamMessagesOptions;
import org.jclouds.openstack.v2_0.domain.Link;
import org.jclouds.openstack.v2_0.domain.PaginatedCollection;
@@ -37,8 +37,8 @@
*
* @return The options necessary to get the next page of messages.
*/
- public StreamOptions nextStreamOptions() {
- return StreamOptions.class.cast(nextMarker().get());
+ public StreamMessagesOptions nextStreamOptions() {
+ return StreamMessagesOptions.class.cast(nextMarker().get());
}
@Override
@@ -58,7 +58,7 @@
@Override
public Object apply(Link link) {
Multimap<String, String> queryParams = queryParser().apply(link.getHref().getRawQuery());
- StreamOptions paginationOptions = StreamOptions.Builder.queryParameters(queryParams);
+ StreamMessagesOptions paginationOptions = StreamMessagesOptions.Builder.queryParameters(queryParams);
return paginationOptions;
}
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Queues.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Queues.java
index 6ef6a93..d5f3d74 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Queues.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Queues.java
@@ -23,7 +23,6 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import org.jclouds.openstack.marconi.v1.options.ListQueuesOptions;
-import org.jclouds.openstack.marconi.v1.options.StreamOptions;
import org.jclouds.openstack.v2_0.domain.Link;
import org.jclouds.openstack.v2_0.domain.PaginatedCollection;
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java
index ecfb2c5..9b9fcb8 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java
@@ -17,14 +17,16 @@
package org.jclouds.openstack.marconi.v1.features;
import org.jclouds.Fallbacks;
-import org.jclouds.openstack.keystone.v2_0.KeystoneFallbacks;
import org.jclouds.openstack.keystone.v2_0.filters.AuthenticateRequest;
+import org.jclouds.openstack.marconi.v1.binders.BindIdsToQueryParam;
import org.jclouds.openstack.marconi.v1.domain.CreateMessage;
-import org.jclouds.openstack.marconi.v1.domain.MessagesCreated;
+import org.jclouds.openstack.marconi.v1.domain.Message;
import org.jclouds.openstack.marconi.v1.domain.MessageStream;
-import org.jclouds.openstack.marconi.v1.functions.ParseMessages;
+import org.jclouds.openstack.marconi.v1.domain.MessagesCreated;
+import org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream;
import org.jclouds.openstack.marconi.v1.functions.ParseMessagesCreated;
-import org.jclouds.openstack.marconi.v1.options.StreamOptions;
+import org.jclouds.openstack.marconi.v1.functions.ParseMessagesToList;
+import org.jclouds.openstack.marconi.v1.options.StreamMessagesOptions;
import org.jclouds.rest.annotations.BinderParam;
import org.jclouds.rest.annotations.Fallback;
import org.jclouds.rest.annotations.RequestFilters;
@@ -38,10 +40,14 @@
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import java.util.List;
import java.util.UUID;
+import static org.jclouds.Fallbacks.EmptyListOnNotFoundOr404;
+import static org.jclouds.openstack.keystone.v2_0.KeystoneFallbacks.EmptyPaginatedCollectionOnNotFoundOr404;
+
/**
* Provides access to Messages via their REST API.
*
@@ -69,12 +75,36 @@
MessagesCreated create(@HeaderParam("Client-ID") UUID clientId,
@BinderParam(BindToJsonPayload.class) List<CreateMessage> messages);
+ /**
+ * Streams the messages off of a queue. In a very active queue it's possible that you could continuously stream
+ * messages indefinitely.
+ *
+ * @param clientId A UUID for each client instance.
+ * @param options Options for streaming messages to your client.
+ */
@Named("message:stream")
@GET
- @ResponseParser(ParseMessages.class)
+ @ResponseParser(ParseMessagesToStream.class)
@Consumes(MediaType.APPLICATION_JSON)
- @Fallback(KeystoneFallbacks.EmptyPaginatedCollectionOnNotFoundOr404.class)
+ @Fallback(EmptyPaginatedCollectionOnNotFoundOr404.class)
@Path("/messages")
MessageStream stream(@HeaderParam("Client-ID") UUID clientId,
- StreamOptions... options);
+ StreamMessagesOptions... options);
+
+ /**
+ * List specific messages. Unlike the stream method, a client's own messages are always returned in this operation.
+ *
+ * @param clientId A UUID for each client instance.
+ * @param ids Specifies the IDs of the messages to get.
+ */
+ @Named("message:list")
+ @GET
+ @ResponseParser(ParseMessagesToList.class)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Path("/messages")
+ @Fallback(EmptyListOnNotFoundOr404.class)
+ List<Message> list(@HeaderParam("Client-ID") UUID clientId,
+ @BinderParam(BindIdsToQueryParam.class) Iterable<String> ids);
+
+ // TODO: list by claim id when claim API done
}
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/QueueApi.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/QueueApi.java
index a2fc044..c12bbf3 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/QueueApi.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/QueueApi.java
@@ -110,7 +110,7 @@
/**
* Use this method to manually page through the list of queues.
*/
- @Named("record:list")
+ @Named("queue:list")
@GET
@ResponseParser(ParseQueues.class)
@Consumes(MediaType.APPLICATION_JSON)
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java
index d2d4d83..9be3722 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java
@@ -26,7 +26,7 @@
import java.util.List;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.jclouds.openstack.marconi.v1.functions.ParseMessages.TO_MESSAGE_ID;
+import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_MESSAGE_ID;
/**
* This parses the messages created on a queue.
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToList.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToList.java
new file mode 100644
index 0000000..d7d0b77
--- /dev/null
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToList.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.openstack.marconi.v1.functions;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.jclouds.http.HttpResponse;
+import org.jclouds.http.functions.ParseJson;
+import org.jclouds.openstack.marconi.v1.domain.Message;
+
+import javax.inject.Inject;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Iterables.transform;
+import static com.google.common.collect.Lists.newArrayList;
+import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.MessageWithHref;
+import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_MESSAGE;
+
+/**
+ * @author Everett Toews
+ */
+public class ParseMessagesToList implements Function<HttpResponse, List<Message>> {
+
+ private final ParseJson<List<MessageWithHref>> json;
+
+ @Inject
+ ParseMessagesToList(ParseJson<List<MessageWithHref>> json) {
+ this.json = checkNotNull(json, "json");
+ }
+
+ @Override
+ public List<Message> apply(HttpResponse response) {
+ // An empty message stream has a 204 response code
+ if (response.getStatusCode() == 204) {
+ return ImmutableList.of();
+ }
+
+ List<MessageWithHref> messagesWithHref = json.apply(response);
+ return Lists.newArrayList(transform(messagesWithHref, TO_MESSAGE));
+ }
+}
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessages.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java
similarity index 92%
rename from openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessages.java
rename to openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java
index 163483d..76a9ba7 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessages.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java
@@ -19,7 +19,6 @@
import com.google.common.base.Function;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
import org.jclouds.http.HttpResponse;
import org.jclouds.http.functions.ParseJson;
import org.jclouds.openstack.marconi.v1.domain.Message;
@@ -29,19 +28,18 @@
import javax.inject.Inject;
import java.beans.ConstructorProperties;
-import java.util.List;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* @author Everett Toews
*/
-public class ParseMessages implements Function<HttpResponse, MessageStream> {
+public class ParseMessagesToStream implements Function<HttpResponse, MessageStream> {
private final ParseJson<MessagesWithHref> json;
@Inject
- ParseMessages(ParseJson<MessagesWithHref> json) {
+ ParseMessagesToStream(ParseJson<MessagesWithHref> json) {
this.json = checkNotNull(json, "json");
}
@@ -93,7 +91,7 @@
}
}
- private static class MessageWithHref extends Message {
+ protected static class MessageWithHref extends Message {
@ConstructorProperties({ "href", "ttl", "body", "age" })
protected MessageWithHref(String href, int ttl, String body, int age) {
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java
index fb87c6c..e6f0ee2 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java
@@ -25,7 +25,7 @@
import org.jclouds.openstack.marconi.v1.domain.QueueStats;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.jclouds.openstack.marconi.v1.functions.ParseMessages.TO_MESSAGE_ID;
+import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_MESSAGE_ID;
/**
* This parses the stats of a queue.
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/ListQueuesOptions.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/ListQueuesOptions.java
index 1ce60d4..929529d 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/ListQueuesOptions.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/ListQueuesOptions.java
@@ -67,7 +67,7 @@
}
/**
- * @return The String representation of the marker for these StreamOptions.
+ * @return The String representation of the marker for these StreamMessagesOptions.
*/
public String getMarker() {
return Iterables.getOnlyElement(queryParameters.get("marker"));
diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamOptions.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java
similarity index 78%
rename from openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamOptions.java
rename to openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java
index be2fffb..b0ff396 100644
--- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamOptions.java
+++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java
@@ -25,15 +25,15 @@
/**
* Options used to control the messages returned in the response.
*/
-public class StreamOptions extends PaginationOptions {
+public class StreamMessagesOptions extends PaginationOptions {
- public static final StreamOptions NONE = new StreamOptions();
+ public static final StreamMessagesOptions NONE = new StreamMessagesOptions();
/**
* {@inheritDoc}
*/
@Override
- public StreamOptions queryParameters(Multimap<String, String> queryParams) {
+ public StreamMessagesOptions queryParameters(Multimap<String, String> queryParams) {
checkNotNull(queryParams, "queryParams");
queryParameters.putAll(queryParams);
return this;
@@ -43,7 +43,7 @@
* @see Builder#marker(String)
*/
@Override
- public StreamOptions marker(String marker) {
+ public StreamMessagesOptions marker(String marker) {
super.marker(marker);
return this;
}
@@ -52,7 +52,7 @@
* @see Builder#limit(int)
*/
@Override
- public StreamOptions limit(int limit) {
+ public StreamMessagesOptions limit(int limit) {
super.limit(limit);
return this;
@@ -61,13 +61,13 @@
/**
* @see Builder#echo(boolean)
*/
- public StreamOptions echo(boolean echo) {
+ public StreamMessagesOptions echo(boolean echo) {
queryParameters.put("echo", Boolean.toString(echo));
return this;
}
/**
- * @return The String representation of the marker for these StreamOptions.
+ * @return The String representation of the marker for these StreamMessagesOptions.
*/
public String getMarker() {
return Iterables.getOnlyElement(queryParameters.get("marker"));
@@ -77,8 +77,8 @@
/**
* @see PaginationOptions#queryParameters(Multimap)
*/
- public static StreamOptions queryParameters(Multimap<String, String> queryParams) {
- StreamOptions options = new StreamOptions();
+ public static StreamMessagesOptions queryParameters(Multimap<String, String> queryParams) {
+ StreamMessagesOptions options = new StreamMessagesOptions();
return options.queryParameters(queryParams);
}
@@ -90,8 +90,8 @@
* Clients should make no assumptions about the format or length of the marker. Furthermore, clients should assume
* that there is no relationship between markers and message IDs.
*/
- public static StreamOptions marker(String marker) {
- StreamOptions options = new StreamOptions();
+ public static StreamMessagesOptions marker(String marker) {
+ StreamMessagesOptions options = new StreamMessagesOptions();
return options.marker(marker);
}
@@ -101,8 +101,8 @@
* MessageStream#nextStreamOptions()}. Specifies up to 10 messages (the default value) to return. If you do not
* specify a value for the limit parameter, the default value of 10 is used.
*/
- public static StreamOptions limit(int limit) {
- StreamOptions options = new StreamOptions();
+ public static StreamMessagesOptions limit(int limit) {
+ StreamMessagesOptions options = new StreamMessagesOptions();
return options.limit(limit);
}
@@ -111,8 +111,8 @@
* (UUID) portion of the client. If you do not specify a value, echo uses the default value of false. If you are
* experimenting with the API, you might want to set echo=true in order to see the messages that you posted.
*/
- public static StreamOptions echo(boolean echo) {
- StreamOptions options = new StreamOptions();
+ public static StreamMessagesOptions echo(boolean echo) {
+ StreamMessagesOptions options = new StreamMessagesOptions();
return options.echo(echo);
}
}
diff --git a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java
index 3bfe3e0..279ba20 100644
--- a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java
+++ b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java
@@ -18,16 +18,20 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
import org.jclouds.openstack.marconi.v1.domain.CreateMessage;
+import org.jclouds.openstack.marconi.v1.domain.Message;
import org.jclouds.openstack.marconi.v1.domain.MessageStream;
import org.jclouds.openstack.marconi.v1.domain.MessagesCreated;
import org.jclouds.openstack.marconi.v1.internal.BaseMarconiApiLiveTest;
import org.testng.annotations.Test;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
-import static org.jclouds.openstack.marconi.v1.options.StreamOptions.Builder.echo;
+import static org.jclouds.openstack.marconi.v1.options.StreamMessagesOptions.Builder.echo;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
@@ -36,6 +40,8 @@
@Test(groups = "live", testName = "MessageApiLiveTest", singleThreaded = true)
public class MessageApiLiveTest extends BaseMarconiApiLiveTest {
+ private final Map<String, List<String>> messageIds = Maps.newHashMap();
+
public void createQueues() throws Exception {
for (String zoneId : api.getConfiguredZones()) {
QueueApi queueApi = api.getQueueApiForZone(zoneId);
@@ -49,7 +55,6 @@
public void streamZeroPagesOfMessages() throws Exception {
for (String zoneId : api.getConfiguredZones()) {
MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test");
-
UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
MessageStream messageStream = messageApi.stream(clientId, echo(true));
@@ -80,7 +85,6 @@
public void streamOnePageOfMessages() throws Exception {
for (String zoneId : api.getConfiguredZones()) {
MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test");
-
UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
MessageStream messageStream = messageApi.stream(clientId, echo(true));
@@ -120,14 +124,18 @@
public void streamManyPagesOfMessages() throws Exception {
for (String zoneId : api.getConfiguredZones()) {
MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test");
-
UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+ messageIds.put(zoneId, new ArrayList<String>());
MessageStream messageStream = messageApi.stream(clientId, echo(true).limit(2));
while(messageStream.nextMarker().isPresent()) {
assertEquals(Iterables.size(messageStream), 2);
+ for (Message message: messageStream) {
+ messageIds.get(zoneId).add(message.getId());
+ }
+
messageStream = messageApi.stream(clientId, messageStream.nextStreamOptions());
}
@@ -136,6 +144,23 @@
}
@Test(dependsOnMethods = { "streamManyPagesOfMessages" })
+ public void listMessagesByIds() throws Exception {
+ for (String zoneId : api.getConfiguredZones()) {
+ MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test");
+ UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+
+ List<Message> messages = messageApi.list(clientId, messageIds.get(zoneId));
+
+ assertEquals(messages.size(), 4);
+
+ for (Message message: messages) {
+ assertNotNull(message.getId());
+ assertNotNull(message.getBody());
+ }
+ }
+ }
+
+ @Test(dependsOnMethods = { "listMessagesByIds" })
public void delete() throws Exception {
for (String zoneId : api.getConfiguredZones()) {
QueueApi queueApi = api.getQueueApiForZone(zoneId);
diff --git a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java
index 8f45aa3..8adb244 100644
--- a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java
+++ b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java
@@ -22,6 +22,7 @@
import com.squareup.okhttp.mockwebserver.MockWebServer;
import org.jclouds.openstack.marconi.v1.MarconiApi;
import org.jclouds.openstack.marconi.v1.domain.CreateMessage;
+import org.jclouds.openstack.marconi.v1.domain.Message;
import org.jclouds.openstack.marconi.v1.domain.MessageStream;
import org.jclouds.openstack.marconi.v1.domain.MessagesCreated;
import org.jclouds.openstack.v2_0.internal.BaseOpenStackMockTest;
@@ -30,7 +31,7 @@
import java.util.List;
import java.util.UUID;
-import static org.jclouds.openstack.marconi.v1.options.StreamOptions.Builder.limit;
+import static org.jclouds.openstack.marconi.v1.options.StreamMessagesOptions.Builder.limit;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
@@ -195,4 +196,35 @@
server.shutdown();
}
}
+
+ public void listMessagesByIds() throws Exception {
+ MockWebServer server = mockOpenStackServer();
+ server.enqueue(new MockResponse().setBody(accessRackspace));
+ server.enqueue(new MockResponse().setResponseCode(200).setBody("[{\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"SF Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 1596, \"href\": \"/v1/queues/jclouds-test/messages/messages/52928896b04a584f24883227\", \"ttl\": 86400}, {\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"Austin Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 1596, \"href\": \"/v1/queues/jclouds-test/messages/messages/52928896b04a584f24883228\", \"ttl\": 86400}, {\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"HK Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 1596, \"href\": \"/v1/queues/jclouds-test/messages/messages/52928896b04a584f24883229\", \"ttl\": 86400}]"));
+
+ try {
+ MarconiApi api = api(server.getUrl("/").toString(), "openstack-marconi");
+ MessageApi messageApi = api.getMessageApiForZoneAndQueue("DFW", "jclouds-test");
+ UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+ List<String> ids = ImmutableList.of("52928896b04a584f24883227", "52928896b04a584f24883228", "52928896b04a584f24883229");
+
+ List<Message> messages = messageApi.list(clientId, ids);
+
+ assertEquals(messages.size(), 3);
+
+ for (Message message: messages) {
+ assertNotNull(message.getId());
+ assertNotNull(message.getBody());
+ assertEquals(message.getAge(), 1596);
+ assertEquals(message.getTTL(), 86400);
+ }
+
+ assertEquals(server.getRequestCount(), 2);
+ assertEquals(server.takeRequest().getRequestLine(), "POST /tokens HTTP/1.1");
+ assertEquals(server.takeRequest().getRequestLine(), "GET /v1/123123/queues/jclouds-test/messages?ids=52928896b04a584f24883227,52928896b04a584f24883228,52928896b04a584f24883229 HTTP/1.1");
+ }
+ finally {
+ server.shutdown();
+ }
+ }
}