blob: 32e3bc69ea6aeebe210083323de5e770d4ee435b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.kafkaconnector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Exchange;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class CamelSinkTaskTest {
@Test
public void testOnlyBody() {
Map<String, String> props = new HashMap<>();
props.put("camel.sink.url", "seda:test");
props.put("topics", "mytopic");
CamelSinkTask camelSinkTask = new CamelSinkTask();
camelSinkTask.start(props);
String topic = "mytopic";
List<SinkRecord> records = new ArrayList<SinkRecord>();
SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
records.add(record);
camelSinkTask.put(records);
ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
Exchange exchange = c.receive("seda:test", 1000L);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
camelSinkTask.stop();
}
@Test
public void testBodyAndHeaders() {
Map<String, String> props = new HashMap<>();
props.put("camel.sink.url", "seda:test");
props.put("topics", "mytopic");
CamelSinkTask camelSinkTask = new CamelSinkTask();
camelSinkTask.start(props);
String topic = "mytopic";
Byte myByte = new Byte("100");
Float myFloat = new Float("100");
Short myShort = new Short("100");
Double myDouble = new Double("100");
int myInteger = 100;
Long myLong = new Long("100");
List<SinkRecord> records = new ArrayList<SinkRecord>();
SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
record.headers().addBoolean("CamelHeaderMyBoolean", true);
record.headers().addByte("CamelHeaderMyByte", myByte);
record.headers().addFloat("CamelHeaderMyFloat", myFloat);
record.headers().addShort("CamelHeaderMyShort", myShort);
record.headers().addDouble("CamelHeaderMyDouble", myDouble);
record.headers().addInt("CamelHeaderMyInteger", myInteger);
record.headers().addLong("CamelHeaderMyLong", myLong);
records.add(record);
camelSinkTask.put(records);
ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
Exchange exchange = c.receive("seda:test", 1000L);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class));
assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class));
assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class));
assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class));
assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger"));
assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class));
camelSinkTask.stop();
}
@Test
public void testBodyAndProperties() {
Map<String, String> props = new HashMap<>();
props.put("camel.sink.url", "seda:test");
props.put("topics", "mytopic");
CamelSinkTask camelSinkTask = new CamelSinkTask();
camelSinkTask.start(props);
String topic = "mytopic";
Byte myByte = new Byte("100");
Float myFloat = new Float("100");
Short myShort = new Short("100");
Double myDouble = new Double("100");
int myInteger = 100;
Long myLong = new Long("100");
List<SinkRecord> records = new ArrayList<SinkRecord>();
SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
record.headers().addBoolean("CamelPropertyMyBoolean", true);
record.headers().addByte("CamelPropertyMyByte", myByte);
record.headers().addFloat("CamelPropertyMyFloat", myFloat);
record.headers().addShort("CamelPropertyMyShort", myShort);
record.headers().addDouble("CamelPropertyMyDouble", myDouble);
record.headers().addInt("CamelPropertyMyInteger", myInteger);
record.headers().addLong("CamelPropertyMyLong", myLong);
records.add(record);
camelSinkTask.put(records);
ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
Exchange exchange = c.receive("seda:test", 1000L);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertTrue((boolean) exchange.getProperties().get("CamelPropertyMyBoolean"));
assertEquals(myByte, (Byte) exchange.getProperties().get("CamelPropertyMyByte"));
assertEquals(myFloat, (Float) exchange.getProperties().get("CamelPropertyMyFloat"));
assertEquals(myShort, (Short) exchange.getProperties().get("CamelPropertyMyShort"));
assertEquals(myDouble, (Double) exchange.getProperties().get("CamelPropertyMyDouble"));
assertEquals(myInteger, exchange.getProperties().get("CamelPropertyMyInteger"));
assertEquals(myLong, (Long) exchange.getProperties().get("CamelPropertyMyLong"));
camelSinkTask.stop();
}
@Test
public void testBodyAndPropertiesHeadersMixed() {
Map<String, String> props = new HashMap<>();
props.put("camel.sink.url", "seda:test");
props.put("topics", "mytopic");
CamelSinkTask camelSinkTask = new CamelSinkTask();
camelSinkTask.start(props);
String topic = "mytopic";
Byte myByte = new Byte("100");
Float myFloat = new Float("100");
Short myShort = new Short("100");
Double myDouble = new Double("100");
int myInteger = 100;
Long myLong = new Long("100");
List<SinkRecord> records = new ArrayList<SinkRecord>();
SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
record.headers().addBoolean("CamelPropertyMyBoolean", true);
record.headers().addByte("CamelPropertyMyByte", myByte);
record.headers().addFloat("CamelPropertyMyFloat", myFloat);
record.headers().addShort("CamelPropertyMyShort", myShort);
record.headers().addDouble("CamelPropertyMyDouble", myDouble);
record.headers().addInt("CamelPropertyMyInteger", myInteger);
record.headers().addLong("CamelPropertyMyLong", myLong);
record.headers().addBoolean("CamelHeaderMyBoolean", true);
record.headers().addByte("CamelHeaderMyByte", myByte);
record.headers().addFloat("CamelHeaderMyFloat", myFloat);
record.headers().addShort("CamelHeaderMyShort", myShort);
record.headers().addDouble("CamelHeaderMyDouble", myDouble);
record.headers().addInt("CamelHeaderMyInteger", myInteger);
record.headers().addLong("CamelHeaderMyLong", myLong);
records.add(record);
camelSinkTask.put(records);
ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
Exchange exchange = c.receive("seda:test", 1000L);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertTrue((boolean) exchange.getProperties().get("CamelPropertyMyBoolean"));
assertEquals(myByte, (Byte) exchange.getProperties().get("CamelPropertyMyByte"));
assertEquals(myFloat, (Float) exchange.getProperties().get("CamelPropertyMyFloat"));
assertEquals(myShort, (Short) exchange.getProperties().get("CamelPropertyMyShort"));
assertEquals(myDouble, (Double) exchange.getProperties().get("CamelPropertyMyDouble"));
assertEquals(myInteger, exchange.getProperties().get("CamelPropertyMyInteger"));
assertEquals(myLong, (Long) exchange.getProperties().get("CamelPropertyMyLong"));
assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class));
assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class));
assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class));
assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class));
assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger"));
assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class));
camelSinkTask.stop();
}
@Test
public void testBodyAndHeadersMap() {
Map<String, String> props = new HashMap<>();
props.put("camel.sink.url", "seda:test");
props.put("topics", "mytopic");
CamelSinkTask camelSinkTask = new CamelSinkTask();
camelSinkTask.start(props);
String topic = "mytopic";
Byte myByte = new Byte("100");
Float myFloat = new Float("100");
Short myShort = new Short("100");
Double myDouble = new Double("100");
int myInteger = 100;
Long myLong = new Long("100");
HashMap<String, String> map = new HashMap<String, String>();
map.put("a", "a");
HashMap<Integer, String> map1 = new HashMap<Integer, String>();
map1.put(1, "a");
HashMap<Integer, Integer> map2 = new HashMap<Integer, Integer>();
map2.put(1, 1);
List<SinkRecord> records = new ArrayList<SinkRecord>();
SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
record.headers().addBoolean("CamelHeaderMyBoolean", true);
record.headers().addByte("CamelHeaderMyByte", myByte);
record.headers().addFloat("CamelHeaderMyFloat", myFloat);
record.headers().addShort("CamelHeaderMyShort", myShort);
record.headers().addDouble("CamelHeaderMyDouble", myDouble);
record.headers().addInt("CamelHeaderMyInteger", myInteger);
record.headers().addLong("CamelHeaderMyLong", myLong);
record.headers().addMap("CamelHeaderMyMap", map, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA));
record.headers().addMap("CamelHeaderMyMap1", map1, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.STRING_SCHEMA));
record.headers().addMap("CamelHeaderMyMap2", map2, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.INT64_SCHEMA));
records.add(record);
camelSinkTask.put(records);
ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
Exchange exchange = c.receive("seda:test", 1000L);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class));
assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class));
assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class));
assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class));
assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger"));
assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class));
assertEquals(map, exchange.getIn().getHeader("CamelHeaderMyMap", Map.class));
assertEquals(map1, exchange.getIn().getHeader("CamelHeaderMyMap1", Map.class));
assertEquals(map2, exchange.getIn().getHeader("CamelHeaderMyMap2", Map.class));
camelSinkTask.stop();
}
@Test
public void testBodyAndPropertiesHeadersMapMixed() {
Map<String, String> props = new HashMap<>();
props.put("camel.sink.url", "seda:test");
props.put("topics", "mytopic");
CamelSinkTask camelSinkTask = new CamelSinkTask();
camelSinkTask.start(props);
String topic = "mytopic";
Byte myByte = new Byte("100");
Float myFloat = new Float("100");
Short myShort = new Short("100");
Double myDouble = new Double("100");
int myInteger = 100;
Long myLong = new Long("100");
HashMap<String, String> map = new HashMap<String, String>();
map.put("a", "a");
HashMap<Integer, String> map1 = new HashMap<Integer, String>();
map1.put(1, "a");
HashMap<Integer, Integer> map2 = new HashMap<Integer, Integer>();
map2.put(1, 1);
List<SinkRecord> records = new ArrayList<SinkRecord>();
SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
record.headers().addBoolean("CamelPropertyMyBoolean", true);
record.headers().addByte("CamelPropertyMyByte", myByte);
record.headers().addFloat("CamelPropertyMyFloat", myFloat);
record.headers().addShort("CamelPropertyMyShort", myShort);
record.headers().addDouble("CamelPropertyMyDouble", myDouble);
record.headers().addInt("CamelPropertyMyInteger", myInteger);
record.headers().addLong("CamelPropertyMyLong", myLong);
record.headers().addMap("CamelPropertyMyMap", map, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA));
record.headers().addMap("CamelPropertyMyMap1", map1, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.STRING_SCHEMA));
record.headers().addMap("CamelPropertyMyMap2", map2, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.INT64_SCHEMA));
record.headers().addBoolean("CamelHeaderMyBoolean", true);
record.headers().addByte("CamelHeaderMyByte", myByte);
record.headers().addFloat("CamelHeaderMyFloat", myFloat);
record.headers().addShort("CamelHeaderMyShort", myShort);
record.headers().addDouble("CamelHeaderMyDouble", myDouble);
record.headers().addInt("CamelHeaderMyInteger", myInteger);
record.headers().addLong("CamelHeaderMyLong", myLong);
record.headers().addMap("CamelHeaderMyMap", map, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA));
record.headers().addMap("CamelHeaderMyMap1", map1, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.STRING_SCHEMA));
record.headers().addMap("CamelHeaderMyMap2", map2, SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.INT64_SCHEMA));
records.add(record);
camelSinkTask.put(records);
ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
Exchange exchange = c.receive("seda:test", 1000L);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertTrue((boolean) exchange.getProperties().get("CamelPropertyMyBoolean"));
assertEquals(myByte, (Byte) exchange.getProperties().get("CamelPropertyMyByte"));
assertEquals(myFloat, (Float) exchange.getProperties().get("CamelPropertyMyFloat"));
assertEquals(myShort, (Short) exchange.getProperties().get("CamelPropertyMyShort"));
assertEquals(myDouble, (Double) exchange.getProperties().get("CamelPropertyMyDouble"));
assertEquals(myInteger, exchange.getProperties().get("CamelPropertyMyInteger"));
assertEquals(myLong, (Long) exchange.getProperties().get("CamelPropertyMyLong"));
assertEquals(map, exchange.getProperties().get("CamelPropertyMyMap"));
assertEquals(map1, exchange.getProperties().get("CamelPropertyMyMap1"));
assertEquals(map2, exchange.getProperties().get("CamelPropertyMyMap2"));
assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class));
assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class));
assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class));
assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class));
assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger"));
assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class));
assertEquals(map, exchange.getIn().getHeader("CamelHeaderMyMap", Map.class));
assertEquals(map1, exchange.getIn().getHeader("CamelHeaderMyMap1", Map.class));
assertEquals(map2, exchange.getIn().getHeader("CamelHeaderMyMap2", Map.class));
camelSinkTask.stop();
}
@Test
public void testBodyAndHeadersList() {
Map<String, String> props = new HashMap<>();
props.put("camel.sink.url", "seda:test");
props.put("topics", "mytopic");
CamelSinkTask camelSinkTask = new CamelSinkTask();
camelSinkTask.start(props);
String topic = "mytopic";
Byte myByte = new Byte("100");
Float myFloat = new Float("100");
Short myShort = new Short("100");
Double myDouble = new Double("100");
int myInteger = 100;
Long myLong = new Long("100");
List<String> list = new ArrayList<String>();
list.add("a");
List<Integer> list1 = new ArrayList<Integer>();
list1.add(1);
List<SinkRecord> records = new ArrayList<SinkRecord>();
SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
record.headers().addBoolean("CamelHeaderMyBoolean", true);
record.headers().addByte("CamelHeaderMyByte", myByte);
record.headers().addFloat("CamelHeaderMyFloat", myFloat);
record.headers().addShort("CamelHeaderMyShort", myShort);
record.headers().addDouble("CamelHeaderMyDouble", myDouble);
record.headers().addInt("CamelHeaderMyInteger", myInteger);
record.headers().addLong("CamelHeaderMyLong", myLong);
record.headers().addList("CamelHeaderMyList", list, SchemaBuilder.array(Schema.STRING_SCHEMA));
record.headers().addList("CamelHeaderMyList1", list1, SchemaBuilder.array(Schema.INT64_SCHEMA));
records.add(record);
camelSinkTask.put(records);
ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
Exchange exchange = c.receive("seda:test", 1000L);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class));
assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class));
assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class));
assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class));
assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger"));
assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class));
assertEquals(list, exchange.getIn().getHeader("CamelHeaderMyList", List.class));
assertEquals(list1, exchange.getIn().getHeader("CamelHeaderMyList1", List.class));
camelSinkTask.stop();
}
@Test
public void testBodyAndPropertiesHeadersListMixed() {
Map<String, String> props = new HashMap<>();
props.put("camel.sink.url", "seda:test");
props.put("topics", "mytopic");
CamelSinkTask camelSinkTask = new CamelSinkTask();
camelSinkTask.start(props);
String topic = "mytopic";
Byte myByte = new Byte("100");
Float myFloat = new Float("100");
Short myShort = new Short("100");
Double myDouble = new Double("100");
int myInteger = 100;
Long myLong = new Long("100");
List<String> list = new ArrayList<String>();
list.add("a");
List<Integer> list1 = new ArrayList<Integer>();
list1.add(1);
List<SinkRecord> records = new ArrayList<SinkRecord>();
SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
record.headers().addBoolean("CamelPropertyMyBoolean", true);
record.headers().addByte("CamelPropertyMyByte", myByte);
record.headers().addFloat("CamelPropertyMyFloat", myFloat);
record.headers().addShort("CamelPropertyMyShort", myShort);
record.headers().addDouble("CamelPropertyMyDouble", myDouble);
record.headers().addInt("CamelPropertyMyInteger", myInteger);
record.headers().addLong("CamelPropertyMyLong", myLong);
record.headers().addBoolean("CamelHeaderMyBoolean", true);
record.headers().addByte("CamelHeaderMyByte", myByte);
record.headers().addFloat("CamelHeaderMyFloat", myFloat);
record.headers().addShort("CamelHeaderMyShort", myShort);
record.headers().addDouble("CamelHeaderMyDouble", myDouble);
record.headers().addInt("CamelHeaderMyInteger", myInteger);
record.headers().addLong("CamelHeaderMyLong", myLong);
record.headers().addList("CamelHeaderMyList", list, SchemaBuilder.array(Schema.STRING_SCHEMA));
record.headers().addList("CamelHeaderMyList1", list1, SchemaBuilder.array(Schema.INT64_SCHEMA));
record.headers().addList("CamelPropertyMyList", list, SchemaBuilder.array(Schema.STRING_SCHEMA));
record.headers().addList("CamelPropertyMyList1", list1, SchemaBuilder.array(Schema.INT64_SCHEMA));
records.add(record);
camelSinkTask.put(records);
ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
Exchange exchange = c.receive("seda:test", 1000L);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertTrue((boolean) exchange.getProperties().get("CamelPropertyMyBoolean"));
assertEquals(myByte, (Byte) exchange.getProperties().get("CamelPropertyMyByte"));
assertEquals(myFloat, (Float) exchange.getProperties().get("CamelPropertyMyFloat"));
assertEquals(myShort, (Short) exchange.getProperties().get("CamelPropertyMyShort"));
assertEquals(myDouble, (Double) exchange.getProperties().get("CamelPropertyMyDouble"));
assertEquals(myInteger, exchange.getProperties().get("CamelPropertyMyInteger"));
assertEquals(myLong, (Long) exchange.getProperties().get("CamelPropertyMyLong"));
assertEquals(list, exchange.getProperties().get("CamelPropertyMyList"));
assertEquals(list1, exchange.getProperties().get("CamelPropertyMyList1"));
assertTrue(exchange.getIn().getHeader("CamelHeaderMyBoolean", Boolean.class));
assertEquals(myByte, exchange.getIn().getHeader("CamelHeaderMyByte", Byte.class));
assertEquals(myFloat, exchange.getIn().getHeader("CamelHeaderMyFloat", Float.class));
assertEquals(myShort, exchange.getIn().getHeader("CamelHeaderMyShort", Short.class));
assertEquals(myDouble, exchange.getIn().getHeader("CamelHeaderMyDouble", Double.class));
assertEquals(myInteger, exchange.getIn().getHeader("CamelHeaderMyInteger"));
assertEquals(myLong, exchange.getIn().getHeader("CamelHeaderMyLong", Long.class));
assertEquals(list, exchange.getIn().getHeader("CamelHeaderMyList", List.class));
assertEquals(list1, exchange.getIn().getHeader("CamelHeaderMyList1", List.class));
camelSinkTask.stop();
}
@Test
public void testUrlPrecedenceOnComponentProperty() {
Map<String, String> props = new HashMap<>();
props.put("camel.sink.url", "seda:test");
props.put("topics", "mytopic");
props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "shouldNotBeUsed");
props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "endpointProperty", "shouldNotBeUsed");
props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "pathChunk", "shouldNotBeUsed");
CamelSinkTask camelSinkTask = new CamelSinkTask();
camelSinkTask.start(props);
String topic = "mytopic";
List<SinkRecord> records = new ArrayList<SinkRecord>();
SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
records.add(record);
camelSinkTask.put(records);
ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
Exchange exchange = c.receive("seda:test", 1000L);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
camelSinkTask.stop();
}
@Test
public void testOnlyBodyUsingComponentProperty() {
Map<String, String> props = new HashMap<>();
props.put("topics", "mytopic");
props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "bridgeErrorHandler", "true");
props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test");
CamelSinkTask camelSinkTask = new CamelSinkTask();
camelSinkTask.start(props);
String topic = "mytopic";
List<SinkRecord> records = new ArrayList<SinkRecord>();
SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
records.add(record);
camelSinkTask.put(records);
ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
Exchange exchange = c.receive("seda:test", 1000L);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertEquals(1, camelSinkTask.getCms().getEndpoints().stream().filter(e -> e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true")).count());
camelSinkTask.stop();
}
@Test
public void testOnlyBodyUsingMultipleComponentProperties() {
Map<String, String> props = new HashMap<>();
props.put("topics", "mytopic");
props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "bridgeErrorHandler", "true");
props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "size", "50");
props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test");
CamelSinkTask camelSinkTask = new CamelSinkTask();
camelSinkTask.start(props);
String topic = "mytopic";
List<SinkRecord> records = new ArrayList<SinkRecord>();
SinkRecord record = new SinkRecord(topic, 1, null, "test", null, "camel", 42);
records.add(record);
camelSinkTask.put(records);
ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
Exchange exchange = c.receive("seda:test", 1000L);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertEquals(1, camelSinkTask.getCms().getEndpoints().stream().filter(e -> e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true&size=50")).count());
camelSinkTask.stop();
}
}