blob: 8da21806bec0d75c2abb6fa241058b2bfb5c7b55 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.usergrid.persistence.qakka.core;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.ProtocolVersion;
import com.google.inject.Injector;
import net.jcip.annotations.NotThreadSafe;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
import org.apache.usergrid.persistence.core.datastax.DataStaxCluster;
import org.apache.usergrid.persistence.qakka.AbstractAkkaTest;
import org.apache.usergrid.persistence.qakka.App;
import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
import org.apache.usergrid.persistence.qakka.serialization.Result;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLog;
import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
import java.util.*;
import java.util.stream.Collectors;
@NotThreadSafe
public class QueueMessageManagerTest extends AbstractAkkaTest {
private static final Logger logger = LoggerFactory.getLogger( QueueMessageManagerTest.class );
// TODO: test that multiple threads pulling from same queue will never pop same item
public void shutdown(Injector injector){
injector.getInstance(DataStaxCluster.class).shutdown();
}
@Test
public void testBasicOperation() throws Exception {
String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric(15);
Injector injector = getInjector();
DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
String region = actorSystemFig.getRegionLocal();
// App app = injector.getInstance( App.class );
// app.start( "localhost", getNextAkkaPort(), region );
// create queue and send one message to it
QueueManager queueManager = injector.getInstance( QueueManager.class );
try {
QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class );
queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) );
String jsonData = "{}";
qmm.sendMessages( queueName, Collections.singletonList( region ), null, null,
"application/json", DataType.serializeValue( jsonData, ProtocolVersion.NEWEST_SUPPORTED ) );
distributedQueueService.refresh();
Thread.sleep( 1000 );
// get message from the queue
List<QueueMessage> messages = qmm.getNextMessages( queueName, 1 );
Assert.assertEquals( 1, messages.size() );
QueueMessage message = messages.get( 0 );
// test that queue message data is present and correct
QueueMessageSerialization qms = injector.getInstance( QueueMessageSerialization.class );
DatabaseQueueMessageBody data = qms.loadMessageData( message.getMessageId() );
Assert.assertNotNull( data );
Assert.assertEquals( "application/json", data.getContentType() );
String jsonDataReturned = new String( data.getBlob().array(), Charset.forName( "UTF-8" ) );
Assert.assertEquals( jsonData, jsonDataReturned );
// test that transfer log is empty for our queue
TransferLogSerialization tlogs = injector.getInstance( TransferLogSerialization.class );
Result<TransferLog> all = tlogs.getAllTransferLogs( null, 1000 );
List<TransferLog> logs = all.getEntities().stream()
.filter( log -> log.getQueueName().equals( queueName ) ).collect( Collectors.toList() );
Assert.assertTrue( logs.isEmpty() );
// ack the message
qmm.ackMessage( queueName, message.getQueueMessageId() );
// test that message is no longer stored in non-replicated keyspace
Assert.assertNull( qms.loadMessage( queueName, region, null,
DatabaseQueueMessage.Type.DEFAULT, message.getQueueMessageId() ) );
Assert.assertNull( qms.loadMessage( queueName, region, null,
DatabaseQueueMessage.Type.INFLIGHT, message.getQueueMessageId() ) );
// test that audit log entry was written
AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
Assert.assertEquals( 3, auditLogs.getEntities().size() );
distributedQueueService.shutdown();
} finally {
queueManager.deleteQueue( queueName );
}
}
@Test
public void testQueueMessageTimeouts() throws Exception {
Injector injector = getInjector();
DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
QakkaFig qakkaFig = injector.getInstance( QakkaFig.class );
ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
InMemoryQueue inMemoryQueue = injector.getInstance( InMemoryQueue.class );
String region = actorSystemFig.getRegionLocal();
// App app = injector.getInstance( App.class );
// app.start( "localhost", getNextAkkaPort(), region );
// create some number of queue messages
QueueManager queueManager = injector.getInstance( QueueManager.class );
String queueName = "queue_testQueueMessageTimeouts_" + RandomStringUtils.randomAlphanumeric( 15 );
try {
QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class );
queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) );
int numMessages = 40;
for (int i = 0; i < numMessages; i++) {
qmm.sendMessages(
queueName,
Collections.singletonList( region ),
null, // delay
null, // expiration
"application/json",
DataType.serializeValue( "{}", ProtocolVersion.NEWEST_SUPPORTED ) );
}
DatabaseQueueMessage.Type type = DatabaseQueueMessage.Type.DEFAULT;
int maxRetries = 15;
int retries = 0;
while (retries++ < maxRetries) {
distributedQueueService.refresh();
if (qmm.getQueueDepth( queueName, type ) == numMessages) {
break;
}
Thread.sleep( 500 );
}
Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName, type ) );
// get all messages from queue
List<QueueMessage> messages = qmm.getNextMessages( queueName, numMessages );
Assert.assertEquals( numMessages, messages.size() );
// ack half of the messages
List<QueueMessage> remove = new ArrayList<>();
for (int i = 0; i < numMessages / 2; i++) {
QueueMessage queueMessage = messages.get( i );
qmm.ackMessage( queueName, queueMessage.getQueueMessageId() );
remove.add( queueMessage );
}
for (QueueMessage message : remove) {
messages.remove( message );
}
// wait for twice timeout period
Thread.sleep( 2 * qakkaFig.getQueueTimeoutSeconds() * 1000 );
distributedQueueService.processTimeouts();
// attempt to ack other half of messages
for (QueueMessage message : messages) {
try {
qmm.ackMessage( queueName, message.getQueueMessageId() );
Assert.fail( "Message should have timed out by now" );
} catch (QakkaRuntimeException expected) {
// keep on going...
}
}
distributedQueueService.shutdown();
} finally {
queueManager.deleteQueue( queueName );
}
}
@Test
public void testGetWithMissingData() throws InterruptedException {
Injector injector = getInjector();
injector.getInstance( App.class ); // init the INJECTOR
ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
DistributedQueueService qas = injector.getInstance( DistributedQueueService.class );
QueueManager qm = injector.getInstance( QueueManager.class );
QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class );
QueueMessageSerialization qms = injector.getInstance( QueueMessageSerialization.class );
String region = actorSystemFig.getRegionLocal();
// App app = injector.getInstance( App.class );
// app.start( "localhost", getNextAkkaPort(), region );
// create queue messages, every other one with missing data
int numMessages = 100;
String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
qm.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
for ( int i=0; i<numMessages; i++ ) {
final UUID messageId = QakkaUtils.getTimeUuid();
if ( i % 2 == 0 ) { // every other it
final String data = "my test data";
final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" );
qms.writeMessageData( messageId, messageBody );
}
UUID queueMessageId = QakkaUtils.getTimeUuid();
DatabaseQueueMessage message = new DatabaseQueueMessage(
messageId,
DatabaseQueueMessage.Type.DEFAULT,
queueName,
actorSystemFig.getRegionLocal(),
null,
System.currentTimeMillis(),
null,
queueMessageId);
qms.writeMessage( message );
}
qas.refresh();
Thread.sleep(1000);
int count = 0;
while ( count < numMessages / 2 ) {
List<QueueMessage> messages = qmm.getNextMessages( queueName, 1 );
Assert.assertTrue( !messages.isEmpty() );
count += messages.size();
logger.debug("Got {} messages", ++count);
}
DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
distributedQueueService.shutdown();
}
@Test
public void testClearQueue() throws Exception {
Injector injector = getInjector();
DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
String region = actorSystemFig.getRegionLocal();
// App app = injector.getInstance( App.class );
// app.start( "localhost", getNextAkkaPort(), region );
// create some number of queue messages
QueueManager queueManager = injector.getInstance( QueueManager.class );
String queueName = "queue_testClearQueue" + RandomStringUtils.randomAlphanumeric( 15 );
try {
QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class );
queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) );
int numMessages = 40;
for (int i = 0; i < numMessages; i++) {
qmm.sendMessages(
queueName,
Collections.singletonList( region ),
null, // delay
null, // expiration
"application/json",
DataType.serializeValue( "{}", ProtocolVersion.NEWEST_SUPPORTED ) );
}
DatabaseQueueMessage.Type available = DatabaseQueueMessage.Type.DEFAULT;
DatabaseQueueMessage.Type inflight = DatabaseQueueMessage.Type.INFLIGHT;
int maxRetries = 15;
int retries = 0;
while (retries++ < maxRetries) {
distributedQueueService.refresh();
if (qmm.getQueueDepth( queueName, available ) == numMessages) {
break;
}
Thread.sleep( 1000 );
}
Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName, available ) );
Assert.assertEquals( 0, qmm.getQueueDepth( queueName, inflight ) );
// get half of the messages from the queue
List<QueueMessage> messages = qmm.getNextMessages( queueName, numMessages/2 );
Assert.assertEquals( numMessages/2, messages.size() );
Thread.sleep(3000);
// now half messages should be available and half in flight
Assert.assertEquals( numMessages/2, qmm.getQueueDepth( queueName, available ) );
Assert.assertEquals( numMessages/2, qmm.getQueueDepth( queueName, inflight ) );
qmm.clearMessages( queueName );
// counters should show zero in queue
Assert.assertEquals( 0, qmm.getQueueDepth( queueName, available ) );
Assert.assertEquals( 0, qmm.getQueueDepth( queueName, inflight ) );
// TODO: check that all shards are gone
CassandraClient cassandraClient = injector.getInstance( CassandraClient.class );
ShardIterator defaultShardIterator = new ShardIterator( cassandraClient,
queueName, actorSystemFig.getRegionLocal(), Shard.Type.DEFAULT, Optional.empty() );
Assert.assertTrue( !defaultShardIterator.hasNext() );
ShardIterator inflightShardIterator = new ShardIterator( cassandraClient,
queueName, actorSystemFig.getRegionLocal(), Shard.Type.INFLIGHT, Optional.empty() );
Assert.assertTrue( !inflightShardIterator.hasNext() );
} finally {
queueManager.deleteQueue( queueName );
}
}
}