blob: 44ecab01388cd87b2d6ceac97faf954772db5488 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.wechat.sink;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.wechat.sink.config.WeChatSinkConfig;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.commons.collections4.CollectionUtils;
import java.io.IOException;
import java.util.HashMap;
/**
 * Sink writer that converts each incoming row into a WeChat text-message payload and
 * delegates the actual sending to {@link HttpSinkWriter#write(SeaTunnelRow)}.
 *
 * <p>Every field of the incoming row is rendered as {@code name: value}; the rendered
 * fields are joined with {@link #FIELD_SEPARATOR} and stored under
 * {@link WeChatSinkConfig#WECHAT_SEND_MSG_CONTENT_KEY}. Configured mention lists are
 * added to the same map when they are non-empty.
 */
public class WeChatHttpSinkWriter extends HttpSinkWriter {

    /**
     * Text placed between two rendered fields.
     *
     * <p>NOTE(review): this is the two-character sequence backslash + {@code n}, not a
     * line-feed character. It is kept exactly as it was; confirm against the receiving
     * end whether an escaped or a real newline is expected.
     */
    private static final String FIELD_SEPARATOR = "\\n";

    /** Text placed between a field name and its value. */
    private static final String NAME_VALUE_DELIMITER = ": ";

    /** Plugin-level settings; read here for the mention lists. */
    private final WeChatSinkConfig weChatSinkConfig;

    /** Schema of the rows handed to {@link #write(SeaTunnelRow)}; supplies the field names. */
    private final SeaTunnelRowType seaTunnelRowType;

    /**
     * Creates the writer.
     *
     * <p>The parent is initialised with a fixed two-field row type (named by
     * {@link WeChatSinkConfig#WECHAT_SEND_MSG_TYPE_KEY} and
     * {@link WeChatSinkConfig#WECHAT_SEND_MSG_SUPPORT_TYPE}) rather than with the upstream
     * schema, so that it matches the field order of the wrapper row built in
     * {@link #write(SeaTunnelRow)}.
     *
     * @param httpParameter    HTTP settings forwarded unchanged to the parent writer
     * @param pluginConfig     plugin configuration used to build the {@link WeChatSinkConfig}
     * @param seaTunnelRowType schema of the upstream rows to be rendered as message text
     */
    public WeChatHttpSinkWriter(HttpParameter httpParameter, Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
        super(
            new SeaTunnelRowType(
                new String[]{WeChatSinkConfig.WECHAT_SEND_MSG_TYPE_KEY, WeChatSinkConfig.WECHAT_SEND_MSG_SUPPORT_TYPE},
                new SeaTunnelDataType[]{BasicType.VOID_TYPE, BasicType.VOID_TYPE}),
            httpParameter);
        this.weChatSinkConfig = new WeChatSinkConfig(pluginConfig);
        this.seaTunnelRowType = seaTunnelRowType;
    }

    /**
     * Renders {@code element} as message text, wraps it together with the configured
     * mention lists into a two-field row and passes that row to the parent writer.
     *
     * @param element upstream row; its fields are read by index for every field declared
     *                in the row type given to the constructor
     * @throws IOException if the parent writer fails
     */
    @Override
    public void write(SeaTunnelRow element) throws IOException {
        HashMap<Object, Object> messageBody = new HashMap<>();
        messageBody.put(WeChatSinkConfig.WECHAT_SEND_MSG_CONTENT_KEY, renderContent(element));
        // Mention lists are optional: only non-empty ones are put into the payload.
        if (CollectionUtils.isNotEmpty(weChatSinkConfig.getMentionedList())) {
            messageBody.put(WeChatSinkConfig.MENTIONED_LIST, weChatSinkConfig.getMentionedList());
        }
        if (CollectionUtils.isNotEmpty(weChatSinkConfig.getMentionedMobileList())) {
            messageBody.put(WeChatSinkConfig.MENTIONED_MOBILE_LIST, weChatSinkConfig.getMentionedMobileList());
        }
        // Field order must match the row type passed to the parent in the constructor.
        SeaTunnelRow wechatRowWrapper =
            new SeaTunnelRow(new Object[]{WeChatSinkConfig.WECHAT_SEND_MSG_SUPPORT_TYPE, messageBody});
        super.write(wechatRowWrapper);
    }

    /**
     * Builds the message text: one {@code name: value} entry per field, joined with
     * {@link #FIELD_SEPARATOR}. Returns an empty string when the row type has no fields.
     */
    private String renderContent(SeaTunnelRow element) {
        // A local accumulator needs no synchronisation, hence StringBuilder.
        StringBuilder content = new StringBuilder();
        int totalFields = seaTunnelRowType.getTotalFields();
        for (int i = 0; i < totalFields; i++) {
            // Emit the separator between entries instead of trimming a trailing one
            // afterwards, so no length arithmetic depends on the separator literal.
            if (i > 0) {
                content.append(FIELD_SEPARATOR);
            }
            content.append(seaTunnelRowType.getFieldName(i))
                .append(NAME_VALUE_DELIMITER)
                .append(element.getField(i));
        }
        return content.toString();
    }
}