// blob: 9b6d2b7d3dd3b4eab42af97803d0ff8200f142f2 [file] [log] [blame]
package org.apache.rocketmq.connect.http;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.internal.DefaultKeyValue;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.connect.http.auth.AbstractHttpClient;
import org.apache.rocketmq.connect.http.auth.HttpCallback;
import org.apache.rocketmq.connect.http.constant.AuthTypeEnum;
import org.apache.rocketmq.connect.http.constant.HttpConstant;
import org.apache.rocketmq.connect.http.entity.HttpRequest;
import org.apache.rocketmq.connect.http.entity.TokenEntity;
import org.assertj.core.util.Lists;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
@RunWith(MockitoJUnitRunner.class)
public class HttpSinkTaskTest {
@Mock
AbstractHttpClient httpClient;
@Before
public void setUp() throws Exception {
}
@Test
public void testHttpPutWithoutHeaderAndQueryAndAuth() throws Exception {
doAnswer(invocationOnMock -> {
HttpRequest httpRequest = invocationOnMock.getArgument(0);
Assert.assertEquals("http://localhost:8080/api", httpRequest.getUrl());
Assert.assertEquals("POST", httpRequest.getMethod());
Assert.assertEquals("{token=token}", httpRequest.getHeaderMap().toString());
Assert.assertEquals("123", httpRequest.getBody());
HttpCallback httpCallback = invocationOnMock.getArgument(1);
httpCallback.getCountDownLatch()
.countDown();
return null;
}).when(httpClient).executeNotReturn(any(), any());
HttpSinkTask httpSinkTask = new HttpSinkTask();
KeyValue keyValue = new DefaultKeyValue();
keyValue.put(HttpConstant.URL, "http://localhost:8080/api");
keyValue.put(HttpConstant.METHOD, "POST");
keyValue.put(HttpConstant.TIMEOUT, 60000);
Map<String, String> bodyMap = Maps.newHashMap();
bodyMap.put("form", "CONSTANT");
bodyMap.put("value", "123");
keyValue.put(HttpConstant.BODY, "123");
keyValue.put(HttpConstant.TOKEN, "token");
httpSinkTask.validate(keyValue);
List<ConnectRecord> connectRecordList = Lists.newArrayList();
ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis());
connectRecord.setData(JSONObject.toJSONString(new HashMap<>()));
connectRecordList.add(connectRecord);
httpSinkTask.init(new SinkTaskContext() {
@Override
public String getConnectorName() {
return null;
}
@Override
public String getTaskName() {
return null;
}
@Override public KeyValue configs() {
return null;
}
@Override
public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
}
@Override
public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
}
@Override
public void pause(List<RecordPartition> partitions) {
}
@Override
public void resume(List<RecordPartition> partitions) {
}
@Override
public Set<RecordPartition> assignment() {
return null;
}
});
httpSinkTask.start(keyValue);
Field field = httpSinkTask.getClass().getDeclaredField("httpClient");
field.setAccessible(true);
field.set(httpSinkTask, httpClient);
httpSinkTask.put(connectRecordList);
httpSinkTask.stop();
}
@Test
public void testHttpPutWithHeaderAndQueryWithoutAuth() throws Exception {
doAnswer(invocationOnMock -> {
HttpRequest httpRequest = invocationOnMock.getArgument(0);
System.out.println(new Gson().toJson(httpRequest));
Assert.assertEquals("http://localhost:8080/api?action=action&accountId=accountId",
httpRequest.getUrl());
Assert.assertEquals("POST", httpRequest.getMethod());
Assert.assertEquals("{meetingName=swqd, id=45ef4dewdwe1-7c35-447a-bd93-fab****, userId=199525, groupId=456, token=token}", httpRequest.getHeaderMap().toString());
Assert.assertEquals("{\"form\":\"CONSTANT\",\"value\":\"123\"}", httpRequest.getBody());
HttpCallback httpCallback = invocationOnMock.getArgument(1);
httpCallback.getCountDownLatch()
.countDown();
return null;
}).when(httpClient).executeNotReturn(any(), any());
HttpSinkTask httpSinkTask = new HttpSinkTask();
KeyValue keyValue = new DefaultKeyValue();
keyValue.put(HttpConstant.URL, "http://localhost:8080/api");
keyValue.put(HttpConstant.METHOD, "POST");
keyValue.put(HttpConstant.TIMEOUT, 60000);
keyValue.put(HttpConstant.SOCKS5_ENDPOINT, "127.0.0.1");
keyValue.put(HttpConstant.SOCKS5_USERNAME, "xxxx");
keyValue.put(HttpConstant.SOCKS5_PASSWORD, "xxxx");
keyValue.put(HttpConstant.HEADER_PARAMETERS, "{\n" +
" \"meetingName\": \"swqd\",\n" +
" \"groupId\": \"456\"\n" +
"}");
keyValue.put(HttpConstant.FIXED_HEADER_PARAMETERS, "{\n" +
" \"id\": \"45ef4dewdwe1-7c35-447a-bd93-fab****\",\n" +
" \"userId\": \"199525\"\n" +
"}");
keyValue.put(HttpConstant.QUERY_PARAMETERS, "{\n" +
" \"action\": \"action\"" +
"}");
keyValue.put(HttpConstant.FIXED_QUERY_PARAMETERS, "{\n" +
" \"accountId\": \"accountId\"" +
"}");
keyValue.put(HttpConstant.TOKEN, "token");
Map<String, String> bodyMap = Maps.newHashMap();
bodyMap.put("form", "CONSTANT");
bodyMap.put("value", "123");
keyValue.put(HttpConstant.BODY, JSONObject.toJSONString(bodyMap));
httpSinkTask.validate(keyValue);
List<ConnectRecord> connectRecordList = Lists.newArrayList();
ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis());
String cloudEvent = "{\n" +
" \"data\":{\n" +
" \"meetingName\":\"swqd\",\n" +
" \"groupId\":\"456\",\n" +
" \"action\":\"camera_off\",\n" +
" \"time\":1590592527490,\n" +
" \"userId\":\"199525\",\n" +
" \"meetingUUID\":\"hz-20864c8f-b10d-45cd-9935-884bca1b****\"\n" +
" },\n" +
" \"id\":\"45ef4dewdwe1-7c35-447a-bd93-fab****\",\n" +
" \"source\":\"acs:aliyuncvc\",\n" +
" \"specversion\":\"1.0\",\n" +
" \"subject\":\"acs.aliyuncvc:cn-hangzhou:{AccountId}:215672\",\n" +
" \"time\":\"2020-11-19T21:04:41+08:00\",\n" +
" \"type\":\"aliyuncvc:MeetingEvent:MemberOperate\",\n" +
" \"aliyunaccountid\":\"123456789098****\",\n" +
" \"aliyunpublishtime\":\"2020-11-19T21:04:42.179PRC\",\n" +
" \"aliyuneventbusname\":\"default\",\n" +
" \"aliyunregionid\":\"cn-hangzhou\",\n" +
" \"aliyunpublishaddr\":\"172.25.XX.XX\"\n" +
"}";
connectRecord.setData(cloudEvent);
connectRecordList.add(connectRecord);
httpSinkTask.init(new SinkTaskContext() {
@Override
public String getConnectorName() {
return null;
}
@Override
public String getTaskName() {
return null;
}
@Override public KeyValue configs() {
return null;
}
@Override
public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
}
@Override
public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
}
@Override
public void pause(List<RecordPartition> partitions) {
}
@Override
public void resume(List<RecordPartition> partitions) {
}
@Override
public Set<RecordPartition> assignment() {
return null;
}
});
httpSinkTask.start(keyValue);
Field field = httpSinkTask.getClass().getDeclaredField("httpClient");
field.setAccessible(true);
field.set(httpSinkTask, httpClient);
httpSinkTask.put(connectRecordList);
httpSinkTask.stop();
}
@Test
public void testHttpPutWithHeaderAndQueryWithBasicAuth() throws Exception {
doAnswer(invocationOnMock -> {
HttpRequest httpRequest = invocationOnMock.getArgument(0);
System.out.println(new Gson().toJson(httpRequest));
Assert.assertEquals("http://localhost:8080/api?action=action&accountId=accountId",
httpRequest.getUrl());
Assert.assertEquals("POST", httpRequest.getMethod());
Assert.assertEquals("{Authorization=Basic dXNlcm5hbWU6cGFzc3dvcmQ=, meetingName=swqd, id=45ef4dewdwe1-7c35-447a-bd93-fab****, userId=199525, groupId=456, token=token}", httpRequest.getHeaderMap().toString());
Assert.assertEquals("{\"form\":\"CONSTANT\",\"value\":\"123\"}", httpRequest.getBody());
HttpCallback httpCallback = invocationOnMock.getArgument(1);
httpCallback.getCountDownLatch()
.countDown();
return null;
}).when(httpClient).executeNotReturn(any(), any());
HttpSinkTask httpSinkTask = new HttpSinkTask();
KeyValue keyValue = new DefaultKeyValue();
keyValue.put(HttpConstant.URL, "http://localhost:8080/api");
keyValue.put(HttpConstant.METHOD, "POST");
keyValue.put(HttpConstant.TIMEOUT, 60000);
keyValue.put(HttpConstant.SOCKS5_ENDPOINT, "127.0.0.1");
keyValue.put(HttpConstant.SOCKS5_USERNAME, "xxxx");
keyValue.put(HttpConstant.SOCKS5_PASSWORD, "xxxx");
keyValue.put(HttpConstant.HEADER_PARAMETERS, "{\n" +
" \"meetingName\": \"swqd\",\n" +
" \"groupId\": \"456\"\n" +
"}");
keyValue.put(HttpConstant.FIXED_HEADER_PARAMETERS, "{\n" +
" \"id\": \"45ef4dewdwe1-7c35-447a-bd93-fab****\",\n" +
" \"userId\": \"199525\"\n" +
"}");
keyValue.put(HttpConstant.QUERY_PARAMETERS, "{\n" +
" \"action\": \"action\"" +
"}");
keyValue.put(HttpConstant.FIXED_QUERY_PARAMETERS, "{\n" +
" \"accountId\": \"accountId\"" +
"}");
keyValue.put(HttpConstant.TOKEN, "token");
keyValue.put(HttpConstant.AUTH_TYPE, "BASIC_AUTH");
keyValue.put(HttpConstant.BASIC_USERNAME, "username");
keyValue.put(HttpConstant.BASIC_PASSWORD, "password");
Map<String, String> bodyMap = Maps.newHashMap();
bodyMap.put("form", "CONSTANT");
bodyMap.put("value", "123");
keyValue.put(HttpConstant.BODY, JSONObject.toJSONString(bodyMap));
httpSinkTask.validate(keyValue);
List<ConnectRecord> connectRecordList = Lists.newArrayList();
ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis());
String cloudEvent = "{\n" +
" \"data\":{\n" +
" \"meetingName\":\"swqd\",\n" +
" \"groupId\":\"456\",\n" +
" \"action\":\"camera_off\",\n" +
" \"time\":1590592527490,\n" +
" \"userId\":\"199525\",\n" +
" \"meetingUUID\":\"hz-20864c8f-b10d-45cd-9935-884bca1b****\"\n" +
" },\n" +
" \"id\":\"45ef4dewdwe1-7c35-447a-bd93-fab****\",\n" +
" \"source\":\"acs:aliyuncvc\",\n" +
" \"specversion\":\"1.0\",\n" +
" \"subject\":\"acs.aliyuncvc:cn-hangzhou:{AccountId}:215672\",\n" +
" \"time\":\"2020-11-19T21:04:41+08:00\",\n" +
" \"type\":\"aliyuncvc:MeetingEvent:MemberOperate\",\n" +
" \"aliyunaccountid\":\"123456789098****\",\n" +
" \"aliyunpublishtime\":\"2020-11-19T21:04:42.179PRC\",\n" +
" \"aliyuneventbusname\":\"default\",\n" +
" \"aliyunregionid\":\"cn-hangzhou\",\n" +
" \"aliyunpublishaddr\":\"172.25.XX.XX\"\n" +
"}";
connectRecord.setData(cloudEvent);
connectRecordList.add(connectRecord);
httpSinkTask.init(new SinkTaskContext() {
@Override
public String getConnectorName() {
return null;
}
@Override
public String getTaskName() {
return null;
}
@Override public KeyValue configs() {
return null;
}
@Override
public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
}
@Override
public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
}
@Override
public void pause(List<RecordPartition> partitions) {
}
@Override
public void resume(List<RecordPartition> partitions) {
}
@Override
public Set<RecordPartition> assignment() {
return null;
}
});
httpSinkTask.start(keyValue);
Field field = httpSinkTask.getClass().getDeclaredField("httpClient");
field.setAccessible(true);
field.set(httpSinkTask, httpClient);
httpSinkTask.put(connectRecordList);
httpSinkTask.stop();
}
@Test
public void testHttpPutWithHeaderAndQueryWithApiKey() throws Exception {
doAnswer(invocationOnMock -> {
HttpRequest httpRequest = invocationOnMock.getArgument(0);
System.out.println(new Gson().toJson(httpRequest));
Assert.assertEquals("http://localhost:8080/api?action=action&accountId=accountId",
httpRequest.getUrl());
Assert.assertEquals("POST", httpRequest.getMethod());
Assert.assertEquals("{meetingName=swqd, id=45ef4dewdwe1-7c35-447a-bd93-fab****, userId=199525, groupId=456, token=token, username=password}", httpRequest.getHeaderMap().toString());
Assert.assertEquals("\"data\"", httpRequest.getBody());
HttpCallback httpCallback = invocationOnMock.getArgument(1);
httpCallback.getCountDownLatch()
.countDown();
return null;
}).when(httpClient).executeNotReturn(any(), any());
HttpSinkTask httpSinkTask = new HttpSinkTask();
KeyValue keyValue = new DefaultKeyValue();
keyValue.put(HttpConstant.URL, "http://localhost:8080/api");
keyValue.put(HttpConstant.METHOD, "POST");
keyValue.put(HttpConstant.TIMEOUT, 60000);
keyValue.put(HttpConstant.SOCKS5_ENDPOINT, "127.0.0.1");
keyValue.put(HttpConstant.SOCKS5_USERNAME, "xxxx");
keyValue.put(HttpConstant.SOCKS5_PASSWORD, "xxxx");
keyValue.put(HttpConstant.HEADER_PARAMETERS, "{\n" +
" \"meetingName\": \"swqd\",\n" +
" \"groupId\": \"456\"\n" +
"}");
keyValue.put(HttpConstant.FIXED_HEADER_PARAMETERS, "{\n" +
" \"id\": \"45ef4dewdwe1-7c35-447a-bd93-fab****\",\n" +
" \"userId\": \"199525\"\n" +
"}");
keyValue.put(HttpConstant.QUERY_PARAMETERS, "{\n" +
" \"action\": \"action\"" +
"}");
keyValue.put(HttpConstant.FIXED_QUERY_PARAMETERS, "{\n" +
" \"accountId\": \"accountId\"" +
"}");
keyValue.put(HttpConstant.TOKEN, "token");
keyValue.put(HttpConstant.AUTH_TYPE, AuthTypeEnum.API_KEY.getAuthType());
keyValue.put(HttpConstant.API_KEY_USERNAME, "username");
keyValue.put(HttpConstant.API_KEY_PASSWORD, "password");
httpSinkTask.validate(keyValue);
List<ConnectRecord> connectRecordList = Lists.newArrayList();
ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis());
connectRecord.setData("data");
connectRecordList.add(connectRecord);
httpSinkTask.init(new SinkTaskContext() {
@Override
public String getConnectorName() {
return null;
}
@Override
public String getTaskName() {
return null;
}
@Override public KeyValue configs() {
return null;
}
@Override
public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
}
@Override
public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
}
@Override
public void pause(List<RecordPartition> partitions) {
}
@Override
public void resume(List<RecordPartition> partitions) {
}
@Override
public Set<RecordPartition> assignment() {
return null;
}
});
httpSinkTask.start(keyValue);
Field field = httpSinkTask.getClass().getDeclaredField("httpClient");
field.setAccessible(true);
field.set(httpSinkTask, httpClient);
httpSinkTask.put(connectRecordList);
httpSinkTask.stop();
}
@Test
public void testHttpPutWithHeaderAndQueryWithOAuth() throws Exception {
doAnswer(invocationOnMock -> {
HttpRequest httpRequest = invocationOnMock.getArgument(0);
System.out.println(new Gson().toJson(httpRequest));
Assert.assertEquals("http://localhost:8080/api?action=action&accountId=accountId",
httpRequest.getUrl());
Assert.assertEquals("POST", httpRequest.getMethod());
Assert.assertEquals("{Authorization=Bearer oAuthAccessToken, meetingName=swqd, id=45ef4dewdwe1-7c35-447a-bd93-fab****, userId=199525, groupId=456, token=token}", httpRequest.getHeaderMap().toString());
Assert.assertEquals("\"data\"", httpRequest.getBody());
HttpCallback httpCallback = invocationOnMock.getArgument(1);
httpCallback.getCountDownLatch()
.countDown();
return null;
}).when(httpClient).executeNotReturn(any(), any());
// for OAuth request
doAnswer(invocationOnMock -> {
HttpRequest httpRequest = invocationOnMock.getArgument(0);
Assert.assertEquals("{\"url\":\"oAuthEndpoint?headerKey1\\u003dheaderValue1\\u0026client_secret\\u003doAuthClientSecret\\u0026client_id\\u003doAuthClientId\",\"method\":\"POST\",\"headerMap\":{\"headerKey1\":\"headerValue1\"},\"body\":\"oAuthBody\",\"timeout\":\"3000\"}", new Gson().toJson(httpRequest));
TokenEntity tokenEntity = new TokenEntity();
tokenEntity.setAccessToken("oAuthAccessToken");
tokenEntity.setTokenType("tokenType");
tokenEntity.setExpiresIn(100);
tokenEntity.setError("error");
tokenEntity.setMessage("message");
tokenEntity.setPath("path");
tokenEntity.setScope("scope");
tokenEntity.setTokenTimestamp("tokenTimestamp");
tokenEntity.setTimestamp("timestamp");
tokenEntity.setExampleParameter("exampleParameter");
tokenEntity.setStatus("status");
return JSONObject.toJSONString(tokenEntity);
}).when(httpClient).execute(any(), any());
HttpSinkTask httpSinkTask = new HttpSinkTask();
KeyValue keyValue = new DefaultKeyValue();
keyValue.put(HttpConstant.URL, "http://localhost:8080/api");
keyValue.put(HttpConstant.METHOD, "POST");
keyValue.put(HttpConstant.TIMEOUT, 60000);
keyValue.put(HttpConstant.SOCKS5_ENDPOINT, "127.0.0.1");
keyValue.put(HttpConstant.SOCKS5_USERNAME, "xxxx");
keyValue.put(HttpConstant.SOCKS5_PASSWORD, "xxxx");
keyValue.put(HttpConstant.HEADER_PARAMETERS, "{\n" +
" \"meetingName\": \"swqd\",\n" +
" \"groupId\": \"456\"\n" +
"}");
keyValue.put(HttpConstant.FIXED_HEADER_PARAMETERS, "{\n" +
" \"id\": \"45ef4dewdwe1-7c35-447a-bd93-fab****\",\n" +
" \"userId\": \"199525\"\n" +
"}");
keyValue.put(HttpConstant.QUERY_PARAMETERS, "{\n" +
" \"action\": \"action\"" +
"}");
keyValue.put(HttpConstant.FIXED_QUERY_PARAMETERS, "{\n" +
" \"accountId\": \"accountId\"" +
"}");
keyValue.put(HttpConstant.TOKEN, "token");
keyValue.put(HttpConstant.AUTH_TYPE, "OAUTH_AUTH");
String oAuthEndpoint = "oAuthEndpoint";
String oAuthHttpMethod = "POST";
String oAuthClientId = "oAuthClientId";
String oAuthClientSecret = "oAuthClientSecret";
String oAuthHeaderParameters = "{\n" +
" \"headerKey1\": \"headerValue1\"" +
"}";
String oAuthQueryParameters = "{\n" +
" \"queryKey1\": \"queryValue1\"" +
"}";
String oAuthBody = "oAuthBody";
keyValue.put(HttpConstant.OAUTH_ENDPOINT, oAuthEndpoint);
keyValue.put(HttpConstant.OAUTH_HTTP_METHOD, oAuthHttpMethod);
keyValue.put(HttpConstant.OAUTH_CLIENT_ID, oAuthClientId);
keyValue.put(HttpConstant.OAUTH_CLIENT_SECRET, oAuthClientSecret);
keyValue.put(HttpConstant.OAUTH_HEADER_PARAMETERS, oAuthHeaderParameters);
keyValue.put(HttpConstant.OAUTH_QUERY_PARAMETERS, oAuthQueryParameters);
keyValue.put(HttpConstant.OAUTH_BODY, oAuthBody);
httpSinkTask.validate(keyValue);
List<ConnectRecord> connectRecordList = Lists.newArrayList();
ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis());
connectRecord.setData("data");
connectRecordList.add(connectRecord);
httpSinkTask.init(new SinkTaskContext() {
@Override
public String getConnectorName() {
return null;
}
@Override
public String getTaskName() {
return null;
}
@Override public KeyValue configs() {
return null;
}
@Override
public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
}
@Override
public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
}
@Override
public void pause(List<RecordPartition> partitions) {
}
@Override
public void resume(List<RecordPartition> partitions) {
}
@Override
public Set<RecordPartition> assignment() {
return null;
}
});
httpSinkTask.start(keyValue);
Field field = httpSinkTask.getClass().getDeclaredField("httpClient");
field.setAccessible(true);
field.set(httpSinkTask, httpClient);
httpSinkTask.updateAuth(AuthTypeEnum.OAUTH_CLIENT_CREDENTIALS.getAuthType(), null, null, null, null, oAuthEndpoint, oAuthHttpMethod, oAuthClientId, oAuthClientSecret, oAuthHeaderParameters, oAuthHeaderParameters, oAuthBody);
httpSinkTask.put(connectRecordList);
httpSinkTask.stop();
}
@Test
@Ignore
// use for invoke local service
public void test() throws Exception {
HttpSinkTask httpSinkTask = new HttpSinkTask();
KeyValue keyValue = new DefaultKeyValue();
// test basic
keyValue.put(HttpConstant.URL, "http://localhost:8080");
keyValue.put(HttpConstant.METHOD, "GET");
keyValue.put(HttpConstant.TIMEOUT, 60000);
keyValue.put(HttpConstant.FIXED_HEADER_PARAMETERS, "{\n" +
" \"Content-Type\": \"application/json\",\n" +
" \"user\": \"user\"" +
"}");
httpSinkTask.validate(keyValue);
List<ConnectRecord> connectRecordList = Lists.newArrayList();
ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis());
connectRecord.setData("data");
connectRecordList.add(connectRecord);
httpSinkTask.init(new SinkTaskContext() {
@Override
public String getConnectorName() {
return null;
}
@Override
public String getTaskName() {
return null;
}
@Override public KeyValue configs() {
return null;
}
@Override
public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
}
@Override
public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
}
@Override
public void pause(List<RecordPartition> partitions) {
}
@Override
public void resume(List<RecordPartition> partitions) {
}
@Override
public Set<RecordPartition> assignment() {
return null;
}
});
httpSinkTask.start(keyValue);
httpSinkTask.put(connectRecordList);
httpSinkTask.stop();
}
}