blob: dca91073f2590ed2031fccd768c06c09c2a1121b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.commons.partition;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
public class SchemaPartitionTable {
private final Map<TSeriesPartitionSlot, TConsensusGroupId> schemaPartitionMap;
public SchemaPartitionTable() {
this.schemaPartitionMap = new ConcurrentHashMap<>();
}
public SchemaPartitionTable(Map<TSeriesPartitionSlot, TConsensusGroupId> schemaPartitionMap) {
this.schemaPartitionMap = schemaPartitionMap;
}
public Map<TSeriesPartitionSlot, TConsensusGroupId> getSchemaPartitionMap() {
return schemaPartitionMap;
}
/**
* Thread-safely get SchemaPartition within the specific StorageGroup
*
* @param partitionSlots SeriesPartitionSlots
* @param schemaPartitionTable Store the matched SchemaPartitions
* @return True if all the SeriesPartitionSlots are matched, false otherwise
*/
public boolean getSchemaPartition(
List<TSeriesPartitionSlot> partitionSlots, SchemaPartitionTable schemaPartitionTable) {
AtomicBoolean result = new AtomicBoolean(true);
if (partitionSlots.isEmpty()) {
// Return all SchemaPartitions in one StorageGroup when the queried PartitionSlots are empty
schemaPartitionTable.getSchemaPartitionMap().putAll(schemaPartitionMap);
} else {
// Return the SchemaPartition for each SeriesPartitionSlot
partitionSlots.forEach(
seriesPartitionSlot -> {
if (schemaPartitionMap.containsKey(seriesPartitionSlot)) {
schemaPartitionTable
.getSchemaPartitionMap()
.put(seriesPartitionSlot, schemaPartitionMap.get(seriesPartitionSlot));
} else {
result.set(false);
}
});
}
return result.get();
}
/**
* Create SchemaPartition within the specific StorageGroup
*
* @param assignedSchemaPartition assigned result
* @return Map<TConsensusGroupId, Map<TSeriesPartitionSlot, Delta TTimePartitionSlot Count(always
* 0)>>
*/
public Map<TConsensusGroupId, Map<TSeriesPartitionSlot, AtomicLong>> createSchemaPartition(
SchemaPartitionTable assignedSchemaPartition) {
Map<TConsensusGroupId, Map<TSeriesPartitionSlot, AtomicLong>> groupDeltaMap =
new ConcurrentHashMap<>();
assignedSchemaPartition
.getSchemaPartitionMap()
.forEach(
((seriesPartitionSlot, consensusGroupId) -> {
schemaPartitionMap.put(seriesPartitionSlot, consensusGroupId);
groupDeltaMap
.computeIfAbsent(consensusGroupId, empty -> new ConcurrentHashMap<>())
.put(seriesPartitionSlot, new AtomicLong(0));
}));
return groupDeltaMap;
}
/**
* Only Leader use this interface. Filter unassigned SchemaPartitionSlots within the specific
* StorageGroup.
*
* @param partitionSlots List<TSeriesPartitionSlot>
* @return Unassigned PartitionSlots
*/
public List<TSeriesPartitionSlot> filterUnassignedSchemaPartitionSlots(
List<TSeriesPartitionSlot> partitionSlots) {
List<TSeriesPartitionSlot> result = new Vector<>();
partitionSlots.forEach(
seriesPartitionSlot -> {
if (!schemaPartitionMap.containsKey(seriesPartitionSlot)) {
result.add(seriesPartitionSlot);
}
});
return result;
}
/**
* Query a timePartition's corresponding schemaRegionIds
*
* @param seriesSlotId SeriesPartitionSlot
* @return the timePartition's corresponding dataRegionIds, if seriesSlotId==-1, then return all
* seriesPartitionTable's schemaRegionIds;
*/
public List<TConsensusGroupId> getRegionId(TSeriesPartitionSlot seriesSlotId) {
if (seriesSlotId.getSlotId() == -1) {
return new ArrayList<>(schemaPartitionMap.values());
}
if (!schemaPartitionMap.containsKey(seriesSlotId)) {
return new ArrayList<>();
}
return Collections.singletonList(schemaPartitionMap.get(seriesSlotId));
}
public List<TSeriesPartitionSlot> getSeriesSlotList() {
return schemaPartitionMap.keySet().stream()
.sorted(Comparator.comparing(TSeriesPartitionSlot::getSlotId))
.collect(Collectors.toList());
}
public void serialize(OutputStream outputStream, TProtocol protocol)
throws IOException, TException {
ReadWriteIOUtils.write(schemaPartitionMap.size(), outputStream);
for (Map.Entry<TSeriesPartitionSlot, TConsensusGroupId> schemaPartitionEntry :
schemaPartitionMap.entrySet()) {
schemaPartitionEntry.getKey().write(protocol);
schemaPartitionEntry.getValue().write(protocol);
}
}
/** Only for ConsensusRequest */
public void deserialize(ByteBuffer buffer) {
int length = buffer.getInt();
for (int i = 0; i < length; i++) {
schemaPartitionMap.put(
ThriftCommonsSerDeUtils.deserializeTSeriesPartitionSlot(buffer),
ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer));
}
}
/** Only for Snapshot */
public void deserialize(InputStream inputStream, TProtocol protocol)
throws IOException, TException {
int length = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < length; i++) {
TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot();
seriesPartitionSlot.read(protocol);
TConsensusGroupId consensusGroupId = new TConsensusGroupId();
consensusGroupId.read(protocol);
schemaPartitionMap.put(seriesPartitionSlot, consensusGroupId);
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SchemaPartitionTable that = (SchemaPartitionTable) o;
return schemaPartitionMap.equals(that.schemaPartitionMap);
}
@Override
public int hashCode() {
return Objects.hash(schemaPartitionMap);
}
}