blob: f7a17f3d7fc8678a968bac76cd773f13bc63315c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
public abstract class PipeTransferBatchReqBuilder implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferBatchReqBuilder.class);
protected final List<Event> events = new ArrayList<>();
protected final List<Long> requestCommitIds = new ArrayList<>();
protected final List<ByteBuffer> binaryBuffers = new ArrayList<>();
protected final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
protected final List<ByteBuffer> tabletBuffers = new ArrayList<>();
// limit in delayed time
protected final int maxDelayInMs;
protected long firstEventProcessingTime = Long.MIN_VALUE;
// limit in buffer size
protected final PipeMemoryBlock allocatedMemoryBlock;
protected long totalBufferSize = 0;
protected PipeTransferBatchReqBuilder(final PipeParameters parameters) {
maxDelayInMs =
parameters.getIntOrDefault(
Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY, SINK_IOTDB_BATCH_DELAY_KEY),
CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE)
* 1000;
final long requestMaxBatchSizeInBytes =
parameters.getLongOrDefault(
Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, SINK_IOTDB_BATCH_SIZE_KEY),
CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE);
allocatedMemoryBlock =
PipeResourceManager.memory()
.tryAllocate(requestMaxBatchSizeInBytes)
.setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 0))
.setShrinkCallback(
(oldMemory, newMemory) ->
LOGGER.info(
"The batch size limit has shrunk from {} to {}.", oldMemory, newMemory))
.setExpandMethod(
oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, requestMaxBatchSizeInBytes))
.setExpandCallback(
(oldMemory, newMemory) ->
LOGGER.info(
"The batch size limit has expanded from {} to {}.", oldMemory, newMemory));
if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
LOGGER.info(
"PipeTransferBatchReqBuilder: the max batch size is adjusted from {} to {} due to the "
+ "memory restriction",
requestMaxBatchSizeInBytes,
getMaxBatchSizeInBytes());
}
}
/**
* Try offer {@link Event} into cache if the given {@link Event} is not duplicated.
*
* @param event the given {@link Event}
* @return {@link true} if the batch can be transferred
*/
public synchronized boolean onEvent(final TabletInsertionEvent event)
throws IOException, WALPipeException {
if (!(event instanceof EnrichedEvent)) {
return false;
}
final long requestCommitId = ((EnrichedEvent) event).getCommitId();
// The deduplication logic here is to avoid the accumulation of the same event in a batch when
// retrying.
if ((events.isEmpty() || !events.get(events.size() - 1).equals(event))) {
// We increase the reference count for this event to determine if the event may be released.
if (((EnrichedEvent) event)
.increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName())) {
events.add(event);
requestCommitIds.add(requestCommitId);
final int bufferSize = buildTabletInsertionBuffer(event);
totalBufferSize += bufferSize;
if (firstEventProcessingTime == Long.MIN_VALUE) {
firstEventProcessingTime = System.currentTimeMillis();
}
} else {
((EnrichedEvent) event)
.decreaseReferenceCount(PipeTransferBatchReqBuilder.class.getName(), false);
}
}
return totalBufferSize >= getMaxBatchSizeInBytes()
|| System.currentTimeMillis() - firstEventProcessingTime >= maxDelayInMs;
}
public synchronized void onSuccess() {
binaryBuffers.clear();
insertNodeBuffers.clear();
tabletBuffers.clear();
events.clear();
requestCommitIds.clear();
firstEventProcessingTime = Long.MIN_VALUE;
totalBufferSize = 0;
}
public PipeTransferTabletBatchReq toTPipeTransferReq() throws IOException {
return PipeTransferTabletBatchReq.toTPipeTransferReq(
binaryBuffers, insertNodeBuffers, tabletBuffers);
}
protected long getMaxBatchSizeInBytes() {
return allocatedMemoryBlock.getMemoryUsageInBytes();
}
public boolean isEmpty() {
return binaryBuffers.isEmpty() && insertNodeBuffers.isEmpty() && tabletBuffers.isEmpty();
}
public List<Event> deepCopyEvents() {
return new ArrayList<>(events);
}
protected int buildTabletInsertionBuffer(final TabletInsertionEvent event)
throws IOException, WALPipeException {
final ByteBuffer buffer;
if (event instanceof PipeInsertNodeTabletInsertionEvent) {
final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent =
(PipeInsertNodeTabletInsertionEvent) event;
// Read the bytebuffer from the wal file and transfer it directly without serializing or
// deserializing if possible
final InsertNode insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
if (Objects.isNull(insertNode)) {
buffer = pipeInsertNodeTabletInsertionEvent.getByteBuffer();
binaryBuffers.add(buffer);
} else {
buffer = insertNode.serializeToByteBuffer();
insertNodeBuffers.add(buffer);
}
} else {
final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
(PipeRawTabletInsertionEvent) event;
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
pipeRawTabletInsertionEvent.convertToTablet().serialize(outputStream);
ReadWriteIOUtils.write(pipeRawTabletInsertionEvent.isAligned(), outputStream);
buffer = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
}
tabletBuffers.add(buffer);
}
return buffer.limit();
}
@Override
public synchronized void close() {
clearEventsReferenceCount(PipeTransferBatchReqBuilder.class.getName());
allocatedMemoryBlock.close();
}
public void decreaseEventsReferenceCount(final String holderMessage, final boolean shouldReport) {
for (final Event event : events) {
if (event instanceof EnrichedEvent) {
((EnrichedEvent) event).decreaseReferenceCount(holderMessage, shouldReport);
}
}
}
public void clearEventsReferenceCount(final String holderMessage) {
for (final Event event : events) {
if (event instanceof EnrichedEvent) {
((EnrichedEvent) event).clearReferenceCount(holderMessage);
}
}
}
}