blob: 764122c6a269f21ac7491fdd38695363cddb930c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.commons.partition;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
public class SeriesPartitionTable {
private final TreeMap<TTimePartitionSlot, List<TConsensusGroupId>> seriesPartitionMap;
public SeriesPartitionTable() {
this.seriesPartitionMap = new TreeMap<>();
}
public SeriesPartitionTable(Map<TTimePartitionSlot, List<TConsensusGroupId>> seriesPartitionMap) {
this.seriesPartitionMap = new TreeMap<>(seriesPartitionMap);
}
public Map<TTimePartitionSlot, List<TConsensusGroupId>> getSeriesPartitionMap() {
return seriesPartitionMap;
}
public void putDataPartition(TTimePartitionSlot timePartitionSlot, TConsensusGroupId groupId) {
seriesPartitionMap.computeIfAbsent(timePartitionSlot, empty -> new ArrayList<>()).add(groupId);
}
/**
* Thread-safely get DataPartition within the specific Database.
*
* @param partitionSlotList TimePartitionSlotList
* @param seriesPartitionTable Store the matched SeriesPartitions
* @return True if all the SeriesPartitionSlots are matched, false otherwise
*/
public boolean getDataPartition(
TTimeSlotList partitionSlotList, SeriesPartitionTable seriesPartitionTable) {
AtomicBoolean result = new AtomicBoolean(true);
List<TTimePartitionSlot> partitionSlots = partitionSlotList.getTimePartitionSlots();
if (partitionSlots.isEmpty()) {
// Return all DataPartitions in one SeriesPartitionSlot
// when the queried TimePartitionSlots are empty
seriesPartitionTable.getSeriesPartitionMap().putAll(seriesPartitionMap);
} else {
boolean isNeedLeftAll = partitionSlotList.isNeedLeftAll(),
isNeedRightAll = partitionSlotList.isNeedRightAll();
if (isNeedLeftAll || isNeedRightAll) {
// we need to calculate the leftMargin which contains all the time partition on the unclosed
// left side: (-oo, leftMargin)
// and the rightMargin which contains all the time partition on the unclosed right side:
// (rightMargin, +oo)
// all the remaining closed time range which locates in [leftMargin, rightMargin] will be
// calculated outside if block
long leftMargin = isNeedLeftAll ? partitionSlots.get(0).getStartTime() : Long.MIN_VALUE,
rightMargin =
isNeedRightAll
? partitionSlots.get(partitionSlots.size() - 1).getStartTime()
: Long.MAX_VALUE;
seriesPartitionTable
.getSeriesPartitionMap()
.putAll(
seriesPartitionMap.entrySet().stream()
.filter(
entry -> {
long startTime = entry.getKey().getStartTime();
return startTime < leftMargin || startTime > rightMargin;
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}
// Return the DataPartition for each match TimePartitionSlot
partitionSlots.forEach(
timePartitionSlot -> {
if (seriesPartitionMap.containsKey(timePartitionSlot)) {
seriesPartitionTable
.getSeriesPartitionMap()
.put(timePartitionSlot, seriesPartitionMap.get(timePartitionSlot));
} else {
result.set(false);
}
});
}
return result.get();
}
/**
* Check and return the specified DataPartition's successor.
*
* @param timePartitionSlot Corresponding TimePartitionSlot
* @return The specified DataPartition's successor if exists, null otherwise
*/
public TConsensusGroupId getSuccessorDataPartition(TTimePartitionSlot timePartitionSlot) {
TTimePartitionSlot successorSlot = seriesPartitionMap.higherKey(timePartitionSlot);
return successorSlot == null ? null : seriesPartitionMap.get(successorSlot).get(0);
}
/**
* Check and return the specified DataPartition's predecessor.
*
* @param timePartitionSlot Corresponding TimePartitionSlot
* @return The specified DataPartition's predecessor if exists, null otherwise
*/
public TConsensusGroupId getPredecessorDataPartition(TTimePartitionSlot timePartitionSlot) {
TTimePartitionSlot predecessorSlot = seriesPartitionMap.lowerKey(timePartitionSlot);
return predecessorSlot == null ? null : seriesPartitionMap.get(predecessorSlot).get(0);
}
/**
* Query a timePartition's corresponding dataRegionIds.
*
* @param startTimeSlotId start Time partition's timeSlotId
* @param endTimeSlotId end Time partition's timeSlotId
* @return the timePartition's corresponding dataRegionIds. return the dataRegions which
* timeslotIds are in the time range [startTimeSlotId, endTimeSlotId].
*/
List<TConsensusGroupId> getRegionId(
TTimePartitionSlot startTimeSlotId, TTimePartitionSlot endTimeSlotId) {
return seriesPartitionMap.entrySet().stream()
.filter(
entry ->
entry.getKey().getStartTime() >= startTimeSlotId.getStartTime()
&& entry.getKey().getStartTime() <= endTimeSlotId.getStartTime())
.flatMap(entry -> entry.getValue().stream())
.collect(Collectors.toList());
}
List<TTimePartitionSlot> getTimeSlotList(
TConsensusGroupId regionId, long startTime, long endTime) {
if (regionId.getId() == -1) {
return seriesPartitionMap.keySet().stream()
.filter(e -> e.getStartTime() >= startTime && e.getStartTime() < endTime)
.collect(Collectors.toList());
} else {
return seriesPartitionMap.keySet().stream()
.filter(e -> e.getStartTime() >= startTime && e.getStartTime() < endTime)
.filter(e -> seriesPartitionMap.get(e).contains(regionId))
.collect(Collectors.toList());
}
}
/**
* Create DataPartition within the specific SeriesPartitionSlot.
*
* @param assignedSeriesPartitionTable Assigned result
* @param seriesPartitionSlot Corresponding TSeriesPartitionSlot
* @param groupDeltaMap Map<TConsensusGroupId, Map<TSeriesPartitionSlot, Delta TTimePartitionSlot
* Count>>
*/
public void createDataPartition(
SeriesPartitionTable assignedSeriesPartitionTable,
TSeriesPartitionSlot seriesPartitionSlot,
Map<TConsensusGroupId, Map<TSeriesPartitionSlot, AtomicLong>> groupDeltaMap) {
assignedSeriesPartitionTable
.getSeriesPartitionMap()
.forEach(
((timePartitionSlot, consensusGroupIds) -> {
seriesPartitionMap.put(timePartitionSlot, new Vector<>(consensusGroupIds));
consensusGroupIds.forEach(
consensusGroupId ->
groupDeltaMap
.computeIfAbsent(consensusGroupId, empty -> new ConcurrentHashMap<>())
.computeIfAbsent(seriesPartitionSlot, empty -> new AtomicLong(0))
.getAndIncrement());
}));
}
/**
* Only Leader use this interface. And this interface is synchronized. Thread-safely filter no
* assigned DataPartitionSlots within the specific SeriesPartitionSlot.
*
* @param partitionSlots TimePartitionSlots
* @return Unassigned PartitionSlots
*/
public synchronized List<TTimePartitionSlot> filterUnassignedDataPartitionSlots(
List<TTimePartitionSlot> partitionSlots) {
List<TTimePartitionSlot> result = new Vector<>();
partitionSlots.forEach(
timePartitionSlot -> {
if (!seriesPartitionMap.containsKey(timePartitionSlot)) {
result.add(timePartitionSlot);
}
});
return result;
}
/**
* Get the last DataPartition's ConsensusGroupId.
*
* @return The last DataPartition's ConsensusGroupId, null if there are no DataPartitions yet
*/
public TConsensusGroupId getLastConsensusGroupId() {
Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry =
seriesPartitionMap.lastEntry();
if (lastEntry == null) {
return null;
}
return lastEntry.getValue().get(lastEntry.getValue().size() - 1);
}
public void serialize(OutputStream outputStream, TProtocol protocol)
throws IOException, TException {
ReadWriteIOUtils.write(seriesPartitionMap.size(), outputStream);
for (Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> seriesPartitionEntry :
seriesPartitionMap.entrySet()) {
seriesPartitionEntry.getKey().write(protocol);
ReadWriteIOUtils.write(seriesPartitionEntry.getValue().size(), outputStream);
for (TConsensusGroupId consensusGroupId : seriesPartitionEntry.getValue()) {
consensusGroupId.write(protocol);
}
}
}
/** Only for ConsensusRequest. */
public void deserialize(ByteBuffer buffer) {
int timePartitionSlotNum = buffer.getInt();
for (int i = 0; i < timePartitionSlotNum; i++) {
TTimePartitionSlot timePartitionSlot =
ThriftCommonsSerDeUtils.deserializeTTimePartitionSlot(buffer);
int consensusGroupIdNum = buffer.getInt();
List<TConsensusGroupId> consensusGroupIds = new Vector<>();
for (int j = 0; j < consensusGroupIdNum; j++) {
consensusGroupIds.add(ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer));
}
seriesPartitionMap.put(timePartitionSlot, consensusGroupIds);
}
}
/** Only for Snapshot. */
public void deserialize(InputStream inputStream, TProtocol protocol)
throws IOException, TException {
int timePartitionSlotNum = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < timePartitionSlotNum; i++) {
TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot();
timePartitionSlot.read(protocol);
int consensusGroupIdNum = ReadWriteIOUtils.readInt(inputStream);
List<TConsensusGroupId> consensusGroupIds = new Vector<>();
for (int j = 0; j < consensusGroupIdNum; j++) {
TConsensusGroupId consensusGroupId = new TConsensusGroupId();
consensusGroupId.read(protocol);
consensusGroupIds.add(consensusGroupId);
}
seriesPartitionMap.put(timePartitionSlot, consensusGroupIds);
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SeriesPartitionTable that = (SeriesPartitionTable) o;
return seriesPartitionMap.equals(that.seriesPartitionMap);
}
@Override
public int hashCode() {
return Objects.hash(seriesPartitionMap);
}
}