blob: 01852a4883652705835e56c9db8c337d224ccfeb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.iotdb.source;
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.LOWER_BOUND;
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.NUM_PARTITIONS;
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.SQL;
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.UPPER_BOUND;
import static org.apache.seatunnel.connectors.seatunnel.iotdb.constant.SourceConstants.DEFAULT_PARTITIONS;
import static org.apache.seatunnel.connectors.seatunnel.iotdb.constant.SourceConstants.SQL_WHERE;
import static org.apache.iotdb.tsfile.common.constant.QueryConstant.RESERVED_TIME;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.connectors.seatunnel.iotdb.state.IoTDBSourceState;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class IoTDBSourceSplitEnumerator implements SourceSplitEnumerator<IoTDBSourceSplit, IoTDBSourceState> {
private final Context<IoTDBSourceSplit> context;
private Set<IoTDBSourceSplit> pendingSplit;
private Set<IoTDBSourceSplit> assignedSplit;
private Map<String, Object> conf;
/**
* A SQL statement can contain at most one where
* We split the SQL using the where keyword
* Therefore, it can be split into two SQL at most
*/
private static final int SQL_WHERE_SPLIT_LENGTH = 2;
public IoTDBSourceSplitEnumerator(SourceSplitEnumerator.Context<IoTDBSourceSplit> context, Map<String, Object> conf) {
this.context = context;
this.conf = conf;
}
public IoTDBSourceSplitEnumerator(SourceSplitEnumerator.Context<IoTDBSourceSplit> context, IoTDBSourceState sourceState, Map<String, Object> conf) {
this(context, conf);
this.assignedSplit = sourceState.getAssignedSplit();
}
@Override
public void open() {
this.assignedSplit = new HashSet<>();
this.pendingSplit = new HashSet<>();
}
@Override
public void run() {
pendingSplit = getIotDBSplit();
assignSplit(context.registeredReaders());
}
/**
* split the time range into numPartitions parts
* if numPartitions is 1, use the whole time range
* if numPartitions < (end - start), use (start-end) partitions
* <p>
* eg: start = 1, end = 10, numPartitions = 2
* sql = "select * from test where age > 0 and age < 10"
* <p>
* split result
* <p>
* split 1: select * from test where (time >= 1 and time < 6) and ( age > 0 and age < 10 )
* <p>
* split 2: select * from test where (time >= 6 and time < 11) and ( age > 0 and age < 10 )
*/
private Set<IoTDBSourceSplit> getIotDBSplit() {
String sql = conf.get(SQL).toString();
Set<IoTDBSourceSplit> iotDBSourceSplits = new HashSet<>();
// no need numPartitions, use one partition
if (!conf.containsKey(NUM_PARTITIONS)) {
iotDBSourceSplits.add(new IoTDBSourceSplit(DEFAULT_PARTITIONS, sql));
return iotDBSourceSplits;
}
long start = Long.parseLong(conf.get(LOWER_BOUND).toString());
long end = Long.parseLong(conf.get(UPPER_BOUND).toString());
int numPartitions = Integer.parseInt(conf.get(NUM_PARTITIONS).toString());
String[] sqls = sql.split(SQL_WHERE);
if (sqls.length > SQL_WHERE_SPLIT_LENGTH) {
throw new IllegalArgumentException("sql should not contain more than one where");
}
int size = (int) (end - start) / numPartitions + 1;
int remainder = (int) ((end + 1 - start) % numPartitions);
if (end - start < numPartitions) {
numPartitions = (int) (end - start);
}
long currentStart = start;
int i = 0;
while (i < numPartitions) {
String query = " where (" + RESERVED_TIME + " >= " + currentStart + " and " + RESERVED_TIME + " < " + (currentStart + size) + ") ";
i++;
currentStart += size;
if (i + 1 <= numPartitions) {
currentStart = currentStart - remainder;
}
query = sqls[0] + query;
if (sqls.length > 1) {
query = query + " and ( " + sqls[1] + " ) ";
}
iotDBSourceSplits.add(new IoTDBSourceSplit(String.valueOf(i + System.nanoTime()), query));
}
return iotDBSourceSplits;
}
@Override
public void addSplitsBack(List<IoTDBSourceSplit> splits, int subtaskId) {
if (!splits.isEmpty()) {
pendingSplit.addAll(splits);
assignSplit(Collections.singletonList(subtaskId));
}
}
@Override
public int currentUnassignedSplitSize() {
return pendingSplit.size();
}
@Override
public void registerReader(int subtaskId) {
if (!pendingSplit.isEmpty()) {
assignSplit(Collections.singletonList(subtaskId));
}
}
private void assignSplit(Collection<Integer> taskIDList) {
Map<Integer, List<IoTDBSourceSplit>> readySplit = new HashMap<>(Common.COLLECTION_SIZE);
for (int taskID : taskIDList) {
readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
}
pendingSplit.forEach(s -> readySplit.get(getSplitOwner(s.splitId(), taskIDList.size()))
.add(s));
readySplit.forEach(context::assignSplit);
assignedSplit.addAll(pendingSplit);
pendingSplit.clear();
}
@Override
public IoTDBSourceState snapshotState(long checkpointId) throws Exception {
return new IoTDBSourceState(assignedSplit);
}
private static int getSplitOwner(String tp, int numReaders) {
return tp.hashCode() % numReaders;
}
@Override
public void notifyCheckpointComplete(long checkpointId) {
//nothing to do
}
@Override
public void close() {
//nothing to do
}
@Override
public void handleSplitRequest(int subtaskId) {
//nothing to do
}
}