blob: ca57cd06ce64dee2f9310f5ef12a2f70dddd1259 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jwplayer.sqe;
import com.jwplayer.sqe.trident.StreamMetadata;
import com.jwplayer.sqe.util.KafkaTestServer;
import kafka.message.MessageAndOffset;
import org.apache.commons.io.IOUtils;
import org.apache.storm.utils.Utils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
 * End-to-end test of the KafkaReplays topology: seeds a source topic with
 * messages containing deliberate duplicates/rewinds, runs the topology locally,
 * and verifies that only the expected de-duplicated messages reach the
 * destination topic.
 */
public class KafkaReplaysIntegrationTest {
    /** Topic the topology under test reads from. */
    private static final String SOURCE_TOPIC = "source";
    /** Topic the topology under test writes to. */
    private static final String DEST_TOPIC = "dest";
    private KafkaTestServer kafkaServer;
    private List<MessageAndOffset> destMessages;
    private List<MessageAndOffset> sourceMessages;

    @Before
    public void setup() throws Exception {
        kafkaServer = new KafkaTestServer();
        populateSourceTopic();
        sourceMessages = kafkaServer.readMessages(SOURCE_TOPIC);
        File commandsFile = writeCommandsFile();
        runTopology(commandsFile);
        destMessages = kafkaServer.readMessages(DEST_TOPIC);
        kafkaServer.shutdown();
    }

    /**
     * Seeds the source topic with three logical partitions' worth of
     * StreamMetadata messages. Each sequence repeats/rewinds message numbers so
     * the replay logic has duplicates to filter out (27 messages total).
     */
    private void populateSourceTopic() throws Exception {
        sendSourceMessages(0, new int[]{0, 1, 2, 1, 2, 3, 2, 3, 4});
        sendSourceMessages(1, new int[]{0, 1, 2, 1, 0, 1, 2, 1, 0});
        sendSourceMessages(2, new int[]{0, 1, 2, 3, 4, 3, 2, 1, 0});
    }

    /**
     * Sends one message per entry in {@code messageNumbers}; key and value each
     * carry the serialized StreamMetadata(0L, partition, messageNumber).
     */
    private void sendSourceMessages(int partition, int[] messageNumbers) throws Exception {
        for (int messageNumber : messageNumbers) {
            kafkaServer.sendMessage(SOURCE_TOPIC, 0,
                    new StreamMetadata(0L, partition, messageNumber).toBytes(),
                    new StreamMetadata(0L, partition, messageNumber).toBytes());
        }
    }

    /**
     * Renders the kafka-replays command template with the live ZooKeeper and
     * Kafka connection strings and writes it to a self-deleting temp file.
     */
    private File writeCommandsFile() throws Exception {
        // Explicit charset: the no-charset IOUtils.toString overload is
        // deprecated and depends on the platform default encoding.
        String commands = IOUtils.toString(
                this.getClass().getResourceAsStream("/queries/kafka-replays.json"),
                StandardCharsets.UTF_8);
        commands = String.format(commands,
                kafkaServer.getZkServer().getConnectionString(),
                kafkaServer.getConnectionString());
        File commandsFile = File.createTempFile("kafka-replays", ".json");
        commandsFile.deleteOnExit();
        // try-with-resources guarantees the writer is closed (and the file
        // flushed) even if writing fails.
        try (PrintWriter writer = new PrintWriter(commandsFile)) {
            writer.println(commands);
        }
        return commandsFile;
    }

    /** Runs the KafkaReplays topology in local mode for up to 120 seconds. */
    private void runTopology(File commandsFile) throws Exception {
        String[] args = {
                "--commands", commandsFile.toURI().toString(),
                "--config", "./conf/sample-conf.yaml",
                "--local",
                "--name", "KafkaReplays",
                "--runtime", "120"
        };
        Topology.main(args);
    }

    @Test
    public void destMessagesTest() {
        // We expect 13 of 27 messages to make it to the destination topic.
        Assert.assertEquals(13, destMessages.size());
        // We don't know the pid, but we can pull it from the first message. It
        // should be consistent across all messages. We *do* know the expected
        // ordering of the offsets from the source topic and the partition.
        StreamMetadata metadata = StreamMetadata.parseBytes(Utils.toByteArray(destMessages.get(0).message().key()));
        long pid = metadata.pid;
        // Test each message
        testMessage(destMessages.get(0), new StreamMetadata(pid, 0, 0).toBytes(), new StreamMetadata(0L, 0, 0).toBytes());
        testMessage(destMessages.get(1), new StreamMetadata(pid, 0, 1).toBytes(), new StreamMetadata(0L, 0, 1).toBytes());
        testMessage(destMessages.get(2), new StreamMetadata(pid, 0, 2).toBytes(), new StreamMetadata(0L, 0, 2).toBytes());
        testMessage(destMessages.get(3), new StreamMetadata(pid, 0, 5).toBytes(), new StreamMetadata(0L, 0, 3).toBytes());
        testMessage(destMessages.get(4), new StreamMetadata(pid, 0, 8).toBytes(), new StreamMetadata(0L, 0, 4).toBytes());
        testMessage(destMessages.get(5), new StreamMetadata(pid, 0, 9).toBytes(), new StreamMetadata(0L, 1, 0).toBytes());
        testMessage(destMessages.get(6), new StreamMetadata(pid, 0, 10).toBytes(), new StreamMetadata(0L, 1, 1).toBytes());
        testMessage(destMessages.get(7), new StreamMetadata(pid, 0, 11).toBytes(), new StreamMetadata(0L, 1, 2).toBytes());
        testMessage(destMessages.get(8), new StreamMetadata(pid, 0, 18).toBytes(), new StreamMetadata(0L, 2, 0).toBytes());
        testMessage(destMessages.get(9), new StreamMetadata(pid, 0, 19).toBytes(), new StreamMetadata(0L, 2, 1).toBytes());
        testMessage(destMessages.get(10), new StreamMetadata(pid, 0, 20).toBytes(), new StreamMetadata(0L, 2, 2).toBytes());
        testMessage(destMessages.get(11), new StreamMetadata(pid, 0, 21).toBytes(), new StreamMetadata(0L, 2, 3).toBytes());
        testMessage(destMessages.get(12), new StreamMetadata(pid, 0, 22).toBytes(), new StreamMetadata(0L, 2, 4).toBytes());
    }

    @Test
    public void sourceMessagesTest() {
        // Sanity check: all 27 seeded messages (9 per partition) are readable.
        Assert.assertEquals(27, sourceMessages.size());
    }

    /**
     * Asserts that the given message's key and value bytes exactly match the
     * expected serialized StreamMetadata.
     */
    public void testMessage(MessageAndOffset messageAndOffset, byte[] expectedKey, byte[] expectedValue) {
        byte[] key = Utils.toByteArray(messageAndOffset.message().key());
        byte[] value = Utils.toByteArray(messageAndOffset.message().payload());
        Assert.assertArrayEquals(expectedKey, key);
        Assert.assertArrayEquals(expectedValue, value);
    }
}