| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package org.apache.rocketmq.connect.runtime.connectorwrapper; |
| |
| import com.alibaba.fastjson.JSON; |
| import io.openmessaging.KeyValue; |
| import io.openmessaging.connector.api.component.task.source.SourceTask; |
| import io.openmessaging.connector.api.data.ConnectRecord; |
| import io.openmessaging.connector.api.data.RecordConverter; |
| import io.openmessaging.connector.api.data.RecordPartition; |
| import io.openmessaging.connector.api.data.RecordPosition; |
| import io.openmessaging.connector.api.errors.ConnectException; |
| import io.openmessaging.connector.api.errors.RetriableException; |
| import io.openmessaging.connector.api.storage.OffsetStorageReader; |
| import org.apache.commons.collections.CollectionUtils; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.rocketmq.client.exception.MQClientException; |
| import org.apache.rocketmq.client.producer.DefaultMQProducer; |
| import org.apache.rocketmq.client.producer.SendCallback; |
| import org.apache.rocketmq.client.producer.SendResult; |
| import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash; |
| import org.apache.rocketmq.common.TopicConfig; |
| import org.apache.rocketmq.common.message.Message; |
| import org.apache.rocketmq.common.message.MessageAccessor; |
| import org.apache.rocketmq.common.message.MessageConst; |
| import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue; |
| import org.apache.rocketmq.connect.runtime.common.LoggerName; |
| import org.apache.rocketmq.connect.runtime.config.ConnectConfig; |
| import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine; |
| import org.apache.rocketmq.connect.runtime.errors.ErrorReporter; |
| import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator; |
| import org.apache.rocketmq.connect.runtime.errors.ToleranceType; |
| import org.apache.rocketmq.connect.runtime.service.PositionManagementService; |
| import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager; |
| import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService; |
| import org.apache.rocketmq.connect.runtime.store.PositionStorageReaderImpl; |
| import org.apache.rocketmq.connect.runtime.store.PositionStorageWriter; |
| import org.apache.rocketmq.connect.runtime.utils.Base64Util; |
| import org.apache.rocketmq.connect.runtime.utils.ConnectUtil; |
| import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId; |
| import org.apache.rocketmq.connect.runtime.utils.Utils; |
| import org.apache.rocketmq.remoting.exception.RemotingException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.TOPIC; |
| |
| /** |
| * A wrapper of {@link SourceTask} for runtime. |
| */ |
| public class WorkerSourceTask extends WorkerTask { |
| |
| private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME); |
| private static final long SEND_FAILED_BACKOFF_MS = 100; |
| /** |
| * The implements of the source task. |
| */ |
| private final SourceTask sourceTask; |
| |
| protected final WorkerSourceTaskContext sourceTaskContext; |
| |
| /** |
| * Used to read the position of source data source. |
| */ |
| private final OffsetStorageReader offsetStorageReader; |
| |
| /** |
| * Used to write the position of source data source. |
| */ |
| private final PositionStorageWriter positionStorageWriter; |
| |
| /** |
| * A RocketMQ producer to send message to dest MQ. |
| */ |
| private DefaultMQProducer producer; |
| |
| /** |
| * A converter to parse source data entry to byte[]. |
| */ |
| private final RecordConverter keyConverter; |
| private final RecordConverter valueConverter; |
| |
| /** |
| * stat connect |
| */ |
| private final ConnectStatsManager connectStatsManager; |
| private final ConnectStatsService connectStatsService; |
| |
| private final CountDownLatch stopRequestedLatch; |
| private final AtomicReference<Throwable> producerSendException; |
| private List<ConnectRecord> toSendRecord; |
| |
| private volatile RecordOffsetManagement.CommittableOffsets committableOffsets; |
| private final RecordOffsetManagement offsetManagement; |
| /** |
| * The property of message in WHITE_KEY_SET don't need add a connect prefix |
| */ |
| private static final Set<String> WHITE_KEY_SET = new HashSet<>(); |
| |
| static { |
| WHITE_KEY_SET.add(MessageConst.PROPERTY_KEYS); |
| WHITE_KEY_SET.add(MessageConst.PROPERTY_TAGS); |
| } |
| |
| public WorkerSourceTask(ConnectConfig workerConfig, |
| ConnectorTaskId id, |
| SourceTask sourceTask, |
| ClassLoader classLoader, |
| ConnectKeyValue taskConfig, |
| PositionManagementService positionManagementService, |
| RecordConverter keyConverter, |
| RecordConverter valueConverter, |
| DefaultMQProducer producer, |
| AtomicReference<WorkerState> workerState, |
| ConnectStatsManager connectStatsManager, |
| ConnectStatsService connectStatsService, |
| TransformChain<ConnectRecord> transformChain, |
| RetryWithToleranceOperator retryWithToleranceOperator) { |
| super(workerConfig, id, classLoader, taskConfig, retryWithToleranceOperator, transformChain, workerState); |
| |
| this.sourceTask = sourceTask; |
| this.offsetStorageReader = new PositionStorageReaderImpl(id.connector(), positionManagementService); |
| this.positionStorageWriter = new PositionStorageWriter(id.connector(), positionManagementService); |
| this.producer = producer; |
| this.valueConverter = valueConverter; |
| this.keyConverter = keyConverter; |
| this.connectStatsManager = connectStatsManager; |
| this.connectStatsService = connectStatsService; |
| this.sourceTaskContext = new WorkerSourceTaskContext(offsetStorageReader, this, taskConfig); |
| this.stopRequestedLatch = new CountDownLatch(1); |
| this.producerSendException = new AtomicReference<>(); |
| this.offsetManagement = new RecordOffsetManagement(); |
| this.committableOffsets = RecordOffsetManagement.CommittableOffsets.EMPTY; |
| } |
| |
| private List<ConnectRecord> poll() throws InterruptedException { |
| try { |
| List<ConnectRecord> connectRecords = sourceTask.poll(); |
| if (CollectionUtils.isEmpty(connectRecords)) { |
| return null; |
| } |
| return connectRecords; |
| } catch (RetriableException e) { |
| log.error("Source task RetriableException exception, taskconfig {}", JSON.toJSONString(taskConfig), e); |
| return null; |
| } |
| } |
| |
| @Override |
| public void close() { |
| producer.shutdown(); |
| stopRequestedLatch.countDown(); |
| Utils.closeQuietly(transformChain, "transform chain"); |
| Utils.closeQuietly(retryWithToleranceOperator, "retry operator"); |
| Utils.closeQuietly(positionStorageWriter, "position storage writer"); |
| } |
| |
| protected void updateCommittableOffsets() { |
| RecordOffsetManagement.CommittableOffsets newOffsets = offsetManagement.committableOffsets(); |
| synchronized (this) { |
| this.committableOffsets = this.committableOffsets.updatedWith(newOffsets); |
| } |
| } |
| |
| protected Optional<RecordOffsetManagement.SubmittedPosition> prepareToSendRecord( |
| ConnectRecord record |
| ) { |
| maybeThrowProducerSendException(); |
| return Optional.of(this.offsetManagement.submitRecord(record.getPosition())); |
| } |
| |
| /** |
| * Send list of sourceDataEntries to MQ. |
| */ |
| private Boolean sendRecord() throws InterruptedException { |
| int processed = 0; |
| for (ConnectRecord preTransformRecord : toSendRecord) { |
| retryWithToleranceOperator.sourceRecord(preTransformRecord); |
| ConnectRecord record = transformChain.doTransforms(preTransformRecord); |
| String topic = maybeCreateAndGetTopic(record); |
| Message sourceMessage = convertTransformedRecord(topic, record); |
| if (sourceMessage == null || retryWithToleranceOperator.failed()) { |
| // commit record |
| recordFailed(preTransformRecord); |
| continue; |
| } |
| log.trace("{} Appending record to the topic {} , value {}", this, topic, record.getData()); |
| /**prepare to send record*/ |
| Optional<RecordOffsetManagement.SubmittedPosition> submittedRecordPosition = prepareToSendRecord(preTransformRecord); |
| try { |
| |
| SendCallback callback = new SendCallback() { |
| @Override |
| public void onSuccess(SendResult result) { |
| log.info("Successful send message to RocketMQ:{}, Topic {}", result.getMsgId(), result.getMessageQueue().getTopic()); |
| // metrics |
| incWriteRecordStat(); |
| // commit record for custom |
| recordSent(preTransformRecord, sourceMessage, result); |
| // ack record position |
| submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack); |
| } |
| |
| @Override |
| public void onException(Throwable throwable) { |
| log.error("Source task send record failed ,error msg {}. message {}", throwable.getMessage(), JSON.toJSONString(sourceMessage), throwable); |
| // fail record metrics |
| inWriteRecordFail(); |
| // record send failed |
| recordSendFailed(false, sourceMessage, preTransformRecord, throwable); |
| } |
| }; |
| |
| if (StringUtils.isEmpty(sourceMessage.getKeys())) { |
| // Round robin |
| producer.send(sourceMessage, callback); |
| } else { |
| // Partition message ordering, |
| // At the same time, ensure that the data is pulled in an orderly manner, which needs to be guaranteed by sourceTask in the business |
| producer.send(sourceMessage, new SelectMessageQueueByHash(), sourceMessage.getKeys(), callback); |
| } |
| |
| } catch (RetriableException e) { |
| log.warn("{} Failed to send record to topic '{}'. Backing off before retrying: ", |
| this, sourceMessage.getTopic(), e); |
| // Intercepted as successfully sent, used to continue sending next time |
| toSendRecord = toSendRecord.subList(processed, toSendRecord.size()); |
| // remove pre submit position, for retry |
| submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::remove); |
| return false; |
| } catch (MQClientException | RemotingException e) { |
| log.error("Send message MQClientException. message: {}, error info: {}.", sourceMessage, e); |
| inWriteRecordFail(); |
| recordSendFailed(true, sourceMessage, preTransformRecord, e); |
| } catch (InterruptedException e) { |
| log.error("Send message InterruptedException. message: {}, error info: {}.", sourceMessage, e); |
| inWriteRecordFail(); |
| throw e; |
| } |
| processed++; |
| } |
| toSendRecord = null; |
| return true; |
| } |
| |
| private void prepareToPollTask() { |
| maybeThrowProducerSendException(); |
| } |
| |
| private void maybeThrowProducerSendException() { |
| if (producerSendException.get() != null) { |
| throw new ConnectException( |
| "Unrecoverable exception from producer send callback", |
| producerSendException.get() |
| ); |
| } |
| } |
| |
| private void recordSendFailed( |
| boolean synchronous, |
| Message sourceMessage, |
| ConnectRecord preTransformRecord, |
| Throwable e) { |
| if (synchronous) { |
| throw new ConnectException("Unrecoverable exception trying to send", e); |
| } |
| String topic = sourceMessage.getTopic(); |
| if (retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL) { |
| // ignore all error |
| log.trace( |
| "Ignoring failed record send: {} failed to send record to {}: ", |
| WorkerSourceTask.this, |
| topic, |
| e |
| ); |
| retryWithToleranceOperator.executeFailed( |
| ErrorReporter.Stage.ROCKETMQ_PRODUCE, |
| WorkerSourceTask.class, |
| preTransformRecord, |
| e); |
| commitTaskRecord(preTransformRecord, null); |
| } else { |
| log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e); |
| log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord); |
| producerSendException.compareAndSet(null, e); |
| } |
| } |
| |
| /** |
| * failed send |
| * |
| * @param record |
| */ |
| private void recordFailed(ConnectRecord record) { |
| commitTaskRecord(record, null); |
| } |
| |
| /** |
| * send success record |
| * |
| * @param preTransformRecord |
| * @param sourceMessage |
| * @param result |
| */ |
| private void recordSent( |
| ConnectRecord preTransformRecord, |
| Message sourceMessage, |
| SendResult result) { |
| commitTaskRecord(preTransformRecord, result); |
| } |
| |
| private void commitTaskRecord(ConnectRecord preTransformRecord, SendResult result) { |
| ConnectKeyValue keyValue = null; |
| if (result != null) { |
| keyValue = new ConnectKeyValue(); |
| keyValue.put("send.status", result.getSendStatus().name()); |
| keyValue.put("msg.id", result.getMsgId()); |
| keyValue.put("topic", result.getMessageQueue().getTopic()); |
| keyValue.put("broker.name", result.getMessageQueue().getBrokerName()); |
| keyValue.put("queue.id", result.getMessageQueue().getQueueId()); |
| keyValue.put("queue.offset", result.getQueueOffset()); |
| keyValue.put("transaction.id", result.getTransactionId()); |
| keyValue.put("offset.msg.id", result.getOffsetMsgId()); |
| keyValue.put("region.id", result.getRegionId()); |
| } |
| sourceTask.commit(preTransformRecord, keyValue == null ? null : keyValue.getProperties()); |
| } |
| |
| /** |
| * Convert the source record into a producer record. |
| */ |
| protected Message convertTransformedRecord(final String topic, ConnectRecord record) { |
| if (record == null) { |
| return null; |
| } |
| Message sourceMessage = new Message(); |
| sourceMessage.setTopic(topic); |
| byte[] key = retryWithToleranceOperator.execute(() -> keyConverter.fromConnectData(topic, record.getKeySchema(), record.getKey()), |
| ErrorReporter.Stage.CONVERTER, keyConverter.getClass()); |
| |
| byte[] value = retryWithToleranceOperator.execute(() -> valueConverter.fromConnectData(topic, record.getSchema(), record.getData()), |
| ErrorReporter.Stage.CONVERTER, valueConverter.getClass()); |
| if (value.length > RuntimeConfigDefine.MAX_MESSAGE_SIZE) { |
| log.error("Send record, message size is greater than {} bytes, record: {}", RuntimeConfigDefine.MAX_MESSAGE_SIZE, JSON.toJSONString(record)); |
| } |
| sourceMessage.setKeys(Base64Util.base64Encode(key)); |
| sourceMessage.setBody(value); |
| if (retryWithToleranceOperator.failed()) { |
| return null; |
| } |
| // put extend msg property |
| putExtendMsgProperty(record, sourceMessage, topic); |
| return sourceMessage; |
| } |
| |
| /** |
| * maybe create and get topic |
| * |
| * @param record |
| * @return |
| */ |
| private String maybeCreateAndGetTopic(ConnectRecord record) { |
| String topic = taskConfig.getString(RuntimeConfigDefine.CONNECT_TOPICNAME); |
| if (StringUtils.isBlank(topic)) { |
| RecordPosition recordPosition = record.getPosition(); |
| if (null == recordPosition) { |
| log.error("connect-topicname config is null and recordPosition is null , lack of topic config"); |
| } |
| RecordPartition partition = recordPosition.getPartition(); |
| if (null == partition) { |
| log.error("connect-topicname config is null and partition is null , lack of topic config"); |
| } |
| Map<String, ?> partitionMap = partition.getPartition(); |
| if (null == partitionMap) { |
| log.error("connect-topicname config is null and partitionMap is null , lack of topic config"); |
| } |
| Object o = partitionMap.get(TOPIC); |
| if (null == o) { |
| log.error("connect-topicname config is null and partitionMap.get is null , lack of topic config"); |
| } |
| topic = (String) o; |
| } |
| if (StringUtils.isBlank(topic)) { |
| throw new ConnectException("source connect lack of topic config"); |
| } |
| if (ConnectUtil.isTopicExist(workerConfig, topic)) { |
| ConnectUtil.createTopic(workerConfig, new TopicConfig(topic)); |
| } |
| return topic; |
| } |
| |
| private void putExtendMsgProperty(ConnectRecord sourceDataEntry, Message sourceMessage, String topic) { |
| KeyValue extensionKeyValues = sourceDataEntry.getExtensions(); |
| if (null == extensionKeyValues) { |
| log.info("extension key value is null."); |
| return; |
| } |
| Set<String> keySet = extensionKeyValues.keySet(); |
| if (CollectionUtils.isEmpty(keySet)) { |
| log.info("extension keySet null."); |
| return; |
| } |
| for (String key : keySet) { |
| if (WHITE_KEY_SET.contains(key)) { |
| MessageAccessor.putProperty(sourceMessage, key, extensionKeyValues.getString(key)); |
| } else { |
| MessageAccessor.putProperty(sourceMessage, "connect-ext-" + key, extensionKeyValues.getString(key)); |
| } |
| } |
| |
| } |
| |
| /** |
| * initinalize and start |
| */ |
| @Override |
| protected void initializeAndStart() { |
| try { |
| producer.start(); |
| } catch (MQClientException e) { |
| log.error("{} Source task producer start failed!!", this); |
| throw new ConnectException(e); |
| } |
| sourceTask.init(sourceTaskContext); |
| sourceTask.start(taskConfig); |
| log.info("{} Source task finished initialization and start", this); |
| } |
| |
| protected void recordPollReturned(int numRecordsInBatch) { |
| connectStatsManager.incSourceRecordPollTotalNums(numRecordsInBatch); |
| connectStatsManager.incSourceRecordPollNums(id().toString() + "", numRecordsInBatch); |
| } |
| |
| /** |
| * execute poll and send record |
| */ |
| @Override |
| protected void execute() { |
| while (isRunning()) { |
| |
| updateCommittableOffsets(); |
| if (CollectionUtils.isEmpty(toSendRecord)) { |
| try { |
| prepareToPollTask(); |
| toSendRecord = poll(); |
| if (null != toSendRecord && toSendRecord.size() > 0) { |
| recordPollReturned(toSendRecord.size()); |
| } |
| if (toSendRecord == null) { |
| continue; |
| } |
| log.trace("{} About to send {} records to RocketMQ", this, toSendRecord.size()); |
| if (!sendRecord()) { |
| stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS); |
| } |
| } catch (InterruptedException e) { |
| // Ignore and allow to exit. |
| } catch (Exception e) { |
| try { |
| finalOffsetCommit(true); |
| } catch (Exception offsetException) { |
| log.error("Failed to commit offsets for already-failing task", offsetException); |
| } |
| throw e; |
| } finally { |
| finalOffsetCommit(false); |
| // record source poll times |
| connectStatsManager.incSourceRecordPollTotalTimes(); |
| } |
| } |
| AtomicLong atomicLong = connectStatsService.singleSourceTaskTimesTotal(id().toString()); |
| if (null != atomicLong) { |
| atomicLong.addAndGet(toSendRecord == null ? 0 : toSendRecord.size()); |
| } |
| } |
| } |
| |
| protected void finalOffsetCommit(boolean b) { |
| |
| offsetManagement.awaitAllMessages( |
| workerConfig.getOffsetCommitTimeoutMsConfig(), |
| TimeUnit.MILLISECONDS |
| ); |
| updateCommittableOffsets(); |
| commitOffsets(); |
| } |
| |
| public boolean commitOffsets() { |
| long commitTimeoutMs = workerConfig.getOffsetCommitTimeoutMsConfig(); |
| log.debug("{} Committing offsets", this); |
| |
| long started = System.currentTimeMillis(); |
| long timeout = started + commitTimeoutMs; |
| |
| RecordOffsetManagement.CommittableOffsets offsetsToCommit; |
| synchronized (this) { |
| offsetsToCommit = this.committableOffsets; |
| this.committableOffsets = RecordOffsetManagement.CommittableOffsets.EMPTY; |
| } |
| |
| if (committableOffsets.isEmpty()) { |
| log.debug("{} Either no records were produced by the task since the last offset commit, " |
| + "or every record has been filtered out by a transformation " |
| + "or dropped due to transformation or conversion errors.", |
| this |
| ); |
| // We continue with the offset commit process here instead of simply returning immediately |
| // in order to invoke SourceTask::commit and record metrics for a successful offset commit |
| } else { |
| log.info("{} Committing offsets for {} acknowledged messages", this, committableOffsets.numCommittableMessages()); |
| if (committableOffsets.hasPending()) { |
| log.debug("{} There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. " |
| + "The source partition with the most pending messages is {}, with {} pending messages", |
| this, |
| committableOffsets.numUncommittableMessages(), |
| committableOffsets.numDeques(), |
| committableOffsets.largestDequePartition(), |
| committableOffsets.largestDequeSize() |
| ); |
| } else { |
| log.debug("{} There are currently no pending messages for this offset commit; " |
| + "all messages dispatched to the task's producer since the last commit have been acknowledged", |
| this |
| ); |
| } |
| } |
| |
| // write offset |
| offsetsToCommit.offsets().forEach(positionStorageWriter::writeOffset); |
| |
| // begin flush |
| if (!positionStorageWriter.beginFlush()) { |
| // There was nothing in the offsets to process, but we still mark a successful offset commit. |
| long durationMillis = System.currentTimeMillis() - started; |
| log.debug("{} Finished offset commitOffsets successfully in {} ms", |
| this, durationMillis); |
| commitSourceTask(); |
| return true; |
| } |
| |
| Future<Void> flushFuture = positionStorageWriter.doFlush((error, key, result) -> { |
| if (error != null) { |
| log.error("{} Failed to flush offsets to storage: ", WorkerSourceTask.this, error); |
| } else { |
| log.trace("{} Finished flushing offsets to storage", WorkerSourceTask.this); |
| } |
| }); |
| try { |
| flushFuture.get(Math.max(timeout - System.currentTimeMillis(), 0), TimeUnit.MILLISECONDS); |
| } catch (InterruptedException e) { |
| log.warn("{} Flush of offsets interrupted, cancelling", this); |
| positionStorageWriter.cancelFlush(); |
| return false; |
| } catch (ExecutionException e) { |
| log.error("{} Flush of offsets threw an unexpected exception: ", this, e); |
| positionStorageWriter.cancelFlush(); |
| return false; |
| } catch (TimeoutException e) { |
| log.error("{} Timed out waiting to flush offsets to storage; will try again on next flush interval with latest offsets", this); |
| positionStorageWriter.cancelFlush(); |
| return false; |
| } |
| long durationMillis = System.currentTimeMillis() - started; |
| log.debug("{} Finished commitOffsets successfully in {} ms", |
| this, durationMillis); |
| commitSourceTask(); |
| return true; |
| } |
| |
| protected void commitSourceTask() { |
| try { |
| this.sourceTask.commit(); |
| } catch (Throwable t) { |
| log.error("{} Exception thrown while calling task.commit()", this, t); |
| } |
| } |
| |
| private void inWriteRecordFail() { |
| connectStatsManager.incSourceRecordWriteTotalFailNums(); |
| connectStatsManager.incSourceRecordWriteFailNums(id().toString()); |
| } |
| |
| private void incWriteRecordStat() { |
| connectStatsManager.incSourceRecordWriteTotalNums(); |
| connectStatsManager.incSourceRecordWriteNums(id().toString()); |
| } |
| |
| } |