blob: 4954507d58d428ccaa2960f9692384cac26ea2d6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.management;
import org.apache.metron.integration.BaseIntegrationTest;
import org.apache.metron.integration.ComponentRunner;
import org.apache.metron.integration.components.KafkaComponent;
import org.apache.metron.integration.components.ZKServerComponent;
import org.apache.metron.stellar.common.StellarProcessor;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.DefaultVariableResolver;
import org.apache.metron.stellar.dsl.functions.MapFunctions;
import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
import org.junit.jupiter.api.*;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.*;
/**
* Tests the KafkaFunctions class.
*
* Labelled as an integration test because the tests stand up a Kafka broker for the
* Stellar Kafka functions to interact with.
*/
public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {

  // sample messages that the tests write to, and read back from, Kafka
  private static final String message1 = "{ \"ip_src_addr\": \"10.0.0.1\", \"value\": 14687 }";
  private static final String message2 = "{ \"ip_src_addr\": \"10.0.0.1\", \"value\": 23 }";
  private static final String message3 = "{ \"ip_src_addr\": \"10.0.0.1\", \"value\": 29011 }";

  // variables visible to the Stellar expressions; rebuilt before each test
  private static Map<String, Object> variables;
  private static ZKServerComponent zkServerComponent;
  private static KafkaComponent kafkaComponent;
  private static ComponentRunner runner;
  // global properties handed to the KAFKA_* functions; rebuilt before each test
  private static Properties global;
  private static FunctionResolver functionResolver;
  // runs expressions asynchronously so a blocking read can overlap with the writes that satisfy it
  private static ExecutorService executor;

  @BeforeAll
  public static void setupExecutor() {
    // one thread can block on a read (KAFKA_TAIL / KAFKA_FIND) while the other performs the puts
    executor = Executors.newFixedThreadPool(2);
  }

  @AfterAll
  public static void tearDownExecutor() {
    if (executor != null && !executor.isShutdown()) {
      // shutdownNow() interrupts any expression left running by a test whose future timed out,
      // instead of leaving its (non-daemon) worker thread blocked on Kafka
      executor.shutdownNow();
    }
  }

  @BeforeAll
  public static void setupKafka() throws Exception {
    Properties properties = new Properties();
    zkServerComponent = getZKServerComponent(properties);
    kafkaComponent = getKafkaComponent(properties, new ArrayList<>());
    runner = new ComponentRunner.Builder()
        .withComponent("zk", zkServerComponent)
        .withComponent("kafka", kafkaComponent)
        .withMillisecondsBetweenAttempts(5000)
        .withNumRetries(5)
        .withCustomShutdownOrder(new String[]{"kafka", "zk"})
        .build();
    runner.start();
  }

  @BeforeAll
  public static void setupFunctionResolver() {
    // used when executing Stellar expressions
    functionResolver = new SimpleFunctionResolver()
        .withClass(KafkaFunctions.KafkaGet.class)
        .withClass(KafkaFunctions.KafkaPut.class)
        .withClass(KafkaFunctions.KafkaProps.class)
        .withClass(KafkaFunctions.KafkaTail.class)
        .withClass(KafkaFunctions.KafkaFind.class)
        .withClass(KafkaFunctions.KafkaSeek.class)
        .withClass(MapFunctions.MapGet.class);
  }

  @BeforeEach
  public void setup() {
    // messages that will be read/written during the tests
    variables = new HashMap<>();
    variables.put("message1", message1);
    variables.put("message2", message2);
    variables.put("message3", message3);

    // global properties
    global = new Properties();
    global.put("bootstrap.servers", kafkaComponent.getBrokerList());

    // start reading from the earliest offset, which is necessary for these tests
    global.put("auto.offset.reset", "earliest");
  }

  @AfterAll
  public static void tearDownAfterClass() {
    // runner is still null if setupKafka failed before building it; don't mask that failure with an NPE
    if (runner != null) {
      runner.stop();
    }
  }

  @AfterEach
  public void tearDown() {
    runner.reset();
  }

  /**
   * KAFKA_PUT should be able to write one message to a topic.
   * KAFKA_GET should be able to read one message from a topic.
   */
  @Test
  public void testKafkaPut(TestInfo testInfo) {
    useUniqueTopic(testInfo);

    // put a message onto the topic; KAFKA_PUT returns the number of messages written
    assertEquals(1, run("KAFKA_PUT(topic, [message1])"));

    // validate the message in the topic
    assertEquals(Collections.singletonList(message1), run("KAFKA_GET(topic)"));
  }

  /**
   * KAFKA_PUT should be able to write multiple messages to a topic.
   */
  @Test
  public void testKafkaPutMultipleMessages(TestInfo testInfo) {
    useUniqueTopic(testInfo);

    // put two messages onto the topic
    assertEquals(2, run("KAFKA_PUT(topic, [message1, message2])"));

    // validate the messages in the topic, in the order they were written
    List<String> expected = Arrays.asList(message1, message2);
    assertEquals(expected, run("KAFKA_GET(topic, 2)"));
  }

  /**
   * KAFKA_PUT should be able to write a message passed as a String, rather than a List.
   */
  @Test
  public void testKafkaPutOneMessagePassedAsString(TestInfo testInfo) {
    useUniqueTopic(testInfo);

    // put a message onto the topic - the message is just a string, not a list
    run("KAFKA_PUT(topic, message1)");

    // get a message from the topic
    Object actual = run("KAFKA_GET(topic)");

    // validate
    assertEquals(Collections.singletonList(message1), actual);
  }

  /**
   * KAFKA_PUT should return a rich view (topic, partition, offset, timestamp) of each record it
   * writes when the rich message view is configured.
   */
  @Test
  public void testKafkaPutWithRichView(TestInfo testInfo) {
    // configure a detailed view of each message
    global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH);
    final String topicName = useUniqueTopic(testInfo);

    // put a message onto the topic - the message is just a string, not a list
    Object actual = run("KAFKA_PUT(topic, message1)");

    // validate
    assertTrue(actual instanceof List);
    List<?> results = (List<?>) actual;
    assertEquals(1, results.size());

    // expect a 'rich' view of the record
    Map<?, ?> view = (Map<?, ?>) results.get(0);
    assertEquals(topicName, view.get("topic"));
    assertEquals(0, view.get("partition"));
    assertEquals(0L, view.get("offset"));
    assertNotNull(view.get("timestamp"));
  }

  /**
   * KAFKA_GET should allow a user to see a detailed view of each Kafka record.
   */
  @Test
  public void testKafkaGetWithRichView(TestInfo testInfo) {
    // configure a detailed view of each message
    global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH);
    final String topicName = useUniqueTopic(testInfo);

    // put a message onto the topic - the message is just a string, not a list
    run("KAFKA_PUT(topic, message1)");

    // get a message from the topic
    Object actual = run("KAFKA_GET(topic)");

    // validate
    assertTrue(actual instanceof List);
    List<?> results = (List<?>) actual;
    assertEquals(1, results.size());

    // expect a 'rich' view of the record
    Map<?, ?> view = (Map<?, ?>) results.get(0);
    assertNull(view.get("key"));
    assertEquals(0L, view.get("offset"));
    assertEquals(0, view.get("partition"));
    assertEquals(topicName, view.get("topic"));
    assertEquals(message1, view.get("value"));
  }

  /**
   * KAFKA_PUT should be able to write multiple messages passed as a List.
   * KAFKA_GET should be able to read multiple messages at once.
   */
  @Test
  public void testKafkaPutThenGetWithMultipleMessages(TestInfo testInfo) {
    useUniqueTopic(testInfo);

    // put multiple messages onto the topic
    run("KAFKA_PUT(topic, [message1, message2, message3])");

    // get 3 messages from the topic
    Object actual = run("KAFKA_GET(topic, 3)");

    // validate that all 3 messages were read, in order
    List<String> expected = Arrays.asList(message1, message2, message3);
    assertEquals(expected, actual);
  }

  /**
   * KAFKA_GET should maintain its consumer offsets and reuse them across subsequent calls.
   *
   * <p>The offsets must be maintained correctly to read messages sequentially, in order,
   * across separate executions of KAFKA_GET.
   */
  @Test
  public void testKafkaGetWithSequentialReads(TestInfo testInfo) {
    useUniqueTopic(testInfo);

    // put multiple messages onto the topic
    run("KAFKA_PUT(topic, [message1, message2, message3])");

    // read the first message
    assertEquals(Collections.singletonList(message1), run("KAFKA_GET(topic, 1)"));

    // pick-up from where we left off and read the second message
    assertEquals(Collections.singletonList(message2), run("KAFKA_GET(topic, 1)"));

    // pick-up from where we left off and read the third message
    assertEquals(Collections.singletonList(message3), run("KAFKA_GET(topic, 1)"));

    // no more messages left to read
    assertEquals(Collections.emptyList(), run("KAFKA_GET(topic, 1)"));
  }

  /**
   * KAFKA_GET should return nothing if a topic does not exist.
   */
  @Test
  public void testKafkaGetWithNonExistentTopic(TestInfo testInfo) {
    useUniqueTopic(testInfo);

    // nothing has been written, so there is nothing to read
    assertEquals(Collections.emptyList(), run("KAFKA_GET(topic, 1)"));
  }

  /**
   * KAFKA_TAIL should return new messages from the end of a topic.
   */
  @Test
  public void testKafkaTail(TestInfo testInfo) throws Exception {
    useUniqueTopic(testInfo);

    // put multiple messages onto the topic; KAFKA_TAIL should NOT retrieve these
    run("KAFKA_PUT(topic, [message2, message2, message2])");

    // get a message from the topic; will block until messages arrive
    Future<Object> tailFuture = runAsync("KAFKA_TAIL(topic, 1)");

    // write several copies so that at least one is likely to land after KAFKA_TAIL has
    // seeked to the end of the topic
    runAsyncAndWait(Collections.nCopies(10, "KAFKA_PUT(topic, [message1])"));

    // expect to receive message1, which was added to the topic while KAFKA_TAIL was running
    Object actual = tailFuture.get(10, TimeUnit.SECONDS);
    List<String> expected = Collections.singletonList(message1);
    assertEquals(expected, actual);
  }

  /**
   * KAFKA_TAIL should always seek to end of the topic. If no messages arrive after the 'seek to end'
   * then no messages will be returned.
   */
  @Test
  public void testKafkaTailNone(TestInfo testInfo) {
    // shorten the max wait time so we do not have to wait so long
    global.put(KafkaFunctions.MAX_WAIT_PROPERTY, 2000);
    useUniqueTopic(testInfo);

    // put multiple messages onto the topic
    run("KAFKA_PUT(topic, [message1, message2, message3])");

    // no messages to read as KAFKA_TAIL should "seek to end" of the topic
    assertEquals(Collections.emptyList(), run("KAFKA_TAIL(topic, 1)"));
  }

  /**
   * KAFKA_TAIL should allow a user to see a rich view of each Kafka record.
   */
  @Test
  public void testKafkaTailWithRichView(TestInfo testInfo) throws Exception {
    // configure a detailed view of each message
    global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH);
    final String topicName = useUniqueTopic(testInfo);

    // put multiple messages onto the topic; KAFKA_TAIL should NOT retrieve these
    run("KAFKA_PUT(topic, [message2, message2, message2])");

    // get a message from the topic; will block until messages arrive
    Future<Object> tailFuture = runAsync("KAFKA_TAIL(topic, 1)");

    // write several copies so that at least one is likely to land after KAFKA_TAIL has
    // seeked to the end of the topic
    runAsyncAndWait(Collections.nCopies(10, "KAFKA_PUT(topic, [message1])"));

    // wait for KAFKA_TAIL to complete
    Object actual = tailFuture.get(10, TimeUnit.SECONDS);

    // validate
    assertTrue(actual instanceof List);
    List<?> results = (List<?>) actual;
    assertEquals(1, results.size());

    // expect a 'rich' view of the record
    Map<?, ?> view = (Map<?, ?>) results.get(0);
    assertNull(view.get("key"));
    assertEquals(0, view.get("partition"));
    assertEquals(topicName, view.get("topic"));
    assertEquals(message1, view.get("value"));
    assertNotNull(view.get("offset"));
  }

  /**
   * KAFKA_PROPS should return the set of properties used to configure the Kafka consumer.
   *
   * <p>The properties used for the KAFKA_* functions are calculated by compiling the default, global and user
   * properties into a single set of properties. The global properties should override any default properties.
   */
  @Test
  public void testKafkaPropsWithGlobalOverride() {
    // setup - override a key in the global properties
    final String overriddenKey = "bootstrap.servers";
    final String expected = "foo.global.override.com:9092";
    global.setProperty(overriddenKey, expected);

    // validate - ensure the global overrides the default property value
    Map<?, ?> properties = (Map<?, ?>) run("KAFKA_PROPS()");
    assertEquals(expected, properties.get(overriddenKey));
  }

  /**
   * KAFKA_PROPS should allow the global properties to be overridden by user properties.
   *
   * <p>The properties used for the KAFKA_* functions are calculated by compiling the default, global and user
   * properties into a single set of properties. The user properties should override any default or global properties.
   */
  @Test
  public void testKafkaPropsWithUserOverride() {
    // setup - override a key in the global properties
    final String overriddenKey = "bootstrap.servers";
    global.setProperty(overriddenKey, "foo.global.override.com:9092");

    // setup - override the same key in the user properties
    final String expected = "foo.user.override.com:9092";
    String expression = String.format("KAFKA_PROPS({ '%s' : '%s' })", overriddenKey, expected);

    // validate - ensure the user properties override the global and defaults
    Map<?, ?> properties = (Map<?, ?>) run(expression);
    assertEquals(expected, properties.get(overriddenKey));
  }

  /**
   * KAFKA_FIND should only return messages that satisfy a filter expression.
   */
  @Test
  public void testKafkaFind(TestInfo testInfo) throws Exception {
    useUniqueTopic(testInfo);

    // find all messages satisfying the filter expression
    Future<Object> future = runAsync("KAFKA_FIND(topic, m -> MAP_GET('value', m) == 23)");

    // put several messages onto the topic for KAFKA_FIND to grab
    runAsyncAndWait(Collections.nCopies(10, "KAFKA_PUT(topic, [message2])"));

    // expect a single `message2`, whose value == 23
    Object actual = future.get(10, TimeUnit.SECONDS);
    List<String> expected = Collections.singletonList(message2);
    assertEquals(expected, actual);
  }

  /**
   * KAFKA_FIND should return no messages, if none match the filter expression.
   */
  @Test
  public void testKafkaFindNone(TestInfo testInfo) throws Exception {
    useUniqueTopic(testInfo);

    // find all messages satisfying the filter expression
    Future<Object> future = runAsync("KAFKA_FIND(topic, m -> false)");

    // put several messages onto the topic for KAFKA_FIND to examine
    runAsyncAndWait(Collections.nCopies(10, "KAFKA_PUT(topic, [message1])"));

    // no messages satisfy the filter expression
    Object actual = future.get(10, TimeUnit.SECONDS);
    List<String> expected = Collections.emptyList();
    assertEquals(expected, actual);
  }

  /**
   * KAFKA_FIND should allow a user to see a detailed view of each Kafka record.
   */
  @Test
  public void testKafkaFindWithRichView(TestInfo testInfo) throws Exception {
    // configure a detailed view of each message
    global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH);
    final String topicName = useUniqueTopic(testInfo);

    // find all messages satisfying the filter expression
    Future<Object> future = runAsync("KAFKA_FIND(topic, m -> MAP_GET('value', m) == 23)");

    // put several messages onto the topic for KAFKA_FIND to grab
    runAsyncAndWait(Collections.nCopies(10, "KAFKA_PUT(topic, [message2])"));

    // validate
    Object actual = future.get(10, TimeUnit.SECONDS);
    assertTrue(actual instanceof List);
    List<?> results = (List<?>) actual;
    assertEquals(1, results.size());

    // expect a 'rich' view of the record
    Map<?, ?> view = (Map<?, ?>) results.get(0);
    assertNull(view.get("key"));
    assertNotNull(view.get("offset"));
    assertEquals(0, view.get("partition"));
    assertEquals(topicName, view.get("topic"));
    assertEquals(message2, view.get("value"));
  }

  /**
   * KAFKA_FIND should return no more messages than its limit.
   */
  @Test
  public void testKafkaFindMultiple(TestInfo testInfo) throws Exception {
    useUniqueTopic(testInfo);

    // find up to 2 messages satisfying the filter expression
    Future<Object> future = runAsync("KAFKA_FIND(topic, m -> true, 2)");

    // put several messages onto the topic for KAFKA_FIND to grab
    runAsyncAndWait(Collections.nCopies(10, "KAFKA_PUT(topic, [message2])"));

    // all messages satisfy the filter, but only the limit of 2 should be returned
    List<String> expected = Arrays.asList(message2, message2);
    Object actual = future.get(10, TimeUnit.SECONDS);
    assertEquals(expected, actual);
  }

  /**
   * KAFKA_FIND should wait no more than a maximum time before returning, even if no matching
   * messages are found.
   */
  @Test
  public void testKafkaFindExceedsMaxWait(TestInfo testInfo) {
    useUniqueTopic(testInfo);

    // write all 3 messages to the topic
    run("KAFKA_PUT(topic, [message1, message2, message3])");

    // execute the test - none of the messages satisfy the filter
    long before = System.currentTimeMillis();
    Object actual = run("KAFKA_FIND(topic, m -> false, 10, { 'stellar.kafka.max.wait.millis': 1000 })");

    // expect not to have waited more than roughly 1000 millis
    long wait = System.currentTimeMillis() - before;
    assertTrue(wait < 2 * 1000, "Expected wait not to exceed max wait; actual wait = " + wait);

    // expect no messages
    List<String> expected = Collections.emptyList();
    assertEquals(expected, actual);
  }

  /**
   * KAFKA_SEEK should return the message at a given offset.
   */
  @Test
  public void testKafkaSeek(TestInfo testInfo) {
    useUniqueTopic(testInfo);

    // put 3 messages into the topic
    run("KAFKA_PUT(topic, [ message1, message2, message3 ])");

    // seek to each offset of partition 0, deliberately in reverse order
    assertEquals(message3, run("KAFKA_SEEK(topic, 0, 2)"));
    assertEquals(message2, run("KAFKA_SEEK(topic, 0, 1)"));
    assertEquals(message1, run("KAFKA_SEEK(topic, 0, 0)"));
  }

  /**
   * KAFKA_SEEK should return null if the offset does not exist.
   */
  @Test
  public void testKafkaSeekToMissingOffset(TestInfo testInfo) {
    useUniqueTopic(testInfo);

    // put 3 messages into the topic
    run("KAFKA_PUT(topic, [ message1, message2, message3 ])");

    // seek to an offset far beyond the 3 messages written
    Object actual = run("KAFKA_SEEK(topic, 0, 9999)");
    assertNull(actual);
  }

  /**
   * KAFKA_SEEK should return null if the partition does not exist.
   */
  @Test
  public void testKafkaSeekToMissingPartition(TestInfo testInfo) {
    useUniqueTopic(testInfo);

    // put 3 messages into the topic
    run("KAFKA_PUT(topic, [ message1, message2, message3 ])");

    // seek within a partition that does not exist
    Object actual = run("KAFKA_SEEK(topic, 99999, 0)");
    assertNull(actual);
  }

  /**
   * KAFKA_SEEK should allow a user to see a detailed view of each Kafka record.
   */
  @Test
  public void testKafkaSeekWithRichView(TestInfo testInfo) {
    // configure a detailed view of each message
    global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH);
    final String topicName = useUniqueTopic(testInfo);

    run("KAFKA_PUT(topic, [ message1, message2, message3 ])");
    Object actual = run("KAFKA_SEEK(topic, 0, 0)");

    // expect a 'rich' view of the record
    assertTrue(actual instanceof Map);
    Map<?, ?> view = (Map<?, ?>) actual;
    assertNull(view.get("key"));
    assertNotNull(view.get("offset"));
    assertEquals(0, view.get("partition"));
    assertEquals(topicName, view.get("topic"));
    assertEquals(message1, view.get("value"));
  }

  /**
   * Uses the running test method's name as a topic name unique to that test, and exposes it
   * to Stellar expressions as the variable {@code topic}.
   *
   * @param testInfo Supplied by JUnit for the running test.
   * @return The topic name.
   * @throws IllegalStateException if JUnit supplies no test method.
   */
  private static String useUniqueTopic(TestInfo testInfo) {
    final String topicName = testInfo.getTestMethod()
        .orElseThrow(() -> new IllegalStateException("No test method for " + testInfo.getDisplayName()))
        .getName();
    variables.put("topic", topicName);
    return topicName;
  }

  /**
   * Runs a Stellar expression against the current {@code variables} and {@code global} properties.
   *
   * @param expression The expression to run.
   * @return The result of executing the expression.
   */
  private Object run(String expression) {
    // make the global properties available to the function
    Context context = new Context.Builder()
        .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
        .build();

    // execute the expression
    StellarProcessor processor = new StellarProcessor();
    return processor.parse(expression,
        new DefaultVariableResolver(
            x -> variables.get(x),
            x -> variables.containsKey(x)),
        functionResolver,
        context);
  }

  /**
   * Runs a Stellar expression asynchronously on the shared executor.
   *
   * <p>Does not block until the expression completes.
   *
   * @param expression The expression to run.
   * @return A future holding the result of executing the expression.
   */
  private Future<Object> runAsync(String expression) {
    return executor.submit(() -> run(expression));
  }

  /**
   * Runs a set of Stellar expressions asynchronously and waits
   * for each to complete before returning.
   *
   * @param expressions The expressions to run.
   * @throws Exception if an expression fails, or does not complete within 5 seconds.
   */
  private void runAsyncAndWait(Iterable<String> expressions) throws Exception {
    // submit every expression before waiting on any of them
    List<Future<Object>> futures = new ArrayList<>();
    for (String expression : expressions) {
      futures.add(runAsync(expression));
    }

    // wait for the expressions to complete
    for (Future<Object> future : futures) {
      future.get(5, TimeUnit.SECONDS);
    }
  }
}