blob: cec7a5f29e15f850db5e73b0209b748a92c7bfb4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.pipe.extractor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import static org.mockito.Mockito.mock;
public class PipeRealtimeExtractTest {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeExtractTest.class);
private final String dataRegion1 = "1";
private final String dataRegion2 = "2";
private final String pattern1 = "root.sg.d";
private final String pattern2 = "root.sg.d.a";
private final String[] device = new String[] {"root", "sg", "d"};
private final AtomicBoolean alive = new AtomicBoolean();
private File tmpDir;
private File tsFileDir;
private ExecutorService writeService;
private ExecutorService listenerService;
@Before
public void setUp() throws IOException {
writeService = Executors.newFixedThreadPool(2);
listenerService = Executors.newFixedThreadPool(4);
tmpDir = new File(Files.createTempDirectory("pipeRealtimeExtractor").toString());
tsFileDir =
new File(
tmpDir.getPath()
+ File.separator
+ IoTDBConstant.SEQUENCE_FOLDER_NAME
+ File.separator
+ "root.sg");
}
@After
public void tearDown() {
writeService.shutdownNow();
listenerService.shutdownNow();
FileUtils.deleteFileOrDirectory(tmpDir);
}
@Test
public void testRealtimeExtractProcess() {
// set up realtime extractor
try (PipeRealtimeDataRegionLogExtractor extractor0 = new PipeRealtimeDataRegionLogExtractor();
PipeRealtimeDataRegionHybridExtractor extractor1 =
new PipeRealtimeDataRegionHybridExtractor();
PipeRealtimeDataRegionTsFileExtractor extractor2 =
new PipeRealtimeDataRegionTsFileExtractor();
PipeRealtimeDataRegionHybridExtractor extractor3 =
new PipeRealtimeDataRegionHybridExtractor()) {
PipeParameters parameters0 =
new PipeParameters(
new HashMap<String, String>() {
{
put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, pattern1);
}
});
PipeParameters parameters1 =
new PipeParameters(
new HashMap<String, String>() {
{
put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, pattern2);
}
});
PipeParameters parameters2 =
new PipeParameters(
new HashMap<String, String>() {
{
put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, pattern1);
}
});
PipeParameters parameters3 =
new PipeParameters(
new HashMap<String, String>() {
{
put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, pattern2);
}
});
PipeTaskRuntimeConfiguration configuration0 =
new PipeTaskRuntimeConfiguration(
new PipeTaskExtractorRuntimeEnvironment("1", 1, Integer.parseInt(dataRegion1), null));
PipeTaskRuntimeConfiguration configuration1 =
new PipeTaskRuntimeConfiguration(
new PipeTaskExtractorRuntimeEnvironment("1", 1, Integer.parseInt(dataRegion1), null));
PipeTaskRuntimeConfiguration configuration2 =
new PipeTaskRuntimeConfiguration(
new PipeTaskExtractorRuntimeEnvironment("1", 1, Integer.parseInt(dataRegion2), null));
PipeTaskRuntimeConfiguration configuration3 =
new PipeTaskRuntimeConfiguration(
new PipeTaskExtractorRuntimeEnvironment("1", 1, Integer.parseInt(dataRegion2), null));
// Some parameters of extractor are validated and initialized during the validation process.
extractor0.validate(new PipeParameterValidator(parameters0));
extractor0.customize(parameters0, configuration0);
extractor1.validate(new PipeParameterValidator(parameters1));
extractor1.customize(parameters1, configuration1);
extractor2.validate(new PipeParameterValidator(parameters2));
extractor2.customize(parameters2, configuration2);
extractor3.validate(new PipeParameterValidator(parameters3));
extractor3.customize(parameters3, configuration3);
PipeRealtimeDataRegionExtractor[] extractors =
new PipeRealtimeDataRegionExtractor[] {extractor0, extractor1, extractor2, extractor3};
// start extractor 0, 1
extractors[0].start();
extractors[1].start();
// test result of extractor 0, 1
int writeNum = 10;
List<Future<?>> writeFutures =
Arrays.asList(
write2DataRegion(writeNum, dataRegion1, 0),
write2DataRegion(writeNum, dataRegion2, 0));
alive.set(true);
List<Future<?>> listenFutures =
Arrays.asList(
listen(
extractors[0],
event -> event instanceof TabletInsertionEvent ? 1 : 2,
writeNum << 1),
listen(extractors[1], event -> 1, writeNum));
try {
listenFutures.get(0).get(10, TimeUnit.MINUTES);
listenFutures.get(1).get(10, TimeUnit.MINUTES);
} catch (TimeoutException e) {
LOGGER.warn("Time out when listening extractor", e);
alive.set(false);
Assert.fail();
}
writeFutures.forEach(
future -> {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
// start extractor 2, 3
extractors[2].start();
extractors[3].start();
// test result of extractor 0 - 3
writeFutures =
Arrays.asList(
write2DataRegion(writeNum, dataRegion1, writeNum),
write2DataRegion(writeNum, dataRegion2, writeNum));
alive.set(true);
listenFutures =
Arrays.asList(
listen(
extractors[0],
event -> event instanceof TabletInsertionEvent ? 1 : 2,
writeNum << 1),
listen(extractors[1], event -> 1, writeNum),
listen(
extractors[2],
event -> event instanceof TabletInsertionEvent ? 1 : 2,
writeNum << 1),
listen(extractors[3], event -> 1, writeNum));
try {
listenFutures.get(0).get(10, TimeUnit.MINUTES);
listenFutures.get(1).get(10, TimeUnit.MINUTES);
listenFutures.get(2).get(10, TimeUnit.MINUTES);
listenFutures.get(3).get(10, TimeUnit.MINUTES);
} catch (TimeoutException e) {
LOGGER.warn("Time out when listening extractor", e);
alive.set(false);
Assert.fail();
}
writeFutures.forEach(
future -> {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private Future<?> write2DataRegion(int writeNum, String dataRegionId, int startNum) {
File dataRegionDir =
new File(tsFileDir.getPath() + File.separator + dataRegionId + File.separator + "0");
boolean ignored = dataRegionDir.mkdirs();
return writeService.submit(
() -> {
for (int i = startNum; i < startNum + writeNum; ++i) {
File tsFile = new File(dataRegionDir, String.format("%s-%s-0-0.tsfile", i, i));
try {
boolean ignored1 = tsFile.createNewFile();
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
TsFileResource resource = new TsFileResource(tsFile);
resource.updateStartTime(
new PlainDeviceID(String.join(TsFileConstant.PATH_SEPARATOR, device)), 0);
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
dataRegionId,
mock(WALEntryHandler.class),
new InsertRowNode(
new PlanNodeId(String.valueOf(i)),
new PartialPath(device),
false,
new String[] {"a"},
null,
0,
null,
false),
resource);
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
dataRegionId,
mock(WALEntryHandler.class),
new InsertRowNode(
new PlanNodeId(String.valueOf(i)),
new PartialPath(device),
false,
new String[] {"b"},
null,
0,
null,
false),
resource);
PipeInsertionDataNodeListener.getInstance()
.listenToTsFile(dataRegionId, resource, false, false);
}
});
}
private Future<?> listen(
PipeRealtimeDataRegionExtractor extractor, Function<Event, Integer> weight, int expectNum) {
return listenerService.submit(
() -> {
int eventNum = 0;
try {
while (alive.get() && eventNum < expectNum) {
Event event;
try {
event = extractor.supply();
} catch (Exception e) {
throw new RuntimeException(e);
}
if (event != null) {
eventNum += weight.apply(event);
}
}
} finally {
Assert.assertEquals(expectNum, eventNum);
}
});
}
}