blob: 5e9df6efc229ca5db673f3b0724ae3678ffeccdb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.s4.comm.util;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import org.apache.s4.base.Emitter;
import org.apache.s4.base.EventMessage;
import org.apache.s4.base.Listener;
import org.apache.s4.base.SerializerDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
import com.google.inject.name.Named;
/**
* Test util for communication protocols.
*
* <ul>
* <li>The util defines Send and Receive Threads</li>
* <li>SendThread sends out a pre-defined number of messages to all the partitions</li>
* <li>ReceiveThread receives all/most of these messages</li>
* <li>To avoid the receiveThread waiting for ever, it spawns a TimerThread that would interrupt after a pre-defined but
* long enough interval</li>
* </ul>
*
*/
public class PartitionInfo {
private static final Logger logger = LoggerFactory.getLogger(PartitionInfo.class);
public Emitter emitter;
public Listener listener;
public SendThread sendThread;
public ReceiveThread receiveThread;
private final int numRetries;
private final int retryDelayMs;
private int numMessages;
private int partitionId;
private ProtocolTestUtil ptu;
@Inject
SerializerDeserializer serDeser;
@Inject
public PartitionInfo(Emitter emitter, Listener listener, @Named("comm.retries") int retries,
@Named("comm.retry_delay") int retryDelay, @Named("emitter.send.numMessages") int numMessages) {
this.emitter = emitter;
this.listener = listener;
this.partitionId = this.listener.getPartitionId();
logger.debug("# Partitions = {}; Current partition = {}", this.emitter.getPartitionCount(),
this.listener.getPartitionId());
this.numRetries = retries;
this.retryDelayMs = retryDelay;
this.numMessages = numMessages;
// this.messagesExpected = numMessages * this.emitter.getPartitionCount();
this.sendThread = new SendThread();
this.receiveThread = new ReceiveThread();
}
public void setProtocolTestUtil(ProtocolTestUtil ptu) {
this.ptu = ptu;
this.ptu.expectedMessages[partitionId] = numMessages * this.emitter.getPartitionCount();
}
public class SendThread extends Thread {
public int[] sendCounts = new int[emitter.getPartitionCount()];
public SendThread() {
super("SendThread-" + partitionId);
}
@Override
public void run() {
try {
for (int i = 0; i < numMessages; i++) {
for (int partition = 0; partition < emitter.getPartitionCount(); partition++) {
EventMessage message = new EventMessage("app1", "stream1",
new String(partitionId + " " + i).getBytes());
for (int retries = 0; retries < numRetries; retries++) {
if (emitter.send(partition, message)) {
sendCounts[partition]++;
break;
}
Thread.sleep(retryDelayMs);
}
}
}
} catch (Exception e) {
e.printStackTrace();
return;
}
for (int i = 0; i < sendCounts.length; i++) {
if (sendCounts[i] < numMessages) {
ptu.decreaseExpectedMessages(i, (numMessages - sendCounts[i]));
}
}
logger.debug("Exiting");
}
}
public class ReceiveThread extends Thread {
protected int messagesReceived = 0;
private Hashtable<Integer, List<Integer>> receivedMessages;
ReceiveThread() {
super("ReceiveThread-" + partitionId);
receivedMessages = new Hashtable<Integer, List<Integer>>();
}
@Override
public void run() {
while (messagesReceived < ptu.expectedMessages[partitionId]) {
byte[] message = listener.recv();
if (message == null) {
logger.error("ReceiveThread {}: received a null message", partitionId);
break;
}
EventMessage deserialized = (EventMessage) serDeser.deserialize(message);
// process and store the message
String msgString = new String(deserialized.getSerializedEvent());
String[] msgTokens = msgString.split(" ");
Integer senderPartition = Integer.parseInt(msgTokens[0]);
Integer receivedMsg = Integer.parseInt(msgTokens[1]);
if (!receivedMessages.containsKey(senderPartition)) {
receivedMessages.put(senderPartition, new ArrayList<Integer>());
}
List<Integer> messagesList = receivedMessages.get(senderPartition);
if (messagesList.contains(receivedMsg)) {
messagesList.remove(receivedMsg);
} else {
messagesReceived++;
}
messagesList.add(receivedMsg);
}
logger.debug("Exiting");
}
public boolean orderedDelivery() {
for (List<Integer> messagesList : receivedMessages.values()) {
int lastMsg = -1;
for (Integer msg : messagesList) {
if (msg <= lastMsg) {
return false;
}
}
}
return true;
}
public boolean messageDelivery() {
if (messagesReceived < ptu.expectedMessages[partitionId]) {
printCounts();
return false;
} else
return true;
}
public void printCounts() {
logger.debug("ReceiveThread {}: Messages not received = {}", partitionId,
(ptu.expectedMessages[partitionId] - messagesReceived));
int counts[] = new int[emitter.getPartitionCount()];
for (Integer sender : receivedMessages.keySet()) {
counts[sender] = receivedMessages.get(sender).size();
}
logger.debug("ReceiveThread {}: recvdCounts: {}", partitionId, counts);
}
public int moreMessages() {
return (int) (ptu.expectedMessages[partitionId] - messagesReceived);
}
}
}