blob: 75f80bf1c54b2d2865bc8c87002e8186e5313222 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.io.sinks;
import static org.testng.Assert.assertTrue;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.tests.integration.containers.ElasticSearchContainer;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.awaitility.Awaitility;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;
@Slf4j
public class ElasticSearchSinkTester extends SinkTester<ElasticSearchContainer> {
private RestHighLevelClient elasticClient;
private boolean schemaEnable;
private final Schema<KeyValue<SimplePojo, SimplePojo>> kvSchema;
@Data
@AllArgsConstructor
public static final class SimplePojo {
private String field1;
private String field2;
}
/**
* This method is used to pre create the subscription for the Sink.
* @return the schema for the subscription
*/
public Schema<?> getInputTopicSchema() {
if (schemaEnable) {
// we do not want to enforce a Schema
// at the beginning of the test
return Schema.AUTO_CONSUME();
} else {
return Schema.STRING;
}
}
public ElasticSearchSinkTester(boolean schemaEnable) {
super(ElasticSearchContainer.NAME, SinkType.ELASTIC_SEARCH);
sinkConfig.put("elasticSearchUrl", "http://" + ElasticSearchContainer.NAME + ":9200");
sinkConfig.put("indexName", "test-index");
this.schemaEnable = schemaEnable;
if (schemaEnable) {
sinkConfig.put("schemaEnable", "true");
kvSchema = Schema.KeyValue(Schema.JSON(SimplePojo.class),
Schema.AVRO(SimplePojo.class), KeyValueEncodingType.SEPARATED);
} else {
// default behaviour, it must be enabled the default, in order to preserve compatibility with Pulsar 2.8.x
kvSchema = null;
}
}
@Override
protected ElasticSearchContainer createSinkService(PulsarCluster cluster) {
return new ElasticSearchContainer(cluster.getClusterName());
}
@Override
public void prepareSink() throws Exception {
RestClientBuilder builder = RestClient.builder(
new HttpHost(
"localhost",
serviceContainer.getMappedPort(9200),
"http"));
elasticClient = new RestHighLevelClient(builder);
}
@Override
public void stopServiceContainer(PulsarCluster cluster) {
try {
if (elasticClient != null) {
elasticClient.close();
}
} catch (IOException e) {
log.warn("Error closing elasticClient, ignoring", e);
} finally {
super.stopServiceContainer(cluster);
}
}
@Override
public void validateSinkResult(Map<String, String> kvs) {
SearchRequest searchRequest = new SearchRequest("test-index");
Awaitility.await().untilAsserted(() -> {
SearchResponse searchResult = elasticClient.search(searchRequest, RequestOptions.DEFAULT);
assertTrue(searchResult.getHits().getTotalHits().value > 0, searchResult.toString());
});
}
@Override
public void produceMessage(int numMessages, PulsarClient client,
String inputTopicName, LinkedHashMap<String, String> kvs) throws Exception {
if (schemaEnable) {
@Cleanup
Producer<KeyValue<SimplePojo, SimplePojo>> producer = client.newProducer(kvSchema)
.topic(inputTopicName)
.create();
for (int i = 0; i < numMessages; i++) {
String key = "key-" + i;
kvs.put(key, key);
producer.newMessage()
.value(new KeyValue<>(new SimplePojo("f1_" + i, "f2_" + i),
new SimplePojo("v1_" + i, "v2_" + i)))
.send();
}
} else {
@Cleanup
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(inputTopicName)
.create();
for (int i = 0; i < numMessages; i++) {
String key = "key-" + i;
// this is a JSON document, written to ElasticSearch
Map<String, String> valueMap = new HashMap<>();
valueMap.put("key" + i, "value" + i);
String value = ObjectMapperFactory.getThreadLocal().writeValueAsString(valueMap);
kvs.put(key, value);
producer.newMessage()
.key(key)
.value(value)
.send();
}
}
}
}