[FLINK-30388] Move lazy initialization of AWS element converters to SinkWriter open() method
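
Element converters for the Kinesis Data Firehose, Kinesis Streams and
DynamoDB sinks previously initialized their SerializationSchema (or, for
DynamoDB, the non-serializable BeanTableSchema) lazily on the first call to
apply(), guarded by a schemaOpened flag or a null check. Since FLINK-29938
added an open(Sink.InitContext) method to ElementConverter that is invoked
when the SinkWriter is created, initialization now happens once in open():
apply() no longer carries a per-record guard, and
DynamoDbBeanElementConverter#apply fails fast with a NullPointerException if
the converter was never opened. Tests call open(null) explicitly before
converting elements.

The intended lifecycle, as a minimal sketch (initContext and writerContext
are placeholders for the Sink.InitContext and SinkWriter.Context supplied by
the runtime; they are not part of this change):

    // Build the converter as before; nothing is initialized yet.
    ElementConverter<String, Record> converter =
            KinesisFirehoseSinkElementConverter.<String>builder()
                    .setSerializationSchema(new SimpleStringSchema())
                    .build();

    // The framework opens the converter once, before any element is converted.
    converter.open(initContext);

    // apply() now assumes the schema is already open.
    Record record = converter.apply("an element", writerContext);
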
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java b/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
index b90db33..fb803ed 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
@@ -19,6 +19,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.metrics.MetricGroup;
@@ -39,7 +40,6 @@
@Internal
public class KinesisFirehoseSinkElementConverter<InputT>
implements ElementConverter<InputT, Record> {
- private boolean schemaOpened = false;
/** A serialization schema to specify how the input element should be serialized. */
private final SerializationSchema<InputT> serializationSchema;
@@ -50,32 +50,30 @@
@Override
public Record apply(InputT element, SinkWriter.Context context) {
- checkOpened();
return Record.builder()
.data(SdkBytes.fromByteArray(serializationSchema.serialize(element)))
.build();
}
- private void checkOpened() {
- if (!schemaOpened) {
- try {
- serializationSchema.open(
- new SerializationSchema.InitializationContext() {
- @Override
- public MetricGroup getMetricGroup() {
- return new UnregisteredMetricsGroup();
- }
+ @Override
+ public void open(Sink.InitContext context) {
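+ // Open the serialization schema once when the writer is created, rather than lazily on the first apply() call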
+ try {
+ serializationSchema.open(
+ new SerializationSchema.InitializationContext() {
+ @Override
+ public MetricGroup getMetricGroup() {
+ return new UnregisteredMetricsGroup();
+ }
- @Override
- public UserCodeClassLoader getUserCodeClassLoader() {
- return SimpleUserCodeClassLoader.create(
- KinesisFirehoseSinkElementConverter.class.getClassLoader());
- }
- });
- schemaOpened = true;
- } catch (Exception e) {
- throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
- }
+ @Override
+ public UserCodeClassLoader getUserCodeClassLoader() {
+ return SimpleUserCodeClassLoader.create(
+ KinesisFirehoseSinkElementConverter.class.getClassLoader());
+ }
+ });
+ } catch (Exception e) {
+ throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
}
}
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java b/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
index ccb4582..4233668 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.connector.firehose.sink;
+import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
@@ -44,6 +45,7 @@
KinesisFirehoseSinkElementConverter.<String>builder()
.setSerializationSchema(new SimpleStringSchema())
.build();
+ elementConverter.open(null);
String testString = "{many hands make light work;";
@@ -51,4 +53,30 @@
byte[] serializedString = (new SimpleStringSchema()).serialize(testString);
assertThat(serializedRecord.data()).isEqualTo(SdkBytes.fromByteArray(serializedString));
}
+
+ @Test
+ void elementConverterWillOpenSerializationSchema() {
+ OpenCheckingStringSchema openCheckingStringSchema = new OpenCheckingStringSchema();
+ ElementConverter<String, Record> elementConverter =
+ KinesisFirehoseSinkElementConverter.<String>builder()
+ .setSerializationSchema(openCheckingStringSchema)
+ .build();
+ elementConverter.open(null);
+ assertThat(openCheckingStringSchema.isOpen()).isTrue();
+ }
+
+ private static class OpenCheckingStringSchema extends SimpleStringSchema {
+
+ private boolean isOpen = false;
+
+ @Override
+ public void open(SerializationSchema.InitializationContext context) throws Exception {
+ super.open(context);
+ isOpen = true;
+ }
+
+ public boolean isOpen() {
+ return isOpen;
+ }
+ }
}
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverter.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverter.java
index a7e4411..4030108 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverter.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverter.java
@@ -19,6 +19,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.metrics.MetricGroup;
@@ -48,8 +49,6 @@
*/
private final PartitionKeyGenerator<InputT> partitionKeyGenerator;
- private boolean schemaOpened = false;
-
private KinesisStreamsSinkElementConverter(
SerializationSchema<InputT> serializationSchema,
PartitionKeyGenerator<InputT> partitionKeyGenerator) {
@@ -59,33 +58,31 @@
@Override
public PutRecordsRequestEntry apply(InputT element, SinkWriter.Context context) {
- checkOpened();
return PutRecordsRequestEntry.builder()
.data(SdkBytes.fromByteArray(serializationSchema.serialize(element)))
.partitionKey(partitionKeyGenerator.apply(element))
.build();
}
- private void checkOpened() {
- if (!schemaOpened) {
- try {
- serializationSchema.open(
- new SerializationSchema.InitializationContext() {
- @Override
- public MetricGroup getMetricGroup() {
- return new UnregisteredMetricsGroup();
- }
+ @Override
+ public void open(Sink.InitContext context) {
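+ // Open the serialization schema once when the writer is created, rather than lazily on the first apply() call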
+ try {
+ serializationSchema.open(
+ new SerializationSchema.InitializationContext() {
+ @Override
+ public MetricGroup getMetricGroup() {
+ return new UnregisteredMetricsGroup();
+ }
- @Override
- public UserCodeClassLoader getUserCodeClassLoader() {
- return SimpleUserCodeClassLoader.create(
- KinesisStreamsSinkElementConverter.class.getClassLoader());
- }
- });
- schemaOpened = true;
- } catch (Exception e) {
- throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
- }
+ @Override
+ public UserCodeClassLoader getUserCodeClassLoader() {
+ return SimpleUserCodeClassLoader.create(
+ KinesisStreamsSinkElementConverter.class.getClassLoader());
+ }
+ });
+ } catch (Exception e) {
+ throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
}
}
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverterTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverterTest.java
new file mode 100644
index 0000000..7d3a917
--- /dev/null
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverterTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class KinesisStreamsSinkElementConverterTest {
+ @Test
+ void elementConverterWillOpenSerializationSchema() {
+ OpenCheckingStringSchema openCheckingStringSchema = new OpenCheckingStringSchema();
+ ElementConverter<String, PutRecordsRequestEntry> elementConverter =
+ KinesisStreamsSinkElementConverter.<String>builder()
+ .setSerializationSchema(openCheckingStringSchema)
+ .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
+ .build();
+ elementConverter.open(null);
+ assertThat(openCheckingStringSchema.isOpen()).isTrue();
+ }
+
+ private static class OpenCheckingStringSchema extends SimpleStringSchema {
+
+ private boolean isOpen = false;
+
+ @Override
+ public void open(SerializationSchema.InitializationContext context) throws Exception {
+ super.open(context);
+ isOpen = true;
+ }
+
+ public boolean isOpen() {
+ return isOpen;
+ }
+ }
+}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java
index a7734d8..d45b060 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java
@@ -18,8 +18,10 @@
package org.apache.flink.connector.dynamodb.sink;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.util.Preconditions;
import software.amazon.awssdk.enhanced.dynamodb.mapper.BeanTableSchema;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
@@ -54,18 +56,20 @@
@Override
public DynamoDbWriteRequest apply(InputT element, SinkWriter.Context context) {
- if (tableSchema == null) {
- // We have to lazily initialize this because BeanTableSchema is not serializable and
- // there is no open() method on ElementConverter (FLINK-29938)
- tableSchema = createTableSchema(recordType);
- }
-
+ Preconditions.checkNotNull(tableSchema, "Table schema has not been initialized");
return new DynamoDbWriteRequest.Builder()
.setType(DynamoDbWriteRequestType.PUT)
.setItem(tableSchema.itemToMap(element, ignoreNulls))
.build();
}
+ @Override
+ public void open(Sink.InitContext context) {
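+ // BeanTableSchema is not serializable, so the schema is created here, when the
+ // writer opens, rather than in the converter's constructor (see FLINK-29938)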
+ tableSchema = createTableSchema(recordType);
+ }
+
private BeanTableSchema<InputT> createTableSchema(final Class<InputT> recordType) {
return BeanTableSchema.create(recordType);
}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverterTest.java
index 258ab8b..13a733f 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverterTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverterTest.java
@@ -41,6 +41,7 @@
void testConvertOrderToDynamoDbWriteRequest() {
ElementConverter<Order, DynamoDbWriteRequest> elementConverter =
new DynamoDbBeanElementConverter<>(Order.class);
+ elementConverter.open(null);
Order order = new Order("orderId", 1, 2.0);
DynamoDbWriteRequest actual = elementConverter.apply(order, null);
@@ -56,6 +57,7 @@
void testConvertOrderToDynamoDbWriteRequestWithIgnoresNull() {
ElementConverter<Order, DynamoDbWriteRequest> elementConverter =
new DynamoDbBeanElementConverter<>(Order.class, true);
+ elementConverter.open(null);
Order order = new Order(null, 1, 2.0);
DynamoDbWriteRequest actual = elementConverter.apply(order, null);
@@ -67,6 +69,7 @@
void testConvertOrderToDynamoDbWriteRequestWritesNull() {
ElementConverter<Order, DynamoDbWriteRequest> elementConverter =
new DynamoDbBeanElementConverter<>(Order.class, false);
+ elementConverter.open(null);
Order order = new Order(null, 1, 2.0);
DynamoDbWriteRequest actual = elementConverter.apply(order, null);
@@ -74,4 +77,15 @@
assertThat(actual.getItem()).containsOnlyKeys("orderId", "quantity", "total");
assertThat(actual.getItem().get("orderId").nul()).isTrue();
}
+
+ @Test
+ void testConvertWithUnopenedConverterThrowsException() {
+ ElementConverter<Order, DynamoDbWriteRequest> elementConverter =
+ new DynamoDbBeanElementConverter<>(Order.class);
+ Order order = new Order(null, 1, 2.0);
+
+ assertThatExceptionOfType(NullPointerException.class)
+ .isThrownBy(() -> elementConverter.apply(order, null))
+ .withMessageContaining("Table schema has not been initialized");
+ }
}