| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.usergrid.persistence.qakka.api; |
| |
| |
| import com.fasterxml.jackson.core.JsonProcessingException; |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.io.ByteStreams; |
| import com.google.inject.Injector; |
| import org.apache.commons.lang.RandomStringUtils; |
| import org.apache.usergrid.persistence.qakka.QakkaFig; |
| import org.apache.usergrid.persistence.qakka.URIStrategy; |
| import org.apache.usergrid.persistence.qakka.api.impl.GuiceStartupListener; |
| import org.apache.usergrid.persistence.qakka.core.QueueMessage; |
| import org.junit.Assert; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.ws.rs.client.Entity; |
| import javax.ws.rs.core.MediaType; |
| import javax.ws.rs.core.Response; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.net.URISyntaxException; |
| import java.util.*; |
| |
| import static org.junit.Assert.fail; |
| |
| |
| /** |
| * Tests for the Qakka REST API. |
| * |
| * FYI: There is also a complete set of Qakka tests in the Usergrid Queue modulea. |
| */ |
| public class QueueResourceTest extends AbstractRestTest { |
| private static final Logger logger = LoggerFactory.getLogger( QueueResourceTest.class ); |
| |
| static private final TypeReference<Map<String,Object>> jsonMapTypeRef |
| = new TypeReference<Map<String,Object>>() {}; |
| |
| @Test |
| public void testCreateQueue() throws URISyntaxException { |
| |
| // create a queue |
| |
| String queueName = "qrt_create_" + RandomStringUtils.randomAlphanumeric( 10 ); |
| Map<String, Object> queueMap = new HashMap<String, Object>() {{ |
| put("name", queueName); |
| }}; |
| Response response = target("queues").request() |
| .post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE)); |
| |
| Assert.assertEquals( 201, response.getStatus() ); |
| URIStrategy uriStrategy = GuiceStartupListener.INJECTOR.getInstance( URIStrategy.class ); |
| Assert.assertEquals( uriStrategy.queueURI( queueName ).toString(), response.getHeaderString( "location" ) ); |
| |
| // get queue by name |
| |
| response = target("queues").path( queueName ).path( "config" ).request().get(); |
| Assert.assertEquals( 200, response.getStatus() ); |
| ApiResponse apiResponse = response.readEntity( ApiResponse.class ); |
| Assert.assertNotNull( apiResponse.getQueues() ); |
| Assert.assertFalse( apiResponse.getQueues().isEmpty() ); |
| Assert.assertEquals( 1, apiResponse.getQueues().size() ); |
| Assert.assertEquals( queueName, apiResponse.getQueues().iterator().next().getName() ); |
| |
| response = target("queues").path( queueName ).queryParam( "confirm", true ).request().delete(); |
| Assert.assertEquals( 200, response.getStatus() ); |
| } |
| |
| |
| @Test |
| public void testDeleteQueue() throws URISyntaxException { |
| |
| // create a queue |
| |
| String queueName = "qrt_delete_" + RandomStringUtils.randomAlphanumeric( 10 ); |
| Map<String, Object> queueMap = new HashMap<String, Object>() {{ put("name", queueName); }}; |
| Response response = target("queues").request() |
| .post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE)); |
| |
| Assert.assertEquals( 201, response.getStatus() ); |
| URIStrategy uriStrategy = GuiceStartupListener.INJECTOR.getInstance( URIStrategy.class ); |
| Assert.assertEquals( uriStrategy.queueURI( queueName ).toString(), response.getHeaderString( "location" ) ); |
| |
| // delete queue without confirm = true, should fail with bad request |
| |
| response = target("queues").path( queueName ).request().delete(); |
| Assert.assertEquals( 400, response.getStatus() ); |
| |
| // delete queue with confirm = true |
| |
| response = target("queues").path( queueName ).queryParam( "confirm", true ).request().delete(); |
| Assert.assertEquals( 200, response.getStatus() ); |
| |
| // cannot get queue by name |
| |
| response = target("queues").path( queueName ).path( "config" ).request().get(); |
| Assert.assertEquals( 404, response.getStatus() ); |
| } |
| |
| |
| @Test |
| public void testSendMessageToBadQueue() throws URISyntaxException, JsonProcessingException, InterruptedException { |
| |
| String queueName = "bogus_queue_is_bogus"; |
| Map<String, Object> messageMap = new HashMap<String, Object>() {{ put("dummy_prop", "dummy_value"); }}; |
| ObjectMapper mapper = new ObjectMapper(); |
| String body = mapper.writeValueAsString( messageMap ); |
| |
| Response response = target("queues").path( queueName ).path( "messages" ) |
| .request().post( Entity.entity( body, MediaType.APPLICATION_OCTET_STREAM_TYPE )); |
| |
| Assert.assertEquals( 404, response.getStatus() ); |
| } |
| |
| |
| @Test |
| public void testSendJsonMessagesAsJson() throws URISyntaxException, IOException, InterruptedException { |
| sendJsonMessages( true ); |
| } |
| |
| |
| @Test |
| public void testSendMessagesJsonAsOctetStream() throws URISyntaxException, IOException, InterruptedException { |
| sendJsonMessages( false ); |
| } |
| |
| |
| /** |
| * Send 100 JSON payload messages to queue. |
| * @param asJson True to send with content-type header 'application/json' |
| * False to send with content-type header 'application/octet stream' |
| */ |
| private void sendJsonMessages( boolean asJson ) throws URISyntaxException, IOException, InterruptedException { |
| |
| // create a queue |
| |
| String queueName = "qrt_json_" + RandomStringUtils.randomAlphanumeric( 10 ); |
| Map<String, Object> queueMap = new HashMap<String, Object>() {{ |
| put( "name", queueName ); |
| }}; |
| target( "queues" ).request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE ) ); |
| |
| try { |
| |
| // send some messages |
| |
| ObjectMapper mapper = new ObjectMapper(); |
| |
| int numMessages = 100; |
| for (int i = 0; i < numMessages; i++) { |
| |
| final int number = i; |
| Map<String, Object> messageMap = new HashMap<String, Object>() {{ |
| put( "message", "this is message #" + number ); |
| put( "valid", true ); |
| }}; |
| String body = mapper.writeValueAsString( messageMap ); |
| |
| Response response; |
| if (asJson) { |
| response = target( "queues" ).path( queueName ).path( "messages" ) |
| .request().post( Entity.entity( body, MediaType.APPLICATION_JSON ) ); |
| } else { |
| response = target( "queues" ).path( queueName ).path( "messages" ) |
| .queryParam( "contentType", MediaType.APPLICATION_JSON ) |
| .request().post( Entity.entity( body, MediaType.APPLICATION_OCTET_STREAM ) ); |
| } |
| |
| Assert.assertEquals( 200, response.getStatus() ); |
| } |
| |
| // get all messages, checking for dups |
| |
| checkJsonMessages( queueName, numMessages ); |
| |
| } finally { |
| Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete(); |
| Assert.assertEquals( 200, response.getStatus() ); |
| } |
| } |
| |
| |
| private Set<UUID> checkJsonMessages( String queueName, int numMessages ) throws IOException { |
| |
| ObjectMapper mapper = new ObjectMapper(); |
| |
| Set<UUID> messageIds = new HashSet<>(); |
| for ( int j=0; j<numMessages; j++ ) { |
| |
| int retries = 0; |
| int maxRetries = 10; |
| ApiResponse apiResponse = null; |
| while ( retries++ < maxRetries ) { |
| Response response = target( "queues" ).path( queueName ).path( "messages" ).request().get(); |
| apiResponse = response.readEntity( ApiResponse.class ); |
| if ( !apiResponse.getQueueMessages().isEmpty() ) { |
| break; |
| } |
| try { Thread.sleep(500); } catch (Exception ignored) {} |
| } |
| |
| Assert.assertNotNull( apiResponse ); |
| Assert.assertNotNull( apiResponse.getQueueMessages() ); |
| Assert.assertEquals( 1, apiResponse.getQueueMessages().size() ); |
| |
| QueueMessage queueMessage = apiResponse.getQueueMessages().iterator().next(); |
| Map<String, Object> payload = mapper.readValue( queueMessage.getData(), jsonMapTypeRef ); |
| |
| Assert.assertEquals( queueName, queueMessage.getQueueName() ); |
| Assert.assertNull( queueMessage.getHref() ); |
| Assert.assertEquals( true, payload.get("valid") ); |
| |
| if (messageIds.contains( queueMessage.getQueueMessageId() )) { |
| Assert.fail("Message fetched twice: " + queueMessage.getQueueMessageId() ); |
| } else { |
| messageIds.add( queueMessage.getQueueMessageId() ); |
| } |
| } |
| Assert.assertEquals( numMessages, messageIds.size() ); |
| |
| return messageIds; |
| } |
| |
| |
| @Test |
| public void testSendBinaryMessages() throws URISyntaxException, IOException, InterruptedException { |
| |
| // create a queue |
| |
| String queueName = "qrt_binarty_" + RandomStringUtils.randomAlphanumeric( 10 ); |
| Map<String, Object> queueMap = new HashMap<String, Object>() {{ |
| put( "name", queueName ); |
| }}; |
| target( "queues" ).request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE ) ); |
| |
| try { |
| |
| // send messages each with image/jpg payload |
| |
| InputStream is = getClass().getResourceAsStream( "/qakka-duck.jpg" ); |
| byte[] bytes = ByteStreams.toByteArray( is ); |
| |
| int numMessages = 100; |
| for (int i = 0; i < numMessages; i++) { |
| |
| Response response = target( "queues" ).path( queueName ).path( "messages" ) |
| .queryParam( "contentType", "image/jpg" ) |
| .request() |
| .post( Entity.entity( bytes, MediaType.APPLICATION_OCTET_STREAM ) ); |
| |
| Assert.assertEquals( 200, response.getStatus() ); |
| } |
| |
| // get all messages, checking for dups |
| |
| checkBinaryMessages( queueName, numMessages ); |
| |
| } finally { |
| Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete(); |
| Assert.assertEquals( 200, response.getStatus() ); |
| } |
| } |
| |
| |
| private Set<UUID> checkBinaryMessages( String queueName, int numMessages ) throws IOException { |
| |
| Set<UUID> messageIds = new HashSet<>(); |
| for ( int j=0; j<numMessages; j++ ) { |
| |
| Response response = target( "queues" ).path( queueName ).path( "messages" ).request().get(); |
| |
| ApiResponse apiResponse = response.readEntity( ApiResponse.class ); |
| Assert.assertNotNull( apiResponse.getQueueMessages() ); |
| Assert.assertFalse( apiResponse.getQueueMessages().isEmpty() ); |
| Assert.assertEquals( 1, apiResponse.getQueueMessages().size() ); |
| |
| QueueMessage queueMessage = apiResponse.getQueueMessages().iterator().next(); |
| |
| // no data in a binary message |
| Assert.assertNull( queueMessage.getData() ); |
| |
| // data can be found at HREF provided |
| Assert.assertNotNull( queueMessage.getHref() ); |
| |
| Response binaryResponse = target("queues") |
| .path( queueName ).path("data").path( queueMessage.getQueueMessageId().toString() ) |
| .request().accept( "image/jpg" ).get(); |
| |
| Assert.assertEquals( 200, binaryResponse.getStatus() ); |
| InputStream is = binaryResponse.readEntity( InputStream.class ); |
| |
| byte[] imageBytes = ByteStreams.toByteArray( is ); |
| Assert.assertEquals( 11188, imageBytes.length); |
| |
| if (messageIds.contains( queueMessage.getQueueMessageId() )) { |
| fail("Message fetched twice: " + queueMessage.getQueueMessageId() ); |
| } else { |
| messageIds.add( queueMessage.getQueueMessageId() ); |
| } |
| } |
| Assert.assertEquals( numMessages, messageIds.size() ); |
| |
| return messageIds; |
| } |
| |
| |
| @Test |
| public void testSendMessageAckAndTimeout() throws URISyntaxException, IOException, InterruptedException { |
| |
| // create a queue |
| |
| String queueName = "qrt_timeout_" + RandomStringUtils.randomAlphanumeric( 10 ); |
| Map<String, Object> queueMap = new HashMap<String, Object>() {{ put("name", queueName); }}; |
| target("queues").request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE)); |
| |
| try { |
| |
| // send some messages |
| |
| ObjectMapper mapper = new ObjectMapper(); |
| |
| int numMessages = 100; |
| for (int i = 0; i < numMessages; i++) { |
| |
| final int number = i; |
| Map<String, Object> messageMap = new HashMap<String, Object>() {{ |
| put( "message", "this is message #" + number ); |
| put( "valid", true ); |
| }}; |
| String body = mapper.writeValueAsString( messageMap ); |
| |
| Response response = target( "queues" ).path( queueName ).path( "messages" ) |
| .request().post( Entity.entity( body, MediaType.APPLICATION_JSON ) ); |
| |
| Assert.assertEquals( 200, response.getStatus() ); |
| } |
| |
| // get all messages, checking for dups |
| |
| Set<UUID> messageIds = checkJsonMessages( queueName, numMessages ); |
| |
| // there should be no more messages available |
| |
| Response response = target( "queues" ).path( queueName ).path( "messages" ).request().get(); |
| ApiResponse apiResponse = response.readEntity( ApiResponse.class ); |
| Assert.assertNotNull( apiResponse.getQueueMessages() ); |
| Assert.assertTrue( apiResponse.getQueueMessages().isEmpty() ); |
| |
| // ack half of the messages |
| |
| int count = 0; |
| Set<UUID> ackedIds = new HashSet<>(); |
| for (UUID queueMessageId : messageIds) { |
| response = target( "queues" ) |
| .path( queueName ).path( "messages" ).path( queueMessageId.toString() ).request().delete(); |
| Assert.assertEquals( 200, response.getStatus() ); |
| ackedIds.add( queueMessageId ); |
| if (++count >= numMessages / 2) { |
| break; |
| } |
| } |
| messageIds.removeAll( ackedIds ); |
| |
| // wait for remaining of the messages to timeout |
| |
| QakkaFig qakkaFig = GuiceStartupListener.INJECTOR.getInstance( QakkaFig.class ); |
| Thread.sleep( 2 * qakkaFig.getQueueTimeoutSeconds() * 1000 ); |
| |
| // now, the remaining messages cannot be acked because they timed out |
| |
| for (UUID queueMessageId : messageIds) { |
| response = target( "queues" ) |
| .path( queueName ).path( "messages" ).path( queueMessageId.toString() ).request().delete(); |
| Assert.assertEquals( 400, response.getStatus() ); |
| } |
| |
| // and, those same messages should be available again in the queue |
| |
| checkJsonMessages( queueName, numMessages / 2 ); |
| |
| } finally { |
| Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete(); |
| Assert.assertEquals( 200, response.getStatus() ); |
| } |
| } |
| |
| |
| @Test |
| public void testConvertDelayParameter() { |
| |
| Injector injector = GuiceStartupListener.INJECTOR; |
| QueueResource queueResource = injector.getInstance( QueueResource.class ); |
| |
| Assert.assertEquals( 0L, queueResource.convertDelayParameter( "" ).longValue() ); |
| Assert.assertEquals( 0L, queueResource.convertDelayParameter( "0" ).longValue() ); |
| Assert.assertEquals( 0L, queueResource.convertDelayParameter( "NONE" ).longValue() ); |
| Assert.assertEquals( 5L, queueResource.convertDelayParameter( "5" ).longValue() ); |
| |
| try { |
| queueResource.convertDelayParameter( "bogus value" ); |
| fail("Expected exception on bad value"); |
| } catch ( IllegalArgumentException expected ) { |
| // pass |
| } |
| } |
| |
| @Test |
| public void testConvertExpirationParameter() { |
| |
| Injector injector = GuiceStartupListener.INJECTOR; |
| QueueResource queueResource = injector.getInstance( QueueResource.class ); |
| |
| Assert.assertNull( queueResource.convertExpirationParameter( "" ) ); |
| Assert.assertNull( queueResource.convertExpirationParameter( "NEVER" ) ); |
| |
| Assert.assertEquals( 5L, queueResource.convertExpirationParameter( "5" ).longValue() ); |
| |
| try { |
| queueResource.convertExpirationParameter( "bogus value" ); |
| fail("Expected exception on bad value"); |
| } catch ( IllegalArgumentException expected ) { |
| // pass |
| } |
| } |
| |
| } |
| |