[ISSUE #4602] when wechat send message api response errcode is not zero, the wechat sink connector does not throw IllegalAccessException (#4627)
* fix issue#4602
* change README.md file
diff --git a/eventmesh-connectors/README.md b/eventmesh-connectors/README.md
index 25abaef..1fc836f 100644
--- a/eventmesh-connectors/README.md
+++ b/eventmesh-connectors/README.md
@@ -66,4 +66,6 @@
| [Spring](eventmesh-connector-spring) | Sink | ✅ |
| WeCom | Source | ⬜ |
| [WeCom](eventmesh-connector-wecom) | Sink | ✅ |
+| WeChat | Source | ⬜ |
+| [WeChat](eventmesh-connector-wechat) | Sink | ✅ |
| More connectors will be added... | Source/Sink | N/A |
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/TemplateMessageResponse.java b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/TemplateMessageResponse.java
index 3d51704..231b01c 100644
--- a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/TemplateMessageResponse.java
+++ b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/TemplateMessageResponse.java
@@ -22,7 +22,7 @@
@Data
public class TemplateMessageResponse {
- private int errocode;
+ private int errcode;
private String errmsg;
diff --git a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java
index 0457202..1d1c325 100644
--- a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java
@@ -166,9 +166,9 @@
throw new IOException("message response is null.");
}
- if (messageResponse.getErrocode() != 0) {
- throw new IllegalAccessException(String.format("Send message to weCom error! errorCode=%s, errorMessage=%s",
- messageResponse.getErrocode(), messageResponse.getErrmsg()));
+ if (messageResponse.getErrcode() != 0) {
+ throw new IllegalAccessException(String.format("Send message to WeChat error! errorCode=%s, errorMessage=%s",
+ messageResponse.getErrcode(), messageResponse.getErrmsg()));
}
}
diff --git a/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java
index 2c4ad5b..5920417 100644
--- a/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java
+++ b/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java
@@ -29,11 +29,14 @@
import java.io.IOException;
import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -79,6 +82,20 @@
Mockito.doReturn(tokenCall).when(okHttpClient).newCall(Mockito.argThat(tokenMatcher));
Mockito.doReturn(tokenResponse).when(tokenCall).execute();
+ weChatSinkConnector = new WeChatSinkConnector();
+ WeChatSinkConfig weChatSinkConfig = (WeChatSinkConfig) ConfigUtil.parse(weChatSinkConnector.configClass());
+ weChatSinkConnector.init(weChatSinkConfig);
+ Field clientField = ReflectionSupport.findFields(weChatSinkConnector.getClass(),
+ (f) -> f.getName().equals("okHttpClient"),
+ HierarchyTraversalMode.BOTTOM_UP).get(0);
+ clientField.setAccessible(true);
+ clientField.set(weChatSinkConnector, okHttpClient);
+ weChatSinkConnector.start();
+ }
+
+ @Test
+ public void testSendMessageToWeChat() throws Exception {
+
Request sendMessageRequest = new Request.Builder().url("https://api.weixin.qq.com/cgi-bin/message/template/send").build();
String sendMessageResponseJson = "{\"errcode\":0,\"errmsg\":\"ok\",\"msgid\":200228332}";
ResponseBody sendMessageBody = ResponseBody.create(MediaType.parse("application/json; charset=utf-8"), sendMessageResponseJson);
@@ -95,31 +112,44 @@
Mockito.doReturn(sendMessageRequestCall).when(okHttpClient).newCall(Mockito.argThat(sendMessageMatcher));
Mockito.doReturn(sendMessageResponse).when(sendMessageRequestCall).execute();
- weChatSinkConnector = new WeChatSinkConnector();
- WeChatSinkConfig weChatSinkConfig = (WeChatSinkConfig) ConfigUtil.parse(weChatSinkConnector.configClass());
- weChatSinkConnector.init(weChatSinkConfig);
- Field clientField = ReflectionSupport.findFields(weChatSinkConnector.getClass(),
- (f) -> f.getName().equals("okHttpClient"),
- HierarchyTraversalMode.BOTTOM_UP).get(0);
- clientField.setAccessible(true);
- clientField.set(weChatSinkConnector, okHttpClient);
- weChatSinkConnector.start();
+ List<ConnectRecord> records = new ArrayList<>();
+ RecordPartition partition = new RecordPartition();
+ RecordOffset offset = new RecordOffset();
+ ConnectRecord connectRecord = new ConnectRecord(partition, offset,
+ System.currentTimeMillis(), "Hello, EventMesh!".getBytes(StandardCharsets.UTF_8));
+ records.add(connectRecord);
+
+ weChatSinkConnector.put(records);
+ verify(okHttpClient, times(2)).newCall(any(Request.class));
+
+ WeChatSinkConnector.ACCESS_TOKEN_CACHE.invalidate(WeChatSinkConnector.ACCESS_TOKEN_CACHE_KEY);
}
@Test
- public void testSendMessageToWeChat() throws Exception {
- final int times = 1;
- List<ConnectRecord> records = new ArrayList<>();
- for (int i = 0; i < times; i++) {
- RecordPartition partition = new RecordPartition();
- RecordOffset offset = new RecordOffset();
- ConnectRecord connectRecord = new ConnectRecord(partition, offset,
- System.currentTimeMillis(), "Hello, EventMesh!".getBytes(StandardCharsets.UTF_8));
- records.add(connectRecord);
- }
+ public void testSendMessageToWeChatAbnormally() throws Exception {
+ Request sendMessageRequest = new Request.Builder().url("https://api.weixin.qq.com/cgi-bin/message/template/send").build();
+ String sendMessageResponseJson = "{\"errcode\":42001,\"errmsg\":\"access_token expired rid: 656e8793-061949b5-738cb8f4\"}";
+ ResponseBody sendMessageBody = ResponseBody.create(MediaType.parse("application/json; charset=utf-8"), sendMessageResponseJson);
+ Response sendMessageResponse = new Response.Builder()
+ .code(200)
+ .protocol(Protocol.HTTP_1_0)
+ .request(sendMessageRequest)
+ .body(sendMessageBody)
+ .message("ok")
+ .build();
+ ArgumentMatcher<Request> sendMessageMatcher = (anyRequest) ->
+ sendMessageRequest.url().encodedPath().startsWith(anyRequest.url().encodedPath());
+ Call sendMessageRequestCall = Mockito.mock(Call.class);
+ Mockito.doReturn(sendMessageRequestCall).when(okHttpClient).newCall(Mockito.argThat(sendMessageMatcher));
+ Mockito.doReturn(sendMessageResponse).when(sendMessageRequestCall).execute();
- weChatSinkConnector.put(records);
- verify(okHttpClient, times(times + 1)).newCall(any(Request.class));
+ RecordPartition partition = new RecordPartition();
+ RecordOffset offset = new RecordOffset();
+ ConnectRecord connectRecord = new ConnectRecord(partition, offset,
+ System.currentTimeMillis(), "Hello, EventMesh!".getBytes(StandardCharsets.UTF_8));
+ Method sendMessageMethod = WeChatSinkConnector.class.getDeclaredMethod("sendMessage", ConnectRecord.class);
+ sendMessageMethod.setAccessible(true);
+ Assertions.assertThrows(InvocationTargetException.class, () -> sendMessageMethod.invoke(weChatSinkConnector, connectRecord));
}
@AfterEach