blob: 084547242c5e3904192521ce159c47667fc3ff6d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.streams.sls.sink;
import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.LogItem;
import com.aliyun.openservices.log.common.Shard;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.threadpool.ScheduleFactory;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.sls.source.utils.SLSUtil;
import org.apache.rocketmq.streams.sts.StsIdentity;
import org.apache.rocketmq.streams.sts.StsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 做为metaq的一个消息队列。每次增加一个队列只需要在数据库中增加一条Channel记录即可。 记录中的字端代表了metaq队列的参数 要求必须有无参数构造函数
*/
public class SLSSink extends AbstractSupportShuffleSink {
/**
* sls shard个数 默认8
*/
private static final int SHARD_COUNT = 8;
private static final long serialVersionUID = 5429201258366881915L;
private static final Logger LOGGER = LoggerFactory.getLogger(SLSSink.class);
private static final int MAX_LIMIT_COUNT = 4000;
private final transient LogHubConfig.ConsumePosition cursorPosition = LogHubConfig.ConsumePosition.END_CURSOR;
private final transient ReadWriteLock lock = new ReentrantReadWriteLock();
@ENVDependence public String project;
@ENVDependence public String logStore;
@ENVDependence protected String source;
@ENVDependence protected String topic;
protected String logTimeFieldName;
@ENVDependence private String endPoint;
@ENVDependence private String accessId;
@ENVDependence private String accessKey;
@ENVDependence private boolean isSts;
@ENVDependence private String stsRoleArn;
@ENVDependence private String stsAssumeRoleFor;
@ENVDependence private String stsSessionPrefix;
@ENVDependence private int stsExpireSeconds = 86400;
@ENVDependence private String ramEndpoint = "sts-inner.aliyuncs.com";
private transient StsService stsService;
private transient Client client = null;
// worker 向服务端汇报心跳的时间间隔,单位是毫秒,建议取值 10000ms。
private long heartBeatIntervalMillis = 10000;
// 是否按序消费
private boolean consumeInOrder = true;
private transient boolean isFinished = false; // 如果消息被销毁,会通过这个标记停止消息的消费
//sls数据保存周期,默认7天
private int ttl = 7;
private transient LogProducer logProducer;
public SLSSink() {
}
public SLSSink(String endPoint, String project, String logStore, String accessId, String accessKey) {
this.endPoint = endPoint;
this.project = project;
this.logStore = logStore;
this.accessId = accessId;
this.accessKey = accessKey;
}
@Override protected boolean initConfigurable() {
if (StringUtil.isEmpty(endPoint) || StringUtil.isEmpty(accessId) || StringUtil.isEmpty(accessKey) || StringUtil.isEmpty(logStore)) {
return false;
}
boolean b = initSts();
if (!b) {
return false;
}
try {
this.client = createClient();
} catch (ExecutionException e) {
e.printStackTrace();
}
if (this.logProducer == null) {
try {
ProjectConfig projectConfig = createProjectConfig();
this.logProducer = new LogProducer(new ProducerConfig());
this.logProducer.putProjectConfig(projectConfig);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
this.startRefreshAsync();
return super.initConfigurable();
}
@Override public int getSplitNum() {
return getSplitList().size();
}
@Override public List<ISplit<?, ?>> getSplitList() {
lock.readLock().lock();
try {
List<Shard> shards = client.ListShard(project, logStore).GetShards();
List<ISplit<?, ?>> splits = new ArrayList<>();
for (Shard shard : shards) {
splits.add(new SLSSplit(shard));
}
return splits;
} catch (LogException ex) {
throw new RuntimeException("Error while listing shards", ex);
} finally {
lock.readLock().unlock();
}
}
private boolean initSts() {
if (isSts) {
if (StringUtil.isEmpty(stsRoleArn) || StringUtil.isEmpty(stsAssumeRoleFor)) {
return false;
}
stsService = new StsService();
stsService.setAccessId(accessId);
stsService.setAccessKey(accessKey);
stsService.setRamEndPoint(ramEndpoint);
stsService.setStsExpireSeconds(stsExpireSeconds);
stsService.setStsSessionPrefix(stsSessionPrefix);
stsService.setRoleArn(stsRoleArn);
stsService.setStsAssumeRoleFor(stsAssumeRoleFor);
}
return true;
}
protected void startRefreshAsync() {
if (isSts) {
Runnable stsRefreshTask = () -> {
Client c = null;
ProjectConfig projectConfig = null;
try {
c = createClient();
projectConfig = createProjectConfig();
} catch (ExecutionException e) {
e.printStackTrace();
}
lock.writeLock().lock();
client = c;
lock.writeLock().unlock();
logProducer.putProjectConfig(projectConfig);
};
ScheduleFactory.getInstance().execute(getNameSpace() + "-" + getName() + "-sink_sts_schedule", stsRefreshTask, stsService.getRefreshTimeSecond(), stsService.getRefreshTimeSecond(), TimeUnit.SECONDS);
}
}
@Override public String getShuffleTopicFieldName() {
return "logStore";
}
@Override public void destroy() {
super.destroy();
if (logProducer != null) {
try {
logProducer.close();
} catch (InterruptedException | ProducerException e) {
throw new RuntimeException("Sls sink close error", e);
} finally {
logProducer = null;
}
}
ScheduleFactory.getInstance().cancel(getNameSpace() + "-" + getName() + "-sink_sts_schedule");
}
@Override public boolean batchInsert(List<IMessage> messages) {
List<LogItem> logItems = new ArrayList<>();
for (IMessage message : messages) {
LogItem logItem = SLSUtil.createLogItem(message, logTimeFieldName);
logItems.add(logItem);
if (logItems.size() >= MAX_LIMIT_COUNT) {
putLogs(project, logStore, topic, source, logItems);
logItems = new ArrayList<>();
}
}
if (logItems.size() > 0) {
putLogs(project, logStore, topic, source, logItems);
}
return true;
}
@Override protected void createTopicIfNotExist(int splitNum) {
lock.readLock().lock();
try {
if (!SLSUtil.existProject(client, project)) {
SLSUtil.createProject(client, project);
}
if (!SLSUtil.existLogstore(client, project, logStore)) {
SLSUtil.createLogstore(client, project, logStore, ttl, splitNum > 0 ? splitNum : SHARD_COUNT);
}
} finally {
lock.readLock().unlock();
}
if (logProducer == null) {
ProjectConfig projectConfig = null;
try {
projectConfig = createProjectConfig();
} catch (ExecutionException e) {
e.printStackTrace();
}
logProducer = new LogProducer(new ProducerConfig());
logProducer.putProjectConfig(projectConfig);
}
}
protected ProjectConfig createProjectConfig() throws ExecutionException {
ProjectConfig projectConfig;
if (isSts) {
StsIdentity stsIdentity = stsService.getStsIdentity();
projectConfig = new ProjectConfig(project, endPoint, stsIdentity.getAccessKeyId(), stsIdentity.getAccessKeySecret(), stsIdentity.getSecurityToken());
} else {
projectConfig = new ProjectConfig(project, endPoint, accessId, accessKey);
}
return projectConfig;
}
protected Client createClient() throws ExecutionException {
Client client;
if (isSts) {
StsIdentity stsIdentity = stsService.getStsIdentity();
client = new Client(endPoint, stsIdentity.getAccessKeyId(), stsIdentity.getAccessKeySecret());
client.setSecurityToken(stsIdentity.getSecurityToken());
} else {
client = new Client(endPoint, accessId, accessKey);
}
return client;
}
private void putLogs(String project, String logStore, String topic, String source, List<LogItem> logItems) {
if (project == null || logStore == null || logItems == null) {
LOGGER.error("HandleLogTask PutLogs error, project, logStore, logItems may be null.");
return;
}
if (logItems.size() > 4096) {
LOGGER.error("HandleLogTask PutLogs error, list too much or list is 0: project:" + project + "logStore:" + logStore + "logItems size:" + logItems.size());
return;
}
if (logItems.size() == 0) {
return;
}
try {
if (logProducer != null) {
logProducer.send(getProject(), getLogStore(), topic, source, logItems, result -> {
if (!result.isSuccessful()) {
LOGGER.error(result.getErrorMessage());
}
});
} else {
LOGGER.warn("LogProducer_Destroyed_Cannot_Send_Message");
}
} catch (Exception e) {
LOGGER.error("send sls log error ", e);
}
}
public LogHubConfig.ConsumePosition getCursorPosition() {
return cursorPosition;
}
public String getEndPoint() {
return endPoint;
}
public void setEndPoint(String endPoint) {
this.endPoint = endPoint;
}
public String getProject() {
return project;
}
public void setProject(String project) {
this.project = project;
}
public String getLogStore() {
return logStore;
}
public void setLogStore(String logStore) {
this.logStore = logStore;
}
public String getAccessId() {
return accessId;
}
public void setAccessId(String accessId) {
this.accessId = accessId;
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSource() {
return source;
}
public void setSource(String source) {
this.source = source;
}
public long getHeartBeatIntervalMillis() {
return heartBeatIntervalMillis;
}
public void setHeartBeatIntervalMillis(long heartBeatIntervalMillis) {
this.heartBeatIntervalMillis = heartBeatIntervalMillis;
}
public boolean isConsumeInOrder() {
return consumeInOrder;
}
public void setConsumeInOrder(boolean consumeInOrder) {
this.consumeInOrder = consumeInOrder;
}
@Override public boolean isFinished() {
return isFinished;
}
public void setFinished(boolean finished) {
isFinished = finished;
}
public String getLogTimeFieldName() {
return logTimeFieldName;
}
public void setLogTimeFieldName(String logTimeFieldName) {
this.logTimeFieldName = logTimeFieldName;
}
public int getTtl() {
return ttl;
}
public void setTtl(int ttl) {
this.ttl = ttl;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public boolean isSts() {
return isSts;
}
public void setSts(boolean sts) {
isSts = sts;
}
public String getStsRoleArn() {
return stsRoleArn;
}
public void setStsRoleArn(String stsRoleArn) {
this.stsRoleArn = stsRoleArn;
}
public String getStsAssumeRoleFor() {
return stsAssumeRoleFor;
}
public void setStsAssumeRoleFor(String stsAssumeRoleFor) {
this.stsAssumeRoleFor = stsAssumeRoleFor;
}
public String getStsSessionPrefix() {
return stsSessionPrefix;
}
public void setStsSessionPrefix(String stsSessionPrefix) {
this.stsSessionPrefix = stsSessionPrefix;
}
public int getStsExpireSeconds() {
return stsExpireSeconds;
}
public void setStsExpireSeconds(int stsExpireSeconds) {
this.stsExpireSeconds = stsExpireSeconds;
}
public String getRamEndpoint() {
return ramEndpoint;
}
public void setRamEndpoint(String ramEndpoint) {
this.ramEndpoint = ramEndpoint;
}
public StsService getStsService() {
return stsService;
}
public void setStsService(StsService stsService) {
this.stsService = stsService;
}
}