blob: 26ad442f0ea75486472a2ca2b2ca01ea9e750fef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.client.gateway.local.result;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.types.DataType;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.types.Row;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Collects results and returns them as table snapshots.
*
* @param <C> cluster id to which this result belongs to
*/
public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> implements MaterializedResult<C> {
/** Maximum initial capacity of the materialized table. */
public static final int MATERIALIZED_TABLE_MAX_INITIAL_CAPACITY = 1_000_000;
/** Maximum overcommitment of the materialized table. */
public static final int MATERIALIZED_TABLE_MAX_OVERCOMMIT = 1_000_000;
/** Factor for the initial capacity of the materialized table. */
public static final double MATERIALIZED_TABLE_CAPACITY_FACTOR = 0.05;
/** Factor for cleaning up deleted rows in the materialized table. */
public static final double MATERIALIZED_TABLE_OVERCOMMIT_FACTOR = 0.01;
/**
* Maximum number of materialized rows to be stored. After the count is reached, oldest
* rows are dropped.
*/
private final int maxRowCount;
/** Threshold for cleaning up deleted rows in the materialized table. */
private final int overcommitThreshold;
/**
* Materialized table that is continuously updated by inserts and deletes. Deletes at
* the beginning are lazily cleaned up when the threshold is reached.
*/
private final List<Row> materializedTable;
/**
* Caches the last row position for faster access. The position might not be exact (if rows
* with smaller position are deleted) nor complete (for deletes of duplicates). However, the
* cache narrows the search in the materialized table.
*/
private final Map<Row, Integer> rowPositionCache;
/** Current snapshot of the materialized table. */
private final List<Row> snapshot;
/** Counter for deleted rows to be deleted at the beginning of the materialized table. */
private int validRowPosition;
/** Page count of the snapshot (always >= 1). */
private int pageCount;
/** Page size of the snapshot (always >= 1). */
private int pageSize;
/** Indicator that this is the last snapshot possible (EOS afterwards). */
private boolean isLastSnapshot;
@VisibleForTesting
public MaterializedCollectStreamResult(
DataType outputType,
ExecutionConfig config,
InetAddress gatewayAddress,
int gatewayPort,
int maxRowCount,
int overcommitThreshold) {
super(outputType, config, gatewayAddress, gatewayPort);
if (maxRowCount <= 0) {
this.maxRowCount = Integer.MAX_VALUE;
} else {
this.maxRowCount = maxRowCount;
}
this.overcommitThreshold = overcommitThreshold;
// prepare for materialization
final int initialCapacity = computeMaterializedTableCapacity(maxRowCount); // avoid frequent resizing
materializedTable = new ArrayList<>(initialCapacity);
rowPositionCache = new HashMap<>(initialCapacity);
snapshot = new ArrayList<>();
validRowPosition = 0;
isLastSnapshot = false;
pageCount = 0;
}
public MaterializedCollectStreamResult(
DataType outputType,
ExecutionConfig config,
InetAddress gatewayAddress,
int gatewayPort,
int maxRowCount) {
this(
outputType,
config,
gatewayAddress,
gatewayPort,
maxRowCount,
computeMaterializedTableOvercommit(maxRowCount));
}
@Override
public boolean isMaterialized() {
return true;
}
@Override
public TypedResult<Integer> snapshot(int pageSize) {
if (pageSize < 1) {
throw new SqlExecutionException("Page size must be greater than 0.");
}
synchronized (resultLock) {
// retrieval thread is dead and there are no results anymore
// or program failed
if ((!isRetrieving() && isLastSnapshot) || executionException != null) {
return handleMissingResult();
}
// this snapshot is the last result that can be delivered
else if (!isRetrieving()) {
isLastSnapshot = true;
}
this.pageSize = pageSize;
snapshot.clear();
for (int i = validRowPosition; i < materializedTable.size(); i++) {
snapshot.add(materializedTable.get(i));
}
// at least one page
pageCount = Math.max(1, (int) Math.ceil(((double) snapshot.size() / pageSize)));
return TypedResult.payload(pageCount);
}
}
@Override
public List<Row> retrievePage(int page) {
synchronized (resultLock) {
if (page <= 0 || page > pageCount) {
throw new SqlExecutionException("Invalid page '" + page + "'.");
}
return snapshot.subList(pageSize * (page - 1), Math.min(snapshot.size(), pageSize * page));
}
}
// --------------------------------------------------------------------------------------------
@Override
protected void processRecord(Tuple2<Boolean, Row> change) {
synchronized (resultLock) {
// insert
if (change.f0) {
processInsert(change.f1);
}
// delete
else {
processDelete(change.f1);
}
}
}
@VisibleForTesting
protected List<Row> getMaterializedTable() {
return materializedTable;
}
// --------------------------------------------------------------------------------------------
private void processInsert(Row row) {
// limit the materialized table
if (materializedTable.size() - validRowPosition >= maxRowCount) {
cleanUp();
}
materializedTable.add(row);
rowPositionCache.put(row, materializedTable.size() - 1);
}
private void processDelete(Row row) {
// delete the newest record first to minimize per-page changes
final Integer cachedPos = rowPositionCache.get(row);
final int startSearchPos;
if (cachedPos != null) {
startSearchPos = Math.min(cachedPos, materializedTable.size() - 1);
} else {
startSearchPos = materializedTable.size() - 1;
}
for (int i = startSearchPos; i >= validRowPosition; i--) {
if (materializedTable.get(i).equals(row)) {
materializedTable.remove(i);
rowPositionCache.remove(row);
break;
}
}
}
private void cleanUp() {
// invalidate row
final Row deleteRow = materializedTable.get(validRowPosition);
if (rowPositionCache.get(deleteRow) == validRowPosition) {
// this row has no duplicates in the materialized table,
// it can be removed from the cache
rowPositionCache.remove(deleteRow);
}
materializedTable.set(validRowPosition, null);
validRowPosition++;
// perform clean up in batches
if (validRowPosition >= overcommitThreshold) {
materializedTable.subList(0, validRowPosition).clear();
// adjust all cached indexes
rowPositionCache.replaceAll((k, v) -> v - validRowPosition);
validRowPosition = 0;
}
}
private static int computeMaterializedTableCapacity(int maxRowCount) {
return Math.min(
MATERIALIZED_TABLE_MAX_INITIAL_CAPACITY,
Math.max(1, (int) (maxRowCount * MATERIALIZED_TABLE_CAPACITY_FACTOR)));
}
private static int computeMaterializedTableOvercommit(int maxRowCount) {
return Math.min(
MATERIALIZED_TABLE_MAX_OVERCOMMIT,
(int) (maxRowCount * MATERIALIZED_TABLE_OVERCOMMIT_FACTOR));
}
}