[Improve][Connector-V2] Improve http connector (#2833)
[format][json] Support read json ARRAY & MAP
[hotfix][build] connector-http-base can't skip maven-shade-plugin
[connector][http] Improve http connector
* Support `SimpleTextSchema`
* Support retry for request http server
* Support request http interval in stream mode
diff --git a/docs/en/connector-v2/sink/Http.md b/docs/en/connector-v2/sink/Http.md
index 8f4ab25..2a5cb43 100644
--- a/docs/en/connector-v2/sink/Http.md
+++ b/docs/en/connector-v2/sink/Http.md
@@ -17,12 +17,17 @@
## Options
-| name | type | required | default value |
-| --- |--------| --- | --- |
-| url | String | Yes | - |
-| headers | Map | No | - |
+| name | type | required | default value |
+|------------------------------------|--------|----------|---------------|
+| url | String | Yes | - |
+| headers | Map | No | - |
+| params | Map | No | - |
+| retry | int | No | - |
+| retry_backoff_multiplier_ms | int | No | 100 |
+| retry_backoff_max_ms | int | No | 10000 |
-### url [string]
+
+### url [String]
http request url
@@ -30,6 +35,22 @@
http headers
+### params [Map]
+
+http params
+
+### retry [int]
+
+The maximum number of retries when an HTTP request fails with an `IOException`
+
+### retry_backoff_multiplier_ms [int]
+
+The multiplier (in milliseconds) used to compute the retry backoff time when an HTTP request fails
+
+### retry_backoff_max_ms [int]
+
+The maximum retry backoff time (in milliseconds) when an HTTP request fails
+
## Example
simple:
diff --git a/docs/en/connector-v2/source/Http.md b/docs/en/connector-v2/source/Http.md
index 0fbbc43..21ac01e 100644
--- a/docs/en/connector-v2/source/Http.md
+++ b/docs/en/connector-v2/source/Http.md
@@ -17,22 +17,26 @@
## Options
-| name | type | required | default value |
-|---------------|--------|----------|---------------|
-| url | String | Yes | - |
-| schema | config | No | - |
-| schema.fields | config | No | - |
-| format | string | No | json |
-| method | String | No | get |
-| headers | Map | No | - |
-| params | Map | No | - |
-| body | String | No | - |
+| name | type | required | default value |
+|------------------------------------|--------|----------|---------------|
+| url | String | Yes | - |
+| schema | Config | No | - |
+| schema.fields | Config | No | - |
+| format | String | No | json |
+| method | String | No | get |
+| headers | Map | No | - |
+| params | Map | No | - |
+| body | String | No | - |
+| poll_interval_ms | int | No | - |
+| retry | int | No | - |
+| retry_backoff_multiplier_ms | int | No | 100 |
+| retry_backoff_max_ms | int | No | 10000 |
-### url [string]
+### url [String]
http request url
-### method [string]
+### method [String]
http request method, only supports GET, POST method.
@@ -48,6 +52,22 @@
http body
+### poll_interval_ms [int]
+
+The interval (in milliseconds) between HTTP API requests in stream mode
+
+### retry [int]
+
+The maximum number of retries when an HTTP request fails with an `IOException`
+
+### retry_backoff_multiplier_ms [int]
+
+The multiplier (in milliseconds) used to compute the retry backoff time when an HTTP request fails
+
+### retry_backoff_max_ms [int]
+
+The maximum retry backoff time (in milliseconds) when an HTTP request fails
+
### format [String]
the format of upstream data, now only support `json` `text`, default `json`.
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml b/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
index ba62153..8e33fdc 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
@@ -32,6 +32,7 @@
<properties>
<httpclient.version>4.5.13</httpclient.version>
<httpcore.version>4.4.4</httpcore.version>
+ <guava-retrying.version>2.0.0</guava-retrying.version>
</properties>
<dependencies>
@@ -58,31 +59,10 @@
<artifactId>httpcore</artifactId>
<version>${httpcore.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.github.rholder</groupId>
+ <artifactId>guava-retrying</artifactId>
+ <version>${guava-retrying.version}</version>
+ </dependency>
</dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <!-- base module need skip shading -->
- <configuration>
- <skip>true</skip>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <!-- make sure that flatten runs after maven-shade-plugin -->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>flatten-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java
index 089d61e..0eae378 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java
@@ -17,6 +17,16 @@
package org.apache.seatunnel.connectors.seatunnel.http.client;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+
+import com.github.rholder.retry.Attempt;
+import com.github.rholder.retry.RetryListener;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
import org.apache.http.client.config.RequestConfig;
@@ -48,9 +58,10 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+@Slf4j
public class HttpClientProvider implements AutoCloseable {
- private final CloseableHttpClient httpClient;
private static final String ENCODING = "UTF-8";
private static final String APPLICATION_JSON = "application/json";
private static final int CONNECT_TIMEOUT = 6000 * 2;
@@ -60,13 +71,33 @@
.setConnectTimeout(CONNECT_TIMEOUT)
.setSocketTimeout(SOCKET_TIMEOUT)
.build();
+ private final CloseableHttpClient httpClient;
+ private final Retryer<CloseableHttpResponse> retryer;
- private HttpClientProvider() {
- httpClient = HttpClients.createDefault();
+ public HttpClientProvider(HttpParameter httpParameter) {
+ this.httpClient = HttpClients.createDefault();
+ this.retryer = buildRetryer(httpParameter);
}
- public static HttpClientProvider getInstance() {
- return Singleton.INSTANCE;
+    /**
+     * Builds the retryer that wraps every HTTP call made by this provider.
+     *
+     * <p>When the configured {@code retry} value is less than 1, no retry condition is
+     * registered, so a failing request is not attempted again. Otherwise an attempt whose
+     * exception chain contains an {@link IOException} is retried up to {@code retry} more times,
+     * with a Fibonacci wait between attempts that is scaled by the backoff multiplier and capped
+     * at the backoff maximum (both in milliseconds).
+     *
+     * @param httpParameter connector options carrying the retry settings
+     * @return the retryer used to execute requests
+     */
+    private Retryer<CloseableHttpResponse> buildRetryer(HttpParameter httpParameter) {
+        int maxRetries = httpParameter.getRetry();
+        if (maxRetries < 1) {
+            return RetryerBuilder.<CloseableHttpResponse>newBuilder().build();
+        }
+        return RetryerBuilder.<CloseableHttpResponse>newBuilder()
+            .retryIfException(ex -> ExceptionUtils.indexOfType(ex, IOException.class) != -1)
+            // stopAfterAttempt counts the initial call as attempt 1, so one extra attempt is
+            // needed for "retry = N" to mean N retries; without it retry = 1 never retries.
+            .withStopStrategy(StopStrategies.stopAfterAttempt(maxRetries + 1))
+            .withWaitStrategy(WaitStrategies.fibonacciWait(httpParameter.getRetryBackoffMultiplierMillis(),
+                httpParameter.getRetryBackoffMaxMillis(), TimeUnit.MILLISECONDS))
+            .withRetryListener(new RetryListener() {
+                @Override
+                public <V> void onRetry(Attempt<V> attempt) {
+                    // Only failed attempts are worth a warning; the cause is logged with its stack trace.
+                    if (attempt.hasException()) {
+                        log.warn("[{}] request http failed",
+                            attempt.getAttemptNumber(), attempt.getExceptionCause());
+                    }
+                }
+            })
+            .build();
     }
public HttpResponse execute(String url, String method, Map<String, String> headers, Map<String, String> params) throws Exception {
@@ -275,9 +306,9 @@
return doPost(url, params);
}
- private HttpResponse getResponse(HttpRequestBase request) throws IOException {
+ private HttpResponse getResponse(HttpRequestBase request) throws Exception {
// execute request
- try (CloseableHttpResponse httpResponse = httpClient.execute(request)) {
+ try (CloseableHttpResponse httpResponse = retryWithException(request)) {
// get return result
if (httpResponse != null && httpResponse.getStatusLine() != null) {
String content = "";
@@ -290,6 +321,10 @@
return new HttpResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
}
+ /**
+ * Executes the request through the configured retryer instead of calling the client directly.
+ *
+ * @param request the prepared HTTP request
+ * @return the response produced by {@code httpClient.execute(request)}
+ * @throws Exception whatever {@code retryer.call} raises when the request cannot be completed
+ */
+ private CloseableHttpResponse retryWithException(HttpRequestBase request) throws Exception {
+ return retryer.call(() -> httpClient.execute(request));
+ }
+
private void addParameters(URIBuilder builder, Map<String, String> params) {
if (Objects.isNull(params) || params.isEmpty()) {
return;
@@ -333,8 +368,4 @@
httpClient.close();
}
}
-
- private static class Singleton {
- private static final HttpClientProvider INSTANCE = new HttpClientProvider();
- }
}
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
index 50ca7a0..8ee089a 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
@@ -27,4 +27,8 @@
public static final String SCHEMA = "schema";
public static final String FORMAT = "format";
public static final String DEFAULT_FORMAT = "json";
+ public static final String POLL_INTERVAL_MILLS = "poll_interval_ms";
+ public static final String RETRY = "retry";
+ public static final String RETRY_BACKOFF_MULTIPLIER_MS = "retry_backoff_multiplier_ms";
+ public static final String RETRY_BACKOFF_MAX_MS = "retry_backoff_max_ms";
}
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
index c22a248..f2f1b33 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
@@ -26,12 +26,18 @@
import java.util.stream.Collectors;
@Data
+@SuppressWarnings("MagicNumber")
public class HttpParameter implements Serializable {
private String url;
private String method;
private Map<String, String> headers;
private Map<String, String> params;
private String body;
+ private int pollIntervalMillis;
+ private int retry;
+ private int retryBackoffMultiplierMillis = 100;
+ private int retryBackoffMaxMillis = 10000;
+
public void buildWithConfig(Config pluginConfig) {
// set url
this.setUrl(pluginConfig.getString(HttpConfig.URL));
@@ -53,5 +59,17 @@
if (pluginConfig.hasPath(HttpConfig.BODY)) {
this.setBody(pluginConfig.getString(HttpConfig.BODY));
}
+ if (pluginConfig.hasPath(HttpConfig.POLL_INTERVAL_MILLS)) {
+ this.setPollIntervalMillis(pluginConfig.getInt(HttpConfig.POLL_INTERVAL_MILLS));
+ }
+ if (pluginConfig.hasPath(HttpConfig.RETRY)) {
+ this.setRetry(pluginConfig.getInt(HttpConfig.RETRY));
+ if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)) {
+ this.setRetryBackoffMultiplierMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS));
+ }
+ if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MAX_MS)) {
+ this.setRetryBackoffMaxMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MAX_MS));
+ }
+ }
}
}
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
index e68a1c4..4b90f8c 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
@@ -34,7 +34,7 @@
public class HttpSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
private static final Logger LOGGER = LoggerFactory.getLogger(HttpSinkWriter.class);
- protected final HttpClientProvider httpClient = HttpClientProvider.getInstance();
+ protected final HttpClientProvider httpClient;
protected final SeaTunnelRowType seaTunnelRowType;
protected final HttpParameter httpParameter;
protected final SerializationSchema serializationSchema;
@@ -42,6 +42,7 @@
public HttpSinkWriter(SeaTunnelRowType seaTunnelRowType, HttpParameter httpParameter) {
this.seaTunnelRowType = seaTunnelRowType;
this.httpParameter = httpParameter;
+ this.httpClient = new HttpClientProvider(httpParameter);
this.serializationSchema = new JsonSerializationSchema(seaTunnelRowType);
}
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/DeserializationCollector.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/DeserializationCollector.java
new file mode 100644
index 0000000..f01bc7f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/DeserializationCollector.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import lombok.AllArgsConstructor;
+
+import java.io.IOException;
+
+/**
+ * Runs a raw payload through a {@link DeserializationSchema} and emits the resulting rows.
+ *
+ * <p>JSON schemas get special treatment: a top-level JSON array produces one row per element,
+ * any other JSON value produces a single row. Every other schema produces exactly one row.
+ */
+@AllArgsConstructor
+public class DeserializationCollector {
+
+    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+
+    /**
+     * Deserializes {@code message} and hands every resulting row to {@code out}.
+     *
+     * @param message raw payload bytes
+     * @param out     collector receiving the deserialized rows
+     * @throws IOException if the underlying schema fails to deserialize the payload
+     */
+    public void collect(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
+        if (!(deserializationSchema instanceof JsonDeserializationSchema)) {
+            out.collect(deserializationSchema.deserialize(message));
+            return;
+        }
+        collectJson(message, (JsonDeserializationSchema) deserializationSchema, out);
+    }
+
+    /**
+     * Parses the payload as JSON and emits one row per array element, or a single row when the
+     * root is not an array.
+     */
+    private void collectJson(byte[] message,
+                             JsonDeserializationSchema jsonDeserializationSchema,
+                             Collector<SeaTunnelRow> out) throws IOException {
+        JsonNode root = jsonDeserializationSchema.convertBytes(message);
+        if (!root.isArray()) {
+            out.collect(jsonDeserializationSchema.convertJsonNode(root));
+            return;
+        }
+        ArrayNode elements = (ArrayNode) root;
+        for (JsonNode element : elements) {
+            out.collect(jsonDeserializationSchema.convertJsonNode(element));
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index dff630b..9887f7c 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -68,18 +68,22 @@
if (pluginConfig.hasPath(HttpConfig.SCHEMA)) {
Config schema = pluginConfig.getConfig(HttpConfig.SCHEMA);
this.rowType = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+ // default use json format
+ String format = HttpConfig.DEFAULT_FORMAT;
+ if (pluginConfig.hasPath(HttpConfig.FORMAT)) {
+ format = pluginConfig.getString(HttpConfig.FORMAT);
+ }
+ switch (format) {
+ case HttpConfig.DEFAULT_FORMAT:
+ this.deserializationSchema = new JsonDeserializationSchema(false, false, rowType);
+ break;
+ default:
+ // TODO: use format SPI
+ throw new UnsupportedOperationException("Unsupported format: " + format);
+ }
} else {
this.rowType = SeaTunnelSchema.buildSimpleTextSchema();
- }
- // TODO: use format SPI
- // default use json format
- String format;
- if (pluginConfig.hasPath(HttpConfig.FORMAT)) {
- format = pluginConfig.getString(HttpConfig.FORMAT);
- this.deserializationSchema = null;
- } else {
- format = HttpConfig.DEFAULT_FORMAT;
- this.deserializationSchema = new JsonDeserializationSchema(false, false, rowType);
+ this.deserializationSchema = new SimpleTextDeserializationSchema(this.rowType);
}
}
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index 5f7ff13..0419e2e 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -27,6 +27,7 @@
import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,17 +39,17 @@
protected final SingleSplitReaderContext context;
protected final HttpParameter httpParameter;
protected HttpClientProvider httpClient;
- protected final DeserializationSchema<SeaTunnelRow> deserializationSchema;
+ private final DeserializationCollector deserializationCollector;
public HttpSourceReader(HttpParameter httpParameter, SingleSplitReaderContext context, DeserializationSchema<SeaTunnelRow> deserializationSchema) {
this.context = context;
this.httpParameter = httpParameter;
- this.deserializationSchema = deserializationSchema;
+ this.deserializationCollector = new DeserializationCollector(deserializationSchema);
}
@Override
public void open() {
- httpClient = HttpClientProvider.getInstance();
+ httpClient = new HttpClientProvider(httpParameter);
}
@Override
@@ -64,11 +65,8 @@
HttpResponse response = httpClient.execute(this.httpParameter.getUrl(), this.httpParameter.getMethod(), this.httpParameter.getHeaders(), this.httpParameter.getParams());
if (HttpResponse.STATUS_OK == response.getCode()) {
String content = response.getContent();
- if (deserializationSchema != null) {
- deserializationSchema.deserialize(content.getBytes(), output);
- } else {
- // TODO: use seatunnel-text-format
- output.collect(new SeaTunnelRow(new Object[]{content}));
+ if (!Strings.isNullOrEmpty(content)) {
+ deserializationCollector.collect(content.getBytes(), output);
}
return;
}
@@ -80,6 +78,10 @@
// signal to the source that we have reached the end of the data.
LOGGER.info("Closed the bounded http source");
context.signalNoMoreElement();
+ } else {
+ if (httpParameter.getPollIntervalMillis() > 0) {
+ Thread.sleep(httpParameter.getPollIntervalMillis());
+ }
}
}
}
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/SimpleTextDeserializationSchema.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/SimpleTextDeserializationSchema.java
new file mode 100644
index 0000000..ae56695
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/SimpleTextDeserializationSchema.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import lombok.AllArgsConstructor;
+
+@AllArgsConstructor
+public class SimpleTextDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+
+ private SeaTunnelRowType rowType;
+
+ @Override
+ public SeaTunnelRow deserialize(byte[] message) {
+ return new SeaTunnelRow(new Object[]{new String(message)});
+ }
+
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+ return rowType;
+ }
+}
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
index bc5bcd4..8ea60e9 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
@@ -30,6 +30,7 @@
<artifactId>seatunnel-flink-connector-v2-example</artifactId>
<properties>
<flink.scope>compile</flink.scope>
+ <mock-webserver.version>3.6.0</mock-webserver.version>
</properties>
<dependencies>
@@ -76,6 +77,11 @@
<artifactId>connector-dingtalk</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-http-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- seatunnel connectors -->
<!--flink-->
<dependency>
@@ -115,6 +121,11 @@
<scope>${flink.scope}</scope>
</dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <version>${mock-webserver.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/HttpConnectorExample.java b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/HttpConnectorExample.java
new file mode 100644
index 0000000..9617031
--- /dev/null
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/HttpConnectorExample.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.example.flink.v2;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.core.starter.Seatunnel;
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.starter.flink.command.FlinkCommandBuilder;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.mockwebserver.Dispatcher;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.net.InetAddress;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+@Slf4j
+public class HttpConnectorExample {
+
+ private static final SeaTunnelRowType ROW_TYPE = new SeaTunnelRowType(
+ new String[]{
+ "id",
+ "c_map",
+ "c_array",
+ "c_string",
+ "c_boolean",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_decimal",
+ "c_bytes",
+ "c_date",
+ "c_timestamp"
+ },
+ new SeaTunnelDataType[]{
+ BasicType.LONG_TYPE,
+ new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+ ArrayType.BYTE_ARRAY_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ new DecimalType(2, 1),
+ PrimitiveByteArrayType.INSTANCE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE
+ });
+ private static final List<SeaTunnelRow> TEST_DATASET = generateTestDataset();
+ private static final JsonSerializationSchema SERIALIZATION_SCHEMA = new JsonSerializationSchema(ROW_TYPE);
+ private static final JsonDeserializationSchema DESERIALIZATION_SCHEMA = new JsonDeserializationSchema(
+ false, false, ROW_TYPE);
+ private static final int MOCK_WEB_SERVER_PORT = 18888;
+
+ public static void main(String[] args) throws Exception {
+ BlockingQueue<String> readDataQueue = new LinkedBlockingQueue<>();
+ BlockingQueue<String> writeDataQueue = new LinkedBlockingQueue<>();
+
+ String configFile = "/examples/http_source_to_sink.conf";
+ try (MockWebServer mockWebServer = startWebServer(MOCK_WEB_SERVER_PORT, readDataQueue, writeDataQueue)) {
+ log.info("Submitting http job: {}", configFile);
+ readDataQueue.offer(toJson(TEST_DATASET));
+ submitHttpJob(configFile);
+ log.info("Submitted http job: {}", configFile);
+
+ log.info("Validate http sink data...");
+ checkArgument(writeDataQueue.size() == TEST_DATASET.size());
+ List<SeaTunnelRow> results = new ArrayList<>();
+ for (int i = 0; i < TEST_DATASET.size(); i++) {
+ SeaTunnelRow row = DESERIALIZATION_SCHEMA.deserialize(writeDataQueue.take().getBytes());
+ results.add(row);
+ }
+ checkArgument(results.equals(TEST_DATASET));
+ }
+ }
+
+ private static void submitHttpJob(String configurePath) throws Exception {
+ String configFile = getTestConfigFile(configurePath);
+ FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
+ flinkCommandArgs.setConfigFile(configFile);
+ flinkCommandArgs.setCheckConfig(false);
+ flinkCommandArgs.setVariables(null);
+ Command<FlinkCommandArgs> flinkCommand =
+ new FlinkCommandBuilder().buildCommand(flinkCommandArgs);
+ Seatunnel.run(flinkCommand);
+ }
+
+ private static String getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException {
+ URL resource = HttpConnectorExample.class.getResource(configFile);
+ if (resource == null) {
+ throw new FileNotFoundException("Can't find config file: " + configFile);
+ }
+ return Paths.get(resource.toURI()).toString();
+ }
+
+ private static MockWebServer startWebServer(int port,
+ BlockingQueue<String> readDataQueue,
+ BlockingQueue<String> writeDataQueue) throws Exception {
+ MockWebServer mockWebServer = new MockWebServer();
+ mockWebServer.start(InetAddress.getByName("localhost"), port);
+ mockWebServer.setDispatcher(new Dispatcher() {
+ @Override
+ public MockResponse dispatch(RecordedRequest request) throws InterruptedException {
+ log.info("received request : {}", request.getPath());
+ if (request.getPath().endsWith("/read")) {
+ if (readDataQueue.isEmpty()) {
+ return new MockResponse();
+ }
+ String readData = readDataQueue.take();
+ log.info("Take readDataQueue, remaining capacity: {}", readDataQueue.size());
+ return new MockResponse().setBody(readData);
+ }
+ if (request.getPath().endsWith("/write")) {
+ writeDataQueue.offer(request.getUtf8Body());
+ log.info("Offer writeDataQueue, remaining capacity: {}", writeDataQueue.size());
+ return new MockResponse().setBody("write request");
+ }
+ return new MockResponse();
+ }
+ });
+ log.info("Started MockWebServer: {}", mockWebServer.url("/"));
+ return mockWebServer;
+ }
+
+ @SuppressWarnings("MagicNumber")
+ private static List<SeaTunnelRow> generateTestDataset() {
+ List<SeaTunnelRow> rows = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ SeaTunnelRow row = new SeaTunnelRow(new Object[]{
+ Long.valueOf(i),
+ Collections.singletonMap("key", Short.parseShort("1")),
+ new Byte[]{Byte.parseByte("1")},
+ "string",
+ Boolean.FALSE,
+ Byte.parseByte("1"),
+ Short.parseShort("1"),
+ Integer.parseInt("1"),
+ Long.parseLong("1"),
+ Float.parseFloat("1.1"),
+ Double.parseDouble("1.1"),
+ BigDecimal.valueOf(11, 1),
+ "test".getBytes(),
+ LocalDate.now(),
+ LocalDateTime.now()
+ });
+ rows.add(row);
+ }
+ return rows;
+ }
+
+ private static String toJson(List<SeaTunnelRow> rows) throws IOException {
+ ArrayNode arrayNode = SERIALIZATION_SCHEMA.getMapper().createArrayNode();
+ for (SeaTunnelRow row : rows) {
+ byte[] jsonBytes = SERIALIZATION_SCHEMA.serialize(row);
+ JsonNode jsonNode = SERIALIZATION_SCHEMA.getMapper().readTree(jsonBytes);
+ arrayNode.add(jsonNode);
+ }
+ return SERIALIZATION_SCHEMA.getMapper().writeValueAsString(arrayNode);
+ }
+}
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/http_source_to_sink.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/http_source_to_sink.conf
new file mode 100644
index 0000000..837884b
--- /dev/null
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/http_source_to_sink.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing with the Http source and sink in seatunnel config
+######
+
+# Job-level settings: one parallel task, executed in batch mode.
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+# Reads records from the /read endpoint of an HTTP server on localhost:18888.
+source {
+ Http {
+ url = "http://localhost:18888/read"
+ # NOTE(review): poll_interval_ms is described as the request interval for stream mode, while job.mode above is BATCH; confirm it has any effect here.
+ poll_interval_ms = 1000
+ # Max retry times when the HTTP request fails with an IOException.
+ retry = 1
+ # Retry-backoff multiplier, in milliseconds.
+ retry_backoff_multiplier_ms = 100
+ # Upper bound of the retry backoff, in milliseconds.
+ retry_backoff_max_ms = 1000
+ # Declared row schema, one "field name = type" entry per column.
+ schema = {
+ fields {
+ id = bigint
+ c_map = "map<string, smallint>"
+ c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
+
+# No transforms are applied.
+transform {
+}
+
+# Two sinks are configured: Console, and Http posting to the /write endpoint of the same server.
+sink {
+ Console {}
+ Http {
+ url = "http://localhost:18888/write"
+ # Same retry options as on the source side.
+ retry = 1
+ retry_backoff_multiplier_ms = 100
+ retry_backoff_max_ms = 1000
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
index 202f62b..5d6f199 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
@@ -104,8 +104,27 @@
if (message == null) {
return null;
}
+ return convertJsonNode(convertBytes(message));
+ }
+
+    /**
+     * Converts an already-parsed JSON tree into a {@link SeaTunnelRow}.
+     *
+     * @param jsonNode parsed JSON tree; {@code null} yields {@code null}
+     * @return the converted row, or {@code null} when the input is {@code null} or when
+     *     conversion fails while {@code ignoreParseErrors} is enabled
+     * @throws IOException if conversion fails and {@code ignoreParseErrors} is disabled; the
+     *     original failure is attached as the cause
+     */
+    public SeaTunnelRow convertJsonNode(JsonNode jsonNode) throws IOException {
+        if (jsonNode == null) {
+            return null;
+        }
         try {
-            return (SeaTunnelRow) runtimeConverter.convert(objectMapper.readTree(message));
+            return (SeaTunnelRow) runtimeConverter.convert(jsonNode);
+        } catch (Throwable t) {
+            if (ignoreParseErrors) {
+                return null;
+            }
+            // toString() renders the whole tree. asText() returns an empty string for object
+            // and array nodes, which would hide the offending payload in the message.
+            throw new IOException(
+                format("Failed to deserialize JSON '%s'.", jsonNode.toString()), t);
+        }
+    }
+
+ public JsonNode convertBytes(byte[] message) throws IOException {
+ try {
+ return objectMapper.readTree(message);
} catch (Throwable t) {
if (ignoreParseErrors) {
return null;
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
index 3510acf..6763b38 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
@@ -26,6 +26,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import lombok.Getter;
public class JsonSerializationSchema implements SerializationSchema {
@@ -38,6 +39,7 @@
private transient ObjectNode node;
/** Object mapper that is used to create output JSON objects. */
+ @Getter
private final ObjectMapper mapper = new ObjectMapper();
private final RowToJsonConverters.RowToJsonConverter runtimeConverter;
@@ -59,7 +61,7 @@
return mapper.writeValueAsBytes(node);
} catch (Throwable e) {
throw new RuntimeException(
- String.format("Failed to deserialize JSON '%s'.", row), e);
+ String.format("Failed to deserialize JSON '%s'.", row), e);
}
}
}
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
index 138b553..e0d6b09 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
@@ -20,17 +20,21 @@
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.io.Serializable;
+import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -41,6 +45,9 @@
import java.time.temporal.TemporalAccessor;
import java.time.temporal.TemporalQueries;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
/** Tool class used to convert from {@link JsonNode} to {@link org.apache.seatunnel.api.table.type.SeaTunnelRow}. * */
public class JsonToRowConverters implements Serializable {
@@ -107,6 +114,10 @@
return this::convertToBytes;
case DECIMAL:
return createDecimalConverter((BasicType<BigDecimal>) type);
+ case ARRAY:
+ return createArrayConverter((ArrayType) type);
+ case MAP:
+ return createMapConverter((MapType) type);
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
@@ -202,7 +213,7 @@
};
}
- public JsonToRowConverter createRowConverter(SeaTunnelRowType rowType) {
+ private JsonToRowConverter createRowConverter(SeaTunnelRowType rowType) {
final JsonToRowConverter[] fieldConverters =
Arrays.stream(rowType.getFieldTypes())
.map(this::createConverter)
@@ -228,6 +239,33 @@
};
}
+    /**
+     * Creates a converter that turns a JSON array into a Java array.
+     *
+     * <p>The result array is allocated with the element type's class as component type and
+     * filled by running the element converter over each array entry in order. The incoming node
+     * is cast to {@link ArrayNode}, so any other node kind fails with a
+     * {@link ClassCastException}.
+     *
+     * @param arrayType array type describing the element type
+     * @return converter producing an {@code Object[]} view of the typed array
+     */
+    private JsonToRowConverter createArrayConverter(ArrayType arrayType) {
+        BasicType itemType = arrayType.getElementType();
+        JsonToRowConverter itemConverter = createConverter(itemType);
+        return jsonNode -> {
+            ArrayNode items = (ArrayNode) jsonNode;
+            int length = items.size();
+            Object[] converted = (Object[]) Array.newInstance(itemType.getTypeClass(), length);
+            int index = 0;
+            for (JsonNode item : items) {
+                converted[index++] = itemConverter.convert(item);
+            }
+            return converted;
+        };
+    }
+
+    /**
+     * Creates a converter that turns a JSON object into a {@link Map}.
+     *
+     * <p>Keys are the JSON field names and are always kept as {@code String}; the map type's key
+     * type is not consulted. Values are converted with the converter for the map's value type.
+     * The incoming node is cast to {@link ObjectNode}, so any other node kind fails with a
+     * {@link ClassCastException}.
+     *
+     * @param mapType map type describing the value type
+     * @return converter producing a {@link HashMap} from field name to converted value
+     */
+    private JsonToRowConverter createMapConverter(MapType mapType) {
+        JsonToRowConverter entryValueConverter = createConverter(mapType.getValueType());
+        return jsonNode -> {
+            ObjectNode source = (ObjectNode) jsonNode;
+            Map<String, Object> converted = new HashMap<>();
+            Iterator<Map.Entry<String, JsonNode>> fields = source.fields();
+            while (fields.hasNext()) {
+                Map.Entry<String, JsonNode> field = fields.next();
+                converted.put(field.getKey(), entryValueConverter.convert(field.getValue()));
+            }
+            return converted;
+        };
+    }
+
private Object convertField(
JsonToRowConverter fieldConverter, String fieldName, JsonNode field) {
if (field == null) {