blob: 70a911f0457bc4aa153c31d33d5dc76398a05bff [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.window.grouping;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.runtime.util.RowIterator;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
/**
 * A JVM heap implementation of {@link AbstractWindowsGrouping}, which uses a linked list to buffer
 * all the inputs of a keyed group belonging to the same window.
 *
 * <p>It is designed to have a capacity limit to avoid JVM OOM and reduce GC pauses: once the
 * buffer holds {@code limit} rows, adding another row fails with an {@link IOException}.
 *
 * <p>The indices passed to {@link #onBufferEvict(int)} and {@link #newBufferIterator(int)} are
 * translated into positions of the linked list by subtracting the number of rows that have
 * already been evicted from its head.
 */
public class HeapWindowsGrouping extends AbstractWindowsGrouping {

    /**
     * Buffered rows, oldest first. Assigned only by {@link #createBuffer()}.
     *
     * <p>NOTE(review): deliberately declared without an initializer. If the superclass invokes
     * {@code createBuffer()} from its constructor, an initializer here would run afterwards and
     * overwrite the freshly created list -- confirm against AbstractWindowsGrouping.
     */
    private LinkedList<BinaryRow> buffer;

    /** Maximum number of rows the buffer may hold at the same time. */
    private final int limit;

    /** Number of rows removed from the head of the buffer since construction or the last reset. */
    private int evictLimitIndex;

    /**
     * Creates a grouping with an offset of {@code 0}.
     *
     * @param limit maximum number of rows buffered at once
     * @param windowSize forwarded to the superclass
     * @param slideSize forwarded to the superclass
     * @param timeIndex forwarded to the superclass
     * @param isDate forwarded to the superclass
     */
    public HeapWindowsGrouping(int limit, long windowSize, long slideSize, int timeIndex, boolean isDate) {
        this(limit, 0L, windowSize, slideSize, timeIndex, isDate);
    }

    /**
     * Creates a grouping with an explicit offset.
     *
     * @param limit maximum number of rows buffered at once
     * @param offset forwarded to the superclass
     * @param windowSize forwarded to the superclass
     * @param slideSize forwarded to the superclass
     * @param timeIndex forwarded to the superclass
     * @param isDate forwarded to the superclass
     */
    public HeapWindowsGrouping(
            int limit, long offset, long windowSize, long slideSize, int timeIndex, boolean isDate) {
        super(offset, windowSize, slideSize, timeIndex, isDate);
        this.limit = limit;
        this.evictLimitIndex = 0;
    }

    /** Allocates a new, empty linked list as the row buffer. */
    @Override
    protected void createBuffer() {
        buffer = new LinkedList<>();
    }

    /** Drops every buffered row and restarts the eviction counter at zero. */
    @Override
    protected void resetBuffer() {
        buffer.clear();
        evictLimitIndex = 0;
    }

    /**
     * Removes rows from the head of the buffer until {@code limitIndex} rows have been evicted in
     * total. Does nothing if that many rows were evicted already.
     *
     * @param limitIndex total number of rows that should have been evicted after this call
     * @throws java.util.NoSuchElementException if the buffer runs empty before the target is
     *     reached
     */
    @Override
    protected void onBufferEvict(int limitIndex) {
        while (evictLimitIndex < limitIndex) {
            // Remove first, count second: the counter only advances for rows actually dropped.
            buffer.removeFirst();
            evictLimitIndex++;
        }
    }

    /**
     * Appends a row to the tail of the buffer. The row is stored as passed in; no copy is made
     * here.
     *
     * @param input the row to buffer
     * @throws IOException if the buffer already holds {@code limit} rows
     */
    @Override
    protected void addIntoBuffer(BinaryRow input) throws IOException {
        if (buffer.size() >= limit) {
            throw new IOException("HeapWindowsGrouping out of memory, element size limit " + limit);
        }
        buffer.add(input);
    }

    /**
     * Returns an iterator over the buffered rows from {@code startIndex} up to the current tail.
     *
     * <p>The iterator is backed by a sub-list view of the buffer, so adding to or evicting from
     * the buffer while it is in use makes it fail with a
     * {@link java.util.ConcurrentModificationException}.
     *
     * @param startIndex index of the first row to return, counted including evicted rows
     * @return an iterator positioned before the row at {@code startIndex}
     * @throws IndexOutOfBoundsException if {@code startIndex} refers to an evicted row or lies
     *     beyond the tail of the buffer
     */
    @Override
    protected RowIterator<BinaryRow> newBufferIterator(int startIndex) {
        // Translate the caller's index into a position inside the (already evicted) list.
        final int from = startIndex - evictLimitIndex;
        return new BufferIterator(buffer.subList(from, buffer.size()).iterator());
    }

    /**
     * Adapts a plain {@link Iterator} to the {@link RowIterator} protocol.
     *
     * <p>Declared static because it needs nothing from the enclosing instance.
     */
    private static final class BufferIterator implements RowIterator<BinaryRow> {

        /** Source of rows. */
        private final Iterator<BinaryRow> rows;

        /** Row selected by the last successful {@link #advanceNext()}, or {@code null}. */
        private BinaryRow current;

        BufferIterator(Iterator<BinaryRow> rows) {
            this.rows = rows;
        }

        /**
         * Moves to the next row if there is one.
         *
         * @return {@code true} if a row is now available through {@link #getRow()}, {@code false}
         *     if the rows are exhausted (in which case {@link #getRow()} returns {@code null})
         */
        @Override
        public boolean advanceNext() {
            if (!rows.hasNext()) {
                current = null;
                return false;
            }
            current = rows.next();
            return true;
        }

        /**
         * Returns the current row.
         *
         * @return the row selected by the last {@link #advanceNext()}, or {@code null} if
         *     {@code advanceNext()} has not been called yet or last returned {@code false}
         */
        @Override
        public BinaryRow getRow() {
            return current;
        }
    }
}