[Improve][Connector-V2] Improve http connector (#2833)

[format][json] Support read json ARRAY & MAP
[hotfix][build] connector-http-base can't skip maven-shade-plugin
[connector][http] Improve http connector 
* Support `SimpleTextSchema`
* Support retry for request http server
* Support request http interval in stream mode
diff --git a/docs/en/connector-v2/sink/Http.md b/docs/en/connector-v2/sink/Http.md
index 8f4ab25..2a5cb43 100644
--- a/docs/en/connector-v2/sink/Http.md
+++ b/docs/en/connector-v2/sink/Http.md
@@ -17,12 +17,17 @@
 
 ##  Options
 
-| name | type   | required | default value |
-| --- |--------| --- | --- |
-| url | String | Yes | - |
-| headers | Map    | No | - |
+| name                               | type   | required | default value |
+|------------------------------------|--------|----------|---------------|
+| url                                | String | Yes      | -             |
+| headers                            | Map    | No       | -             |
+| params                             | Map    | No       | -             |
+| retry                              | int    | No       | -             |
+| retry_backoff_multiplier_ms        | int    | No       | 100           |
+| retry_backoff_max_ms               | int    | No       | 10000         |
 
-### url [string]
+
+### url [String]
 
 http request url
 
@@ -30,6 +35,22 @@
 
 http headers
 
+### params [Map]
+
+http params
+
+### retry [int]
+
+The max retry times if request http return to `IOException`
+
+### retry_backoff_multiplier_ms [int]
+
+The retry-backoff times(millis) multiplier if request http failed
+
+### retry_backoff_max_ms [int]
+
+The maximum retry-backoff times(millis) if request http failed
+
 ## Example
 
 simple:
diff --git a/docs/en/connector-v2/source/Http.md b/docs/en/connector-v2/source/Http.md
index 0fbbc43..21ac01e 100644
--- a/docs/en/connector-v2/source/Http.md
+++ b/docs/en/connector-v2/source/Http.md
@@ -17,22 +17,26 @@
 
 ##  Options
 
-| name          | type   | required | default value |
-|---------------|--------|----------|---------------|
-| url           | String | Yes      | -             |
-| schema        | config | No       | -             |
-| schema.fields | config | No       | -             |
-| format        | string | No       | json          |
-| method        | String | No       | get           |
-| headers       | Map    | No       | -             |
-| params        | Map    | No       | -             |
-| body          | String | No       | -             |
+| name                               | type   | required | default value |
+|------------------------------------|--------|----------|---------------|
+| url                                | String | Yes      | -             |
+| schema                             | Config | No       | -             |
+| schema.fields                      | Config | No       | -             |
+| format                             | String | No       | json          |
+| method                             | String | No       | get           |
+| headers                            | Map    | No       | -             |
+| params                             | Map    | No       | -             |
+| body                               | String | No       | -             |
+| poll_interval_ms                   | int    | No       | -             |
+| retry                              | int    | No       | -             |
+| retry_backoff_multiplier_ms        | int    | No       | 100           |
+| retry_backoff_max_ms               | int    | No       | 10000         |
 
-### url [string]
+### url [String]
 
 http request url
 
-### method [string]
+### method [String]
 
 http request method, only supports GET, POST method.
 
@@ -48,6 +52,22 @@
 
 http body
 
+### poll_interval_ms [int]
+
+request http api interval(millis) in stream mode
+
+### retry [int]
+
+The max retry times if request http return to `IOException`
+
+### retry_backoff_multiplier_ms [int]
+
+The retry-backoff times(millis) multiplier if request http failed
+
+### retry_backoff_max_ms [int]
+
+The maximum retry-backoff times(millis) if request http failed
+
 ### format [String]
 
 the format of upstream data, now only support `json` `text`, default `json`.
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml b/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
index ba62153..8e33fdc 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
@@ -32,6 +32,7 @@
     <properties>
         <httpclient.version>4.5.13</httpclient.version>
         <httpcore.version>4.4.4</httpcore.version>
+        <guava-retrying.version>2.0.0</guava-retrying.version>
     </properties>
 
     <dependencies>
@@ -58,31 +59,10 @@
             <artifactId>httpcore</artifactId>
             <version>${httpcore.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.github.rholder</groupId>
+            <artifactId>guava-retrying</artifactId>
+            <version>${guava-retrying.version}</version>
+        </dependency>
     </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <!-- base module need skip shading -->
-                        <configuration>
-                            <skip>true</skip>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <!-- make sure that flatten runs after maven-shade-plugin -->
-            <plugin>
-                <groupId>org.codehaus.mojo</groupId>
-                <artifactId>flatten-maven-plugin</artifactId>
-            </plugin>
-        </plugins>
-    </build>
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java
index 089d61e..0eae378 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java
@@ -17,6 +17,16 @@
 
 package org.apache.seatunnel.connectors.seatunnel.http.client;
 
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+
+import com.github.rholder.retry.Attempt;
+import com.github.rholder.retry.RetryListener;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.http.HttpStatus;
 import org.apache.http.NameValuePair;
 import org.apache.http.client.config.RequestConfig;
@@ -48,9 +58,10 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
+@Slf4j
 public class HttpClientProvider implements AutoCloseable {
-    private final CloseableHttpClient httpClient;
     private static final String ENCODING = "UTF-8";
     private static final String APPLICATION_JSON = "application/json";
     private static final int CONNECT_TIMEOUT = 6000 * 2;
@@ -60,13 +71,33 @@
             .setConnectTimeout(CONNECT_TIMEOUT)
             .setSocketTimeout(SOCKET_TIMEOUT)
             .build();
+    private final CloseableHttpClient httpClient;
+    private final Retryer<CloseableHttpResponse> retryer;
 
-    private HttpClientProvider() {
-        httpClient = HttpClients.createDefault();
+    public HttpClientProvider(HttpParameter httpParameter) {
+        this.httpClient = HttpClients.createDefault();
+        this.retryer = buildRetryer(httpParameter);
     }
 
-    public static HttpClientProvider getInstance() {
-        return Singleton.INSTANCE;
+    private Retryer<CloseableHttpResponse> buildRetryer(HttpParameter httpParameter) {
+        if (httpParameter.getRetry() < 1) {
+            return RetryerBuilder.<CloseableHttpResponse>newBuilder().build();
+        }
+        return RetryerBuilder.<CloseableHttpResponse>newBuilder()
+            .retryIfException(ex -> ExceptionUtils.indexOfType(ex, IOException.class) != -1)
+            .withStopStrategy(StopStrategies.stopAfterAttempt(httpParameter.getRetry()))
+            .withWaitStrategy(WaitStrategies.fibonacciWait(httpParameter.getRetryBackoffMultiplierMillis(),
+                httpParameter.getRetryBackoffMaxMillis(), TimeUnit.MILLISECONDS))
+            .withRetryListener(new RetryListener() {
+                @Override
+                public <V> void onRetry(Attempt<V> attempt) {
+                    if (attempt.hasException()) {
+                        log.warn(String.format("[%d] request http failed",
+                            attempt.getAttemptNumber()), attempt.getExceptionCause());
+                    }
+                }
+            })
+            .build();
     }
 
     public HttpResponse execute(String url, String method, Map<String, String> headers, Map<String, String> params) throws Exception {
@@ -275,9 +306,9 @@
         return doPost(url, params);
     }
 
-    private HttpResponse getResponse(HttpRequestBase request) throws IOException {
+    private HttpResponse getResponse(HttpRequestBase request) throws Exception {
         // execute request
-        try (CloseableHttpResponse httpResponse = httpClient.execute(request)) {
+        try (CloseableHttpResponse httpResponse = retryWithException(request)) {
             // get return result
             if (httpResponse != null && httpResponse.getStatusLine() != null) {
                 String content = "";
@@ -290,6 +321,10 @@
         return new HttpResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
     }
 
+    private CloseableHttpResponse retryWithException(HttpRequestBase request) throws Exception {
+        return retryer.call(() -> httpClient.execute(request));
+    }
+
     private void addParameters(URIBuilder builder, Map<String, String> params) {
         if (Objects.isNull(params) || params.isEmpty()) {
             return;
@@ -333,8 +368,4 @@
             httpClient.close();
         }
     }
-
-    private static class Singleton {
-        private static final HttpClientProvider INSTANCE = new HttpClientProvider();
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
index 50ca7a0..8ee089a 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
@@ -27,4 +27,8 @@
     public static final String SCHEMA = "schema";
     public static final String FORMAT = "format";
     public static final String DEFAULT_FORMAT = "json";
+    public static final String POLL_INTERVAL_MILLS = "poll_interval_ms";
+    public static final String RETRY = "retry";
+    public static final String RETRY_BACKOFF_MULTIPLIER_MS = "retry_backoff_multiplier_ms";
+    public static final String RETRY_BACKOFF_MAX_MS = "retry_backoff_max_ms";
 }
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
index c22a248..f2f1b33 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
@@ -26,12 +26,18 @@
 import java.util.stream.Collectors;
 
 @Data
+@SuppressWarnings("MagicNumber")
 public class HttpParameter implements Serializable {
     private String url;
     private String method;
     private Map<String, String> headers;
     private Map<String, String> params;
     private String body;
+    private int pollIntervalMillis;
+    private int retry;
+    private int retryBackoffMultiplierMillis = 100;
+    private int retryBackoffMaxMillis = 10000;
+
     public void buildWithConfig(Config pluginConfig) {
         // set url
         this.setUrl(pluginConfig.getString(HttpConfig.URL));
@@ -53,5 +59,17 @@
         if (pluginConfig.hasPath(HttpConfig.BODY)) {
             this.setBody(pluginConfig.getString(HttpConfig.BODY));
         }
+        if (pluginConfig.hasPath(HttpConfig.POLL_INTERVAL_MILLS)) {
+            this.setPollIntervalMillis(pluginConfig.getInt(HttpConfig.POLL_INTERVAL_MILLS));
+        }
+        if (pluginConfig.hasPath(HttpConfig.RETRY)) {
+            this.setRetry(pluginConfig.getInt(HttpConfig.RETRY));
+            if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)) {
+                this.setRetryBackoffMultiplierMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS));
+            }
+            if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MAX_MS)) {
+                this.setRetryBackoffMaxMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MAX_MS));
+            }
+        }
     }
 }
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
index e68a1c4..4b90f8c 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
@@ -34,7 +34,7 @@
 
 public class HttpSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
     private static final Logger LOGGER = LoggerFactory.getLogger(HttpSinkWriter.class);
-    protected final HttpClientProvider httpClient = HttpClientProvider.getInstance();
+    protected final HttpClientProvider httpClient;
     protected final SeaTunnelRowType seaTunnelRowType;
     protected final HttpParameter httpParameter;
     protected final SerializationSchema serializationSchema;
@@ -42,6 +42,7 @@
     public HttpSinkWriter(SeaTunnelRowType seaTunnelRowType, HttpParameter httpParameter) {
         this.seaTunnelRowType = seaTunnelRowType;
         this.httpParameter = httpParameter;
+        this.httpClient = new HttpClientProvider(httpParameter);
         this.serializationSchema = new JsonSerializationSchema(seaTunnelRowType);
     }
 
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/DeserializationCollector.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/DeserializationCollector.java
new file mode 100644
index 0000000..f01bc7f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/DeserializationCollector.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import lombok.AllArgsConstructor;
+
+import java.io.IOException;
+
+@AllArgsConstructor
+public class DeserializationCollector {
+
+    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+
+    public void collect(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
+        if (deserializationSchema instanceof JsonDeserializationSchema) {
+            collectJson(message, (JsonDeserializationSchema) deserializationSchema, out);
+        } else {
+            SeaTunnelRow deserialize = deserializationSchema.deserialize(message);
+            out.collect(deserialize);
+        }
+    }
+
+    private void collectJson(byte[] message,
+                             JsonDeserializationSchema jsonDeserializationSchema,
+                             Collector<SeaTunnelRow> out) throws IOException {
+        JsonNode jsonNode = jsonDeserializationSchema.convertBytes(message);
+        if (jsonNode.isArray()) {
+            ArrayNode arrayNode = (ArrayNode) jsonNode;
+            for (int i = 0; i < arrayNode.size(); i++) {
+                SeaTunnelRow deserialize = jsonDeserializationSchema.convertJsonNode(arrayNode.get(i));
+                out.collect(deserialize);
+            }
+        } else {
+            SeaTunnelRow deserialize = jsonDeserializationSchema.convertJsonNode(jsonNode);
+            out.collect(deserialize);
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index dff630b..9887f7c 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -68,18 +68,22 @@
         if (pluginConfig.hasPath(HttpConfig.SCHEMA)) {
             Config schema = pluginConfig.getConfig(HttpConfig.SCHEMA);
             this.rowType = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+            // default use json format
+            String format = HttpConfig.DEFAULT_FORMAT;
+            if (pluginConfig.hasPath(HttpConfig.FORMAT)) {
+                format = pluginConfig.getString(HttpConfig.FORMAT);
+            }
+            switch (format) {
+                case HttpConfig.DEFAULT_FORMAT:
+                    this.deserializationSchema = new JsonDeserializationSchema(false, false, rowType);
+                    break;
+                default:
+                    // TODO: use format SPI
+                    throw new UnsupportedOperationException("Unsupported format: " + format);
+            }
         } else {
             this.rowType = SeaTunnelSchema.buildSimpleTextSchema();
-        }
-        // TODO: use format SPI
-        // default use json format
-        String format;
-        if (pluginConfig.hasPath(HttpConfig.FORMAT)) {
-            format = pluginConfig.getString(HttpConfig.FORMAT);
-            this.deserializationSchema = null;
-        } else {
-            format = HttpConfig.DEFAULT_FORMAT;
-            this.deserializationSchema = new JsonDeserializationSchema(false, false, rowType);
+            this.deserializationSchema = new SimpleTextDeserializationSchema(this.rowType);
         }
     }
 
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index 5f7ff13..0419e2e 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -27,6 +27,7 @@
 import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
 import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
 
+import com.google.common.base.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,17 +39,17 @@
     protected final SingleSplitReaderContext context;
     protected final HttpParameter httpParameter;
     protected HttpClientProvider httpClient;
-    protected final DeserializationSchema<SeaTunnelRow> deserializationSchema;
+    private final DeserializationCollector deserializationCollector;
 
     public HttpSourceReader(HttpParameter httpParameter, SingleSplitReaderContext context, DeserializationSchema<SeaTunnelRow> deserializationSchema) {
         this.context = context;
         this.httpParameter = httpParameter;
-        this.deserializationSchema = deserializationSchema;
+        this.deserializationCollector = new DeserializationCollector(deserializationSchema);
     }
 
     @Override
     public void open() {
-        httpClient = HttpClientProvider.getInstance();
+        httpClient = new HttpClientProvider(httpParameter);
     }
 
     @Override
@@ -64,11 +65,8 @@
             HttpResponse response = httpClient.execute(this.httpParameter.getUrl(), this.httpParameter.getMethod(), this.httpParameter.getHeaders(), this.httpParameter.getParams());
             if (HttpResponse.STATUS_OK == response.getCode()) {
                 String content = response.getContent();
-                if (deserializationSchema != null) {
-                    deserializationSchema.deserialize(content.getBytes(), output);
-                } else {
-                    // TODO: use seatunnel-text-format
-                    output.collect(new SeaTunnelRow(new Object[]{content}));
+                if (!Strings.isNullOrEmpty(content)) {
+                    deserializationCollector.collect(content.getBytes(), output);
                 }
                 return;
             }
@@ -80,6 +78,10 @@
                 // signal to the source that we have reached the end of the data.
                 LOGGER.info("Closed the bounded http source");
                 context.signalNoMoreElement();
+            } else {
+                if (httpParameter.getPollIntervalMillis() > 0) {
+                    Thread.sleep(httpParameter.getPollIntervalMillis());
+                }
             }
         }
     }
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/SimpleTextDeserializationSchema.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/SimpleTextDeserializationSchema.java
new file mode 100644
index 0000000..ae56695
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/SimpleTextDeserializationSchema.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import lombok.AllArgsConstructor;
+
+@AllArgsConstructor
+public class SimpleTextDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+
+    private SeaTunnelRowType rowType;
+
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) {
+        return new SeaTunnelRow(new Object[]{new String(message)});
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return rowType;
+    }
+}
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
index bc5bcd4..8ea60e9 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
@@ -30,6 +30,7 @@
     <artifactId>seatunnel-flink-connector-v2-example</artifactId>
     <properties>
         <flink.scope>compile</flink.scope>
