blob: bddcfc00ebee4bd7293876efa34e24812c0bfe44 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
import org.apache.iotdb.db.pipe.metric.PipeAssignerMetrics;
import org.apache.iotdb.db.pipe.pattern.matcher.CachedSchemaPatternMatcher;
import org.apache.iotdb.db.pipe.pattern.matcher.PipeDataRegionMatcher;
import java.io.Closeable;
public class PipeDataRegionAssigner implements Closeable {
/**
* The {@link PipeDataRegionMatcher} is used to match the event with the extractor based on the
* pattern.
*/
private final PipeDataRegionMatcher matcher;
/** The {@link DisruptorQueue} is used to assign the event to the extractor. */
private final DisruptorQueue disruptor;
private final String dataRegionId;
public String getDataRegionId() {
return dataRegionId;
}
public PipeDataRegionAssigner(String dataRegionId) {
this.matcher = new CachedSchemaPatternMatcher();
this.disruptor = new DisruptorQueue(this::assignToExtractor);
this.dataRegionId = dataRegionId;
PipeAssignerMetrics.getInstance().register(this);
}
public void publishToAssign(PipeRealtimeEvent event) {
event.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
disruptor.publish(event);
if (event.getEvent() instanceof PipeHeartbeatEvent) {
((PipeHeartbeatEvent) event.getEvent()).onPublished();
}
}
public void assignToExtractor(PipeRealtimeEvent event, long sequence, boolean endOfBatch) {
matcher
.match(event)
.forEach(
extractor -> {
if (event.getEvent().isGeneratedByPipe() && !extractor.isForwardingPipeRequests()) {
return;
}
final PipeRealtimeEvent copiedEvent =
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
extractor.getPipeName(),
extractor.getPipeTaskMeta(),
extractor.getPipePattern(),
extractor.getRealtimeDataExtractionStartTime(),
extractor.getRealtimeDataExtractionEndTime());
final EnrichedEvent innerEvent = copiedEvent.getEvent();
if (innerEvent instanceof PipeTsFileInsertionEvent) {
((PipeTsFileInsertionEvent) innerEvent)
.disableMod4NonTransferPipes(extractor.isShouldTransferModFile());
}
copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
extractor.extract(copiedEvent);
if (innerEvent instanceof PipeHeartbeatEvent) {
((PipeHeartbeatEvent) innerEvent).bindPipeName(extractor.getPipeName());
((PipeHeartbeatEvent) innerEvent).onAssigned();
}
});
event.gcSchemaInfo();
event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName(), false);
}
public void startAssignTo(PipeRealtimeDataRegionExtractor extractor) {
matcher.register(extractor);
}
public void stopAssignTo(PipeRealtimeDataRegionExtractor extractor) {
matcher.deregister(extractor);
}
public boolean notMoreExtractorNeededToBeAssigned() {
return matcher.getRegisterCount() == 0;
}
/**
* Clear the matcher and disruptor. The method {@link PipeDataRegionAssigner#publishToAssign}
* should not be used after calling this method.
*/
@Override
public void close() {
PipeAssignerMetrics.getInstance().deregister(dataRegionId);
matcher.clear();
disruptor.clear();
}
public int getTabletInsertionEventCount() {
return disruptor.getTabletInsertionEventCount();
}
public int getTsFileInsertionEventCount() {
return disruptor.getTsFileInsertionEventCount();
}
public int getPipeHeartbeatEventCount() {
return disruptor.getPipeHeartbeatEventCount();
}
}