blob: bb6451461c731172ed0c64d4f164db04dedd4844 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.state.heap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.runtime.state.heap.SkipListUtils.DEFAULT_LEVEL;
import static org.apache.flink.runtime.state.heap.SkipListUtils.MAX_LEVEL;
import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
/** Implementation of {@link LevelIndexHeader} which stores index on heap. */
public class OnHeapLevelIndexHeader implements LevelIndexHeader {
/** The level index array where each position stores the next node id of the level. */
private volatile long[] levelIndex;
/** The topmost level currently. */
private volatile int topLevel;
/** Next node at level 0. */
private volatile long nextNode;
OnHeapLevelIndexHeader() {
this(DEFAULT_LEVEL);
}
private OnHeapLevelIndexHeader(int maxLevel) {
Preconditions.checkArgument(
maxLevel >= 1 && maxLevel <= MAX_LEVEL,
"maxLevel(" + maxLevel + ") must be non-negative and no more than " + MAX_LEVEL);
this.topLevel = 1;
this.nextNode = NIL_NODE;
this.levelIndex = new long[maxLevel];
initLevelIndex(levelIndex);
}
private void initLevelIndex(long[] levelIndex) {
for (int i = 0; i < levelIndex.length; i++) {
levelIndex[i] = NIL_NODE;
}
}
@Override
public int getLevel() {
return topLevel;
}
@Override
public void updateLevel(int level) {
Preconditions.checkArgument(
level >= 0 && level <= MAX_LEVEL,
"level(" + level + ") must be non-negative and no more than " + MAX_LEVEL);
Preconditions.checkArgument(
level <= this.topLevel + 1,
"top level "
+ topLevel
+ " must be updated level by level, but new level is "
+ level);
if (levelIndex.length < level) {
long[] newLevelIndex = new long[this.levelIndex.length * 2];
initLevelIndex(newLevelIndex);
System.arraycopy(this.levelIndex, 0, newLevelIndex, 0, this.levelIndex.length);
this.levelIndex = newLevelIndex;
}
if (topLevel < level) {
topLevel = level;
}
}
@Override
public long getNextNode(int level) {
Preconditions.checkArgument(
level >= 0 && level <= topLevel,
"invalid level " + level + " current top level is " + topLevel);
if (level == 0) {
return nextNode;
}
return levelIndex[level - 1];
}
@Override
public void updateNextNode(int level, long node) {
Preconditions.checkArgument(
level >= 0 && level <= topLevel,
"invalid level " + level + " current top level is " + topLevel);
if (level == 0) {
nextNode = node;
} else {
levelIndex[level - 1] = node;
}
}
@VisibleForTesting
long[] getLevelIndex() {
return levelIndex;
}
}