+        <mock-webserver.version>3.6.0</mock-webserver.version>
     </properties>
 
     <dependencies>
@@ -76,6 +77,11 @@
             <artifactId>connector-dingtalk</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-http-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <!--   seatunnel connectors   -->
         <!--flink-->
         <dependency>
@@ -115,6 +121,11 @@
             <scope>${flink.scope}</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <version>${mock-webserver.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/HttpConnectorExample.java b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/HttpConnectorExample.java
new file mode 100644
index 0000000..9617031
--- /dev/null
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/HttpConnectorExample.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.example.flink.v2;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.core.starter.Seatunnel;
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.starter.flink.command.FlinkCommandBuilder;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.mockwebserver.Dispatcher;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.net.InetAddress;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+@Slf4j
+public class HttpConnectorExample {
+
+    private static final SeaTunnelRowType ROW_TYPE = new SeaTunnelRowType(
+        new String[]{
+            "id",
+            "c_map",
+            "c_array",
+            "c_string",
+            "c_boolean",
+            "c_tinyint",
+            "c_smallint",
+            "c_int",
+            "c_bigint",
+            "c_float",
+            "c_double",
+            "c_decimal",
+            "c_bytes",
+            "c_date",
+            "c_timestamp"
+        },
+        new SeaTunnelDataType[]{
+            BasicType.LONG_TYPE,
+            new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+            ArrayType.BYTE_ARRAY_TYPE,
+            BasicType.STRING_TYPE,
+            BasicType.BOOLEAN_TYPE,
+            BasicType.BYTE_TYPE,
+            BasicType.SHORT_TYPE,
+            BasicType.INT_TYPE,
+            BasicType.LONG_TYPE,
+            BasicType.FLOAT_TYPE,
+            BasicType.DOUBLE_TYPE,
+            new DecimalType(2, 1),
+            PrimitiveByteArrayType.INSTANCE,
+            LocalTimeType.LOCAL_DATE_TYPE,
+            LocalTimeType.LOCAL_DATE_TIME_TYPE
+        });
+    private static final List<SeaTunnelRow> TEST_DATASET = generateTestDataset();
+    private static final JsonSerializationSchema SERIALIZATION_SCHEMA = new JsonSerializationSchema(ROW_TYPE);
+    private static final JsonDeserializationSchema DESERIALIZATION_SCHEMA = new JsonDeserializationSchema(
+        false, false, ROW_TYPE);
+    private static final int MOCK_WEB_SERVER_PORT = 18888;
+
+    public static void main(String[] args) throws Exception {
+        BlockingQueue<String> readDataQueue = new LinkedBlockingQueue<>();
+        BlockingQueue<String> writeDataQueue = new LinkedBlockingQueue<>();
+
+        String configFile = "/examples/http_source_to_sink.conf";
+        try (MockWebServer mockWebServer = startWebServer(MOCK_WEB_SERVER_PORT, readDataQueue, writeDataQueue)) {
+            log.info("Submitting http job: {}", configFile);
+            readDataQueue.offer(toJson(TEST_DATASET));
+            submitHttpJob(configFile);
+            log.info("Submitted http job: {}", configFile);
+
+            log.info("Validate http sink data...");
+            checkArgument(writeDataQueue.size() == TEST_DATASET.size());
+            List<SeaTunnelRow> results = new ArrayList<>();
+            for (int i = 0; i < TEST_DATASET.size(); i++) {
+                SeaTunnelRow row = DESERIALIZATION_SCHEMA.deserialize(writeDataQueue.take().getBytes());
+                results.add(row);
+            }
+            checkArgument(results.equals(TEST_DATASET));
+        }
+    }
+
+    private static void submitHttpJob(String configurePath) throws Exception {
+        String configFile = getTestConfigFile(configurePath);
+        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
+        flinkCommandArgs.setConfigFile(configFile);
+        flinkCommandArgs.setCheckConfig(false);
+        flinkCommandArgs.setVariables(null);
+        Command<FlinkCommandArgs> flinkCommand =
+            new FlinkCommandBuilder().buildCommand(flinkCommandArgs);
+        Seatunnel.run(flinkCommand);
+    }
+
+    private static String getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException {
+        URL resource = HttpConnectorExample.class.getResource(configFile);
+        if (resource == null) {
+            throw new FileNotFoundException("Can't find config file: " + configFile);
+        }
+        return Paths.get(resource.toURI()).toString();
+    }
+
+    private static MockWebServer startWebServer(int port,
+                                                BlockingQueue<String> readDataQueue,
+                                                BlockingQueue<String> writeDataQueue) throws Exception {
+        MockWebServer mockWebServer = new MockWebServer();
+        mockWebServer.start(InetAddress.getByName("localhost"), port);
+        mockWebServer.setDispatcher(new Dispatcher() {
+            @Override
+            public MockResponse dispatch(RecordedRequest request) throws InterruptedException {
+                log.info("received request : {}", request.getPath());
+                if (request.getPath().endsWith("/read")) {
+                    if (readDataQueue.isEmpty()) {
+                        return new MockResponse();
+                    }
+                    String readData = readDataQueue.take();
+                    log.info("Take readDataQueue, remaining capacity: {}", readDataQueue.size());
+                    return new MockResponse().setBody(readData);
+                }
+                if (request.getPath().endsWith("/write")) {
+                    writeDataQueue.offer(request.getUtf8Body());
+                    log.info("Offer writeDataQueue, remaining capacity: {}", writeDataQueue.size());
+                    return new MockResponse().setBody("write request");
+                }
+                return new MockResponse();
+            }
+        });
+        log.info("Started MockWebServer: {}", mockWebServer.url("/"));
+        return mockWebServer;
+    }
+
+    @SuppressWarnings("MagicNumber")
+    private static List<SeaTunnelRow> generateTestDataset() {
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SeaTunnelRow row = new SeaTunnelRow(new Object[]{
+                Long.valueOf(i),
+                Collections.singletonMap("key", Short.parseShort("1")),
+                new Byte[]{Byte.parseByte("1")},
+                "string",
+                Boolean.FALSE,
+                Byte.parseByte("1"),
+                Short.parseShort("1"),
+                Integer.parseInt("1"),
+                Long.parseLong("1"),
+                Float.parseFloat("1.1"),
+                Double.parseDouble("1.1"),
+                BigDecimal.valueOf(11, 1),
+                "test".getBytes(),
+                LocalDate.now(),
+                LocalDateTime.now()
+            });
+            rows.add(row);
+        }
+        return rows;
+    }
+
+    private static String toJson(List<SeaTunnelRow> rows) throws IOException {
+        ArrayNode arrayNode = SERIALIZATION_SCHEMA.getMapper().createArrayNode();
+        for (SeaTunnelRow row : rows) {
+            byte[] jsonBytes = SERIALIZATION_SCHEMA.serialize(row);
+            JsonNode jsonNode = SERIALIZATION_SCHEMA.getMapper().readTree(jsonBytes);
+            arrayNode.add(jsonNode);
+        }
+        return SERIALIZATION_SCHEMA.getMapper().writeValueAsString(arrayNode);
+    }
+}
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/http_source_to_sink.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/http_source_to_sink.conf
new file mode 100644
index 0000000..837884b
--- /dev/null
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/http_source_to_sink.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Http {
+    url = "http://localhost:18888/read"
+    poll_interval_ms = 1000
+    retry = 1
+    retry_backoff_multiplier_ms = 100
+    retry_backoff_max_ms = 1000
+    schema = {
+      fields {
+        id = bigint
+        c_map = "map<string, smallint>"
+        c_array = "array<tinyint>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(2, 1)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  Console {}
+  Http {
+    url = "http://localhost:18888/write"
+    retry = 1
+    retry_backoff_multiplier_ms = 100
+    retry_backoff_max_ms = 1000
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
index 202f62b..5d6f199 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
@@ -104,8 +104,27 @@
         if (message == null) {
             return null;
         }
+        return convertJsonNode(convertBytes(message));
+    }
+
+    public SeaTunnelRow convertJsonNode(JsonNode jsonNode) throws IOException {
+        if (jsonNode == null) {
+            return null;
+        }
         try {
-            return (SeaTunnelRow) runtimeConverter.convert(objectMapper.readTree(message));
+            return (SeaTunnelRow) runtimeConverter.convert(jsonNode);
+        } catch (Throwable t) {
+            if (ignoreParseErrors) {
+                return null;
+            }
+            throw new IOException(
+                format("Failed to deserialize JSON '%s'.", jsonNode.asText()), t);
+        }
+    }
+
+    public JsonNode convertBytes(byte[] message) throws IOException {
+        try {
+            return objectMapper.readTree(message);
         } catch (Throwable t) {
             if (ignoreParseErrors) {
                 return null;
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
index 3510acf..6763b38 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
@@ -26,6 +26,7 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import lombok.Getter;
 
 public class JsonSerializationSchema implements SerializationSchema {
 
@@ -38,6 +39,7 @@
     private transient ObjectNode node;
 
     /** Object mapper that is used to create output JSON objects. */
+    @Getter
     private final ObjectMapper mapper = new ObjectMapper();
 
     private final RowToJsonConverters.RowToJsonConverter runtimeConverter;
@@ -59,7 +61,7 @@
             return mapper.writeValueAsBytes(node);
         } catch (Throwable e) {
             throw new RuntimeException(
-                    String.format("Failed to deserialize JSON '%s'.", row), e);
+                String.format("Failed to deserialize JSON '%s'.", row), e);
         }
     }
 }
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
index 138b553..e0d6b09 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
@@ -20,17 +20,21 @@
 
 import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
 
+import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.table.type.SqlType;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.lang.reflect.Array;
 import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -41,6 +45,9 @@
 import java.time.temporal.TemporalAccessor;
 import java.time.temporal.TemporalQueries;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 
 /** Tool class used to convert from {@link JsonNode} to {@link org.apache.seatunnel.api.table.type.SeaTunnelRow}. * */
 public class JsonToRowConverters implements Serializable {
@@ -107,6 +114,10 @@
                 return this::convertToBytes;
             case DECIMAL:
                 return createDecimalConverter((BasicType<BigDecimal>) type);
+            case ARRAY:
+                return createArrayConverter((ArrayType) type);
+            case MAP:
+                return createMapConverter((MapType) type);
             default:
                 throw new UnsupportedOperationException("Unsupported type: " + type);
         }
@@ -202,7 +213,7 @@
         };
     }
 
-    public JsonToRowConverter createRowConverter(SeaTunnelRowType rowType) {
+    private JsonToRowConverter createRowConverter(SeaTunnelRowType rowType) {
         final JsonToRowConverter[] fieldConverters =
                 Arrays.stream(rowType.getFieldTypes())
                         .map(this::createConverter)
@@ -228,6 +239,33 @@
         };
     }
 
+    private JsonToRowConverter createArrayConverter(ArrayType arrayType) {
+        BasicType elementType = arrayType.getElementType();
+        JsonToRowConverter elementConverter = createConverter(elementType);
+        return jsonNode -> {
+            ArrayNode arrayNode = (ArrayNode) jsonNode;
+            Object[] arrayValue = (Object[]) Array.newInstance(elementType.getTypeClass(), arrayNode.size());
+            for (int i = 0; i < arrayNode.size(); i++) {
+                arrayValue[i] = elementConverter.convert(arrayNode.get(i));
+            }
+            return arrayValue;
+        };
+    }
+
+    private JsonToRowConverter createMapConverter(MapType mapType) {
+        JsonToRowConverter valueConverter = createConverter(mapType.getValueType());
+        return jsonNode -> {
+            ObjectNode objectNode = (ObjectNode) jsonNode;
+            Map<String, Object> mapValue = new HashMap<>();
+            for (Iterator<Map.Entry<String, JsonNode>> iterator = objectNode.fields();
+                 iterator.hasNext();) {
+                Map.Entry<String, JsonNode> entry = iterator.next();
+                mapValue.put(entry.getKey(), valueConverter.convert(entry.getValue()));
+            }
+            return mapValue;
+        };
+    }
+
     private Object convertField(
         JsonToRowConverter fieldConverter, String fieldName, JsonNode field) {
         if (field == null) {