blob: 33d8cd6d7d38bc5d01421a0721727eca9590d846 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.uniffle.common.util;
import java.util.Objects;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
/**
* This represents the actual bit layout of {@link BlockId}s.
*
* <p>BlockId is positive long (63 bits) composed of sequenceNo, partitionId and taskAttemptId in
* that order from highest to lowest bits. The number of bits is defined by a {@link BlockIdLayout}.
* Values of partitionId, taskAttemptId and AtomicInteger are always positive.
*/
public class BlockIdLayout {
public static final BlockIdLayout DEFAULT = BlockIdLayout.from(18, 24, 21);
public final int sequenceNoBits;
public final int partitionIdBits;
public final int taskAttemptIdBits;
public final int sequenceNoOffset;
public final int partitionIdOffset;
public final int taskAttemptIdOffset;
public final long sequenceNoMask;
public final long partitionIdMask;
public final long taskAttemptIdMask;
public final int maxSequenceNo;
public final int maxPartitionId;
public final int maxTaskAttemptId;
public final int maxNumPartitions;
private BlockIdLayout(int sequenceNoBits, int partitionIdBits, int taskAttemptIdBits) {
// individual lengths must be lager than 0
if (sequenceNoBits <= 0 || partitionIdBits <= 0 || taskAttemptIdBits <= 0) {
throw new IllegalArgumentException(
"Don't support given lengths, individual lengths must be larger than 0: given "
+ "sequenceNoBits="
+ sequenceNoBits
+ ", partitionIdBits="
+ partitionIdBits
+ ", taskAttemptIdBits="
+ taskAttemptIdBits);
}
// individual lengths must be less than 32
if (sequenceNoBits >= 32 || partitionIdBits >= 32 || taskAttemptIdBits >= 32) {
throw new IllegalArgumentException(
"Don't support given lengths, individual lengths must be less that 32: given "
+ "sequenceNoBits="
+ sequenceNoBits
+ ", partitionIdBits="
+ partitionIdBits
+ ", taskAttemptIdBits="
+ taskAttemptIdBits);
}
// sum of individual lengths must be 63, otherwise we waste bits and risk overflow
if (sequenceNoBits + partitionIdBits + taskAttemptIdBits != 63) {
throw new IllegalArgumentException(
"Don't support given lengths, sum must be exactly 63: "
+ sequenceNoBits
+ " + "
+ partitionIdBits
+ " + "
+ taskAttemptIdBits
+ " = "
+ (sequenceNoBits + partitionIdBits + taskAttemptIdBits));
}
this.sequenceNoBits = sequenceNoBits;
this.partitionIdBits = partitionIdBits;
this.taskAttemptIdBits = taskAttemptIdBits;
// fields are right-aligned
this.sequenceNoOffset = partitionIdBits + taskAttemptIdBits;
this.partitionIdOffset = taskAttemptIdBits;
this.taskAttemptIdOffset = 0;
// compute max ids
this.maxSequenceNo = (1 << sequenceNoBits) - 1;
this.maxPartitionId = (1 << partitionIdBits) - 1;
this.maxTaskAttemptId = (1 << taskAttemptIdBits) - 1;
// compute max nums
this.maxNumPartitions = this.maxPartitionId + 1;
// compute masks to simplify bit logic in BlockId methods
this.sequenceNoMask = (long) maxSequenceNo << sequenceNoOffset;
this.partitionIdMask = (long) maxPartitionId << partitionIdOffset;
this.taskAttemptIdMask = (long) maxTaskAttemptId << taskAttemptIdOffset;
}
@Override
public String toString() {
return "blockIdLayout["
+ "seq: "
+ sequenceNoBits
+ " bits, part: "
+ partitionIdBits
+ " bits, task: "
+ taskAttemptIdBits
+ " bits]";
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
BlockIdLayout that = (BlockIdLayout) o;
return sequenceNoBits == that.sequenceNoBits
&& partitionIdBits == that.partitionIdBits
&& taskAttemptIdBits == that.taskAttemptIdBits;
}
@Override
public int hashCode() {
return Objects.hash(sequenceNoBits, partitionIdBits, taskAttemptIdBits);
}
public long getBlockId(int sequenceNo, int partitionId, long taskAttemptId) {
if (sequenceNo < 0 || sequenceNo > maxSequenceNo) {
throw new IllegalArgumentException(
"Don't support sequenceNo[" + sequenceNo + "], the max value should be " + maxSequenceNo);
}
if (partitionId < 0 || partitionId > maxPartitionId) {
throw new IllegalArgumentException(
"Don't support partitionId["
+ partitionId
+ "], the max value should be "
+ maxPartitionId);
}
if (taskAttemptId < 0 || taskAttemptId > maxTaskAttemptId) {
throw new IllegalArgumentException(
"Don't support taskAttemptId["
+ taskAttemptId
+ "], the max value should be "
+ maxTaskAttemptId);
}
return (long) sequenceNo << sequenceNoOffset
| (long) partitionId << partitionIdOffset
| taskAttemptId << taskAttemptIdOffset;
}
public int getSequenceNo(long blockId) {
return (int) ((blockId & sequenceNoMask) >> sequenceNoOffset);
}
public int getPartitionId(long blockId) {
return (int) ((blockId & partitionIdMask) >> partitionIdOffset);
}
public int getTaskAttemptId(long blockId) {
return (int) ((blockId & taskAttemptIdMask) >> taskAttemptIdOffset);
}
public BlockId asBlockId(long blockId) {
return new BlockId(
blockId, this, getSequenceNo(blockId), getPartitionId(blockId), getTaskAttemptId(blockId));
}
public BlockId asBlockId(int sequenceNo, int partitionId, long taskAttemptId) {
return new BlockId(
getBlockId(sequenceNo, partitionId, taskAttemptId),
this,
sequenceNo,
partitionId,
(int) taskAttemptId);
}
public static BlockIdLayout from(RssConf rssConf) {
int sequenceBits = rssConf.get(RssClientConf.BLOCKID_SEQUENCE_NO_BITS);
int partitionBits = rssConf.get(RssClientConf.BLOCKID_PARTITION_ID_BITS);
int attemptBits = rssConf.get(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS);
return BlockIdLayout.from(sequenceBits, partitionBits, attemptBits);
}
public static BlockIdLayout from(int sequenceNoBits, int partitionIdBits, int taskAttemptIdBits) {
return new BlockIdLayout(sequenceNoBits, partitionIdBits, taskAttemptIdBits);
}
}