/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.client.gateway.local.result;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.types.DataType;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.types.Row;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Collects results and returns them as table snapshots.
 *
 * @param <C> cluster id to which this result belongs to
 */
public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> implements MaterializedResult<C> {

	/** Maximum initial capacity of the materialized table. */
	public static final int MATERIALIZED_TABLE_MAX_INITIAL_CAPACITY = 1_000_000;

	/** Maximum overcommitment of the materialized table. */
	public static final int MATERIALIZED_TABLE_MAX_OVERCOMMIT = 1_000_000;

	/** Factor for the initial capacity of the materialized table. */
	public static final double MATERIALIZED_TABLE_CAPACITY_FACTOR = 0.05;

	/** Factor for cleaning up deleted rows in the materialized table. */
	public static final double MATERIALIZED_TABLE_OVERCOMMIT_FACTOR = 0.01;

	/**
	 * Maximum number of materialized rows to be stored. After the count is reached, oldest
	 * rows are dropped.
	 */
	private final int maxRowCount;

	/** Threshold for cleaning up deleted rows in the materialized table. */
	private final int overcommitThreshold;

	/**
	 * Materialized table that is continuously updated by inserts and deletes. Deletes at
	 * the beginning are lazily cleaned up when the threshold is reached.
	 */
	private final List<Row> materializedTable;

	/**
	 * Caches the last row position for faster access. The position might not be exact (if rows
	 * with smaller position are deleted) nor complete (for deletes of duplicates). However, the
	 * cache narrows the search in the materialized table.
	 */
	private final Map<Row, Integer> rowPositionCache;

	/** Current snapshot of the materialized table. */
	private final List<Row> snapshot;

	/** Counter for deleted rows to be deleted at the beginning of the materialized table. */
	private int validRowPosition;

	/** Page count of the snapshot (always >= 1). */
	private int pageCount;

	/** Page size of the snapshot (always >= 1). */
	private int pageSize;

	/** Indicator that this is the last snapshot possible (EOS afterwards). */
	private boolean isLastSnapshot;

	@VisibleForTesting
	public MaterializedCollectStreamResult(
			DataType outputType,
			ExecutionConfig config,
			InetAddress gatewayAddress,
			int gatewayPort,
			int maxRowCount,
			int overcommitThreshold) {
		super(outputType, config, gatewayAddress, gatewayPort);

		if (maxRowCount <= 0) {
			this.maxRowCount = Integer.MAX_VALUE;
		} else {
			this.maxRowCount = maxRowCount;
		}

		this.overcommitThreshold = overcommitThreshold;

		// prepare for materialization
		final int initialCapacity = computeMaterializedTableCapacity(maxRowCount); // avoid frequent resizing
		materializedTable = new ArrayList<>(initialCapacity);
		rowPositionCache = new HashMap<>(initialCapacity);
		snapshot = new ArrayList<>();
		validRowPosition = 0;
		isLastSnapshot = false;
		pageCount = 0;
	}

	public MaterializedCollectStreamResult(
			DataType outputType,
			ExecutionConfig config,
			InetAddress gatewayAddress,
			int gatewayPort,
			int maxRowCount) {

		this(
			outputType,
			config,
			gatewayAddress,
			gatewayPort,
			maxRowCount,
			computeMaterializedTableOvercommit(maxRowCount));
	}

	@Override
	public boolean isMaterialized() {
		return true;
	}

	@Override
	public TypedResult<Integer> snapshot(int pageSize) {
		if (pageSize < 1) {
			throw new SqlExecutionException("Page size must be greater than 0.");
		}

		synchronized (resultLock) {
			// retrieval thread is dead and there are no results anymore
			// or program failed
			if ((!isRetrieving() && isLastSnapshot) || executionException != null) {
				return handleMissingResult();
			}
			// this snapshot is the last result that can be delivered
			else if (!isRetrieving()) {
				isLastSnapshot = true;
			}

			this.pageSize = pageSize;
			snapshot.clear();
			for (int i = validRowPosition; i < materializedTable.size(); i++) {
				snapshot.add(materializedTable.get(i));
			}

			// at least one page
			pageCount = Math.max(1, (int) Math.ceil(((double) snapshot.size() / pageSize)));

			return TypedResult.payload(pageCount);
		}
	}

	@Override
	public List<Row> retrievePage(int page) {
		synchronized (resultLock) {
			if (page <= 0 || page > pageCount) {
				throw new SqlExecutionException("Invalid page '" + page + "'.");
			}

			return snapshot.subList(pageSize * (page - 1), Math.min(snapshot.size(), pageSize * page));
		}
	}

	// --------------------------------------------------------------------------------------------

	@Override
	protected void processRecord(Tuple2<Boolean, Row> change) {
		synchronized (resultLock) {
			// insert
			if (change.f0) {
				processInsert(change.f1);
			}
			// delete
			else {
				processDelete(change.f1);
			}
		}
	}

	@VisibleForTesting
	protected List<Row> getMaterializedTable() {
		return materializedTable;
	}

	// --------------------------------------------------------------------------------------------

	private void processInsert(Row row) {
		// limit the materialized table
		if (materializedTable.size() - validRowPosition >= maxRowCount) {
			cleanUp();
		}
		materializedTable.add(row);
		rowPositionCache.put(row, materializedTable.size() - 1);
	}

	private void processDelete(Row row) {
		// delete the newest record first to minimize per-page changes
		final Integer cachedPos = rowPositionCache.get(row);
		final int startSearchPos;
		if (cachedPos != null) {
			startSearchPos = Math.min(cachedPos, materializedTable.size() - 1);
		} else {
			startSearchPos = materializedTable.size() - 1;
		}

		for (int i = startSearchPos; i >= validRowPosition; i--) {
			if (materializedTable.get(i).equals(row)) {
				materializedTable.remove(i);
				rowPositionCache.remove(row);
				break;
			}
		}
	}

	private void cleanUp() {
		// invalidate row
		final Row deleteRow = materializedTable.get(validRowPosition);
		if (rowPositionCache.get(deleteRow) == validRowPosition) {
			// this row has no duplicates in the materialized table,
			// it can be removed from the cache
			rowPositionCache.remove(deleteRow);
		}
		materializedTable.set(validRowPosition, null);

		validRowPosition++;

		// perform clean up in batches
		if (validRowPosition >= overcommitThreshold) {
			materializedTable.subList(0, validRowPosition).clear();
			// adjust all cached indexes
			rowPositionCache.replaceAll((k, v) -> v - validRowPosition);
			validRowPosition = 0;
		}
	}

	private static int computeMaterializedTableCapacity(int maxRowCount) {
		return Math.min(
			MATERIALIZED_TABLE_MAX_INITIAL_CAPACITY,
			Math.max(1, (int) (maxRowCount * MATERIALIZED_TABLE_CAPACITY_FACTOR)));
	}

	private static int computeMaterializedTableOvercommit(int maxRowCount) {
		return Math.min(
			MATERIALIZED_TABLE_MAX_OVERCOMMIT,
			(int) (maxRowCount * MATERIALIZED_TABLE_OVERCOMMIT_FACTOR));
	}
}
