blob: a619cb43426a63c011567c70de879e85efb41bc8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.elasticsearch;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.Host;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.types.DataTypes;
import org.apache.flink.table.api.types.DecimalType;
import org.apache.flink.table.api.types.TypeConverters;
import org.apache.flink.table.descriptors.Elasticsearch;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.descriptors.TestTableDescriptor;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.util.TableSchemaUtil;
import org.apache.flink.types.Row;
import org.apache.flink.util.TestLogger;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
/**
* Version-agnostic test base for {@link ElasticsearchUpsertTableSinkFactoryBase}.
*/
public abstract class ElasticsearchUpsertTableSinkFactoryTestBase extends TestLogger {
protected static final String HOSTNAME = "host1";
protected static final int PORT = 1234;
protected static final String SCHEMA = "https";
protected static final String INDEX = "MyIndex";
protected static final String DOC_TYPE = "MyType";
protected static final String KEY_DELIMITER = "#";
protected static final String KEY_NULL_LITERAL = "";
private static final String FIELD_KEY = "key";
private static final String FIELD_FRUIT_NAME = "fruit_name";
private static final String FIELD_COUNT = "count";
private static final String FIELD_TS = "ts";
@Test
public void testTableSink() {
// prepare parameters for Elasticsearch table sink
final TableSchema schema = createTestSchema();
final ElasticsearchUpsertTableSinkBase expectedSink = getExpectedTableSink(
false,
schema,
Collections.singletonList(new Host(HOSTNAME, PORT, SCHEMA)),
INDEX,
DOC_TYPE,
KEY_DELIMITER,
KEY_NULL_LITERAL,
new JsonRowSerializationSchema((TypeInformation<Row>)
TypeConverters.createExternalTypeInfoFromDataType(TableSchemaUtil.toRowType(schema))),
XContentType.JSON,
new DummyFailureHandler(),
createTestSinkOptions());
// construct table sink using descriptors and table sink factory
final TestTableDescriptor testDesc = new TestTableDescriptor(
new Elasticsearch()
.version(getElasticsearchVersion())
.host(HOSTNAME, PORT, SCHEMA)
.index(INDEX)
.documentType(DOC_TYPE)
.keyDelimiter(KEY_DELIMITER)
.keyNullLiteral(KEY_NULL_LITERAL)
.bulkFlushBackoffExponential()
.bulkFlushBackoffDelay(123L)
.bulkFlushBackoffMaxRetries(3)
.bulkFlushInterval(100L)
.bulkFlushMaxActions(1000)
.bulkFlushMaxSize("1 MB")
.failureHandlerCustom(DummyFailureHandler.class)
.connectionMaxRetryTimeout(100)
.connectionPathPrefix("/myapp"))
.withFormat(
new Json()
.deriveSchema())
.withSchema(
new Schema()
.field(FIELD_KEY, Types.LONG())
.field(FIELD_FRUIT_NAME, Types.STRING())
.field(FIELD_COUNT, Types.DECIMAL())
.field(FIELD_TS, Types.SQL_TIMESTAMP()))
.inUpsertMode();
final Map<String, String> propertiesMap = testDesc.toProperties();
final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);
assertEquals(expectedSink, actualSink);
}
protected TableSchema createTestSchema() {
return TableSchema.builder()
.field(FIELD_KEY, DataTypes.LONG)
.field(FIELD_FRUIT_NAME, DataTypes.STRING)
.field(FIELD_COUNT, DecimalType.SYSTEM_DEFAULT)
.field(FIELD_TS, DataTypes.TIMESTAMP)
.build();
}
protected Map<SinkOption, String> createTestSinkOptions() {
final Map<SinkOption, String> sinkOptions = new HashMap<>();
sinkOptions.put(SinkOption.BULK_FLUSH_BACKOFF_ENABLED, "true");
sinkOptions.put(SinkOption.BULK_FLUSH_BACKOFF_TYPE, "EXPONENTIAL");
sinkOptions.put(SinkOption.BULK_FLUSH_BACKOFF_DELAY, "123");
sinkOptions.put(SinkOption.BULK_FLUSH_BACKOFF_RETRIES, "3");
sinkOptions.put(SinkOption.BULK_FLUSH_INTERVAL, "100");
sinkOptions.put(SinkOption.BULK_FLUSH_MAX_ACTIONS, "1000");
sinkOptions.put(SinkOption.BULK_FLUSH_MAX_SIZE, "1048576 bytes");
sinkOptions.put(SinkOption.REST_MAX_RETRY_TIMEOUT, "100");
sinkOptions.put(SinkOption.REST_PATH_PREFIX, "/myapp");
return sinkOptions;
}
// --------------------------------------------------------------------------------------------
// For version-specific tests
// --------------------------------------------------------------------------------------------
protected abstract String getElasticsearchVersion();
protected abstract ElasticsearchUpsertTableSinkBase getExpectedTableSink(
boolean isAppendOnly,
TableSchema schema,
List<Host> hosts,
String index,
String docType,
String keyDelimiter,
String keyNullLiteral,
SerializationSchema<Row> serializationSchema,
XContentType contentType,
ActionRequestFailureHandler failureHandler,
Map<SinkOption, String> sinkOptions);
// --------------------------------------------------------------------------------------------
// Helper classes
// --------------------------------------------------------------------------------------------
/**
* Custom failure handler for testing.
*/
public static class DummyFailureHandler implements ActionRequestFailureHandler {
@Override
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) {
// do nothing
}
@Override
public boolean equals(Object o) {
return this == o || o instanceof DummyFailureHandler;
}
@Override
public int hashCode() {
return DummyFailureHandler.class.hashCode();
}
}
}