/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.descriptors;

import org.apache.flink.annotation.Internal;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

import static org.apache.flink.table.descriptors.DescriptorProperties.noValidation;

/**
 * Validator for the properties emitted by the {@link Elasticsearch} connector descriptor.
 *
 * <p>The class declares the property keys and the accepted literal values of the connector and
 * checks a {@link DescriptorProperties} instance against them in
 * {@link #validate(DescriptorProperties)}.
 */
@Internal
public class ElasticsearchValidator extends ConnectorDescriptorValidator {

	// ---- connector identification ----
	public static final String CONNECTOR_TYPE_VALUE_ELASTICSEARCH = "elasticsearch";
	public static final String CONNECTOR_VERSION_VALUE_6 = "6";

	// ---- hosts (indexed property with per-host sub-keys) ----
	public static final String CONNECTOR_HOSTS = "connector.hosts";
	public static final String CONNECTOR_HOSTS_HOSTNAME = "hostname";
	public static final String CONNECTOR_HOSTS_PORT = "port";
	public static final String CONNECTOR_HOSTS_PROTOCOL = "protocol";

	// ---- general properties ----
	public static final String CONNECTOR_INDEX = "connector.index";
	public static final String CONNECTOR_DOCUMENT_TYPE = "connector.document-type";
	public static final String CONNECTOR_KEY_DELIMITER = "connector.key-delimiter";
	public static final String CONNECTOR_KEY_NULL_LITERAL = "connector.key-null-literal";

	// ---- failure handling ----
	public static final String CONNECTOR_FAILURE_HANDLER = "connector.failure-handler";
	public static final String CONNECTOR_FAILURE_HANDLER_VALUE_FAIL = "fail";
	public static final String CONNECTOR_FAILURE_HANDLER_VALUE_IGNORE = "ignore";
	public static final String CONNECTOR_FAILURE_HANDLER_VALUE_RETRY = "retry-rejected";
	public static final String CONNECTOR_FAILURE_HANDLER_VALUE_CUSTOM = "custom";
	public static final String CONNECTOR_FAILURE_HANDLER_CLASS = "connector.failure-handler-class";

	// ---- flushing and bulk-flush backoff ----
	public static final String CONNECTOR_FLUSH_ON_CHECKPOINT = "connector.flush-on-checkpoint";
	public static final String CONNECTOR_BULK_FLUSH_MAX_ACTIONS = "connector.bulk-flush.max-actions";
	public static final String CONNECTOR_BULK_FLUSH_MAX_SIZE = "connector.bulk-flush.max-size";
	public static final String CONNECTOR_BULK_FLUSH_INTERVAL = "connector.bulk-flush.interval";
	public static final String CONNECTOR_BULK_FLUSH_BACKOFF_TYPE = "connector.bulk-flush.backoff.type";
	public static final String CONNECTOR_BULK_FLUSH_BACKOFF_TYPE_VALUE_DISABLED = "disabled";
	public static final String CONNECTOR_BULK_FLUSH_BACKOFF_TYPE_VALUE_CONSTANT = "constant";
	public static final String CONNECTOR_BULK_FLUSH_BACKOFF_TYPE_VALUE_EXPONENTIAL = "exponential";
	public static final String CONNECTOR_BULK_FLUSH_BACKOFF_MAX_RETRIES = "connector.bulk-flush.backoff.max-retries";
	public static final String CONNECTOR_BULK_FLUSH_BACKOFF_DELAY = "connector.bulk-flush.backoff.delay";

	// ---- connection ----
	public static final String CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT = "connector.connection-max-retry-timeout";
	public static final String CONNECTOR_CONNECTION_PATH_PREFIX = "connector.connection-path-prefix";

	/** Lowest port number accepted for a host entry. */
	private static final int PORT_LOWER_BOUND = 0;

	/** Highest port number accepted for a host entry. */
	private static final int PORT_UPPER_BOUND = 65535;

	/** Precision passed to the memory size check of the bulk flush size: only allow MB precision. */
	private static final int BULK_FLUSH_SIZE_PRECISION = 1024 * 1024;

	/**
	 * Runs the validation of the parent class and then checks all Elasticsearch specific
	 * properties: connector type, version, hosts, general properties, failure handler,
	 * flush/backoff settings and connection settings, in that order.
	 *
	 * @param properties the descriptor properties to check
	 */
	@Override
	public void validate(DescriptorProperties properties) {
		super.validate(properties);
		properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_ELASTICSEARCH, false);

		checkVersion(properties);
		checkHosts(properties);
		checkGeneralProperties(properties);
		checkFailureHandler(properties);
		checkFlushSettings(properties);
		checkBackoffSettings(properties);
		checkConnectionSettings(properties);
	}

	/** Checks the required connector version; "6" is the only accepted value. */
	private static void checkVersion(DescriptorProperties properties) {
		properties.validateEnumValues(CONNECTOR_VERSION, false, Collections.singletonList(CONNECTOR_VERSION_VALUE_6));
	}

	/**
	 * Checks the indexed host entries. Each entry consists of a hostname, a port within
	 * [{@value #PORT_LOWER_BOUND}, {@value #PORT_UPPER_BOUND}] and a protocol.
	 */
	private static void checkHosts(DescriptorProperties properties) {
		// hostname and protocol share the same rule: a required string of at least one character
		final Consumer<String> requiredNonEmptyString =
			subKey -> properties.validateString(subKey, false, 1);
		final Consumer<String> requiredPort =
			subKey -> properties.validateInt(subKey, false, PORT_LOWER_BOUND, PORT_UPPER_BOUND);

		final Map<String, Consumer<String>> perHostChecks = new HashMap<>();
		perHostChecks.put(CONNECTOR_HOSTS_HOSTNAME, requiredNonEmptyString);
		perHostChecks.put(CONNECTOR_HOSTS_PORT, requiredPort);
		perHostChecks.put(CONNECTOR_HOSTS_PROTOCOL, requiredNonEmptyString);

		properties.validateFixedIndexedProperties(CONNECTOR_HOSTS, false, perHostChecks);
	}

	/**
	 * Checks index and document type (both required, at least one character) as well as the
	 * optional key delimiter and key null literal.
	 */
	private static void checkGeneralProperties(DescriptorProperties properties) {
		// required
		properties.validateString(CONNECTOR_INDEX, false, 1);
		properties.validateString(CONNECTOR_DOCUMENT_TYPE, false, 1);
		// optional
		properties.validateString(CONNECTOR_KEY_DELIMITER, true);
		properties.validateString(CONNECTOR_KEY_NULL_LITERAL, true);
	}

	/**
	 * Checks the optional failure handler. The values "fail", "ignore" and "retry-rejected"
	 * need no further properties; "custom" additionally requires a failure handler class.
	 */
	private static void checkFailureHandler(DescriptorProperties properties) {
		final Consumer<String> requiresHandlerClass =
			ignoredKey -> properties.validateString(CONNECTOR_FAILURE_HANDLER_CLASS, false, 1);

		final Map<String, Consumer<String>> checksByHandler = new HashMap<>();
		checksByHandler.put(CONNECTOR_FAILURE_HANDLER_VALUE_FAIL, noValidation());
		checksByHandler.put(CONNECTOR_FAILURE_HANDLER_VALUE_IGNORE, noValidation());
		checksByHandler.put(CONNECTOR_FAILURE_HANDLER_VALUE_RETRY, noValidation());
		checksByHandler.put(CONNECTOR_FAILURE_HANDLER_VALUE_CUSTOM, requiresHandlerClass);

		properties.validateEnum(CONNECTOR_FAILURE_HANDLER, true, checksByHandler);
	}

	/** Checks the optional flush-on-checkpoint flag and the optional bulk flush limits. */
	private static void checkFlushSettings(DescriptorProperties properties) {
		properties.validateBoolean(CONNECTOR_FLUSH_ON_CHECKPOINT, true);
		properties.validateInt(CONNECTOR_BULK_FLUSH_MAX_ACTIONS, true, 1);
		properties.validateMemorySize(CONNECTOR_BULK_FLUSH_MAX_SIZE, true, BULK_FLUSH_SIZE_PRECISION);
		properties.validateLong(CONNECTOR_BULK_FLUSH_INTERVAL, true, 0);
	}

	/** Checks the optional bulk flush backoff type, maximum number of retries and delay. */
	private static void checkBackoffSettings(DescriptorProperties properties) {
		properties.validateEnumValues(
			CONNECTOR_BULK_FLUSH_BACKOFF_TYPE,
			true,
			Arrays.asList(
				CONNECTOR_BULK_FLUSH_BACKOFF_TYPE_VALUE_DISABLED,
				CONNECTOR_BULK_FLUSH_BACKOFF_TYPE_VALUE_CONSTANT,
				CONNECTOR_BULK_FLUSH_BACKOFF_TYPE_VALUE_EXPONENTIAL));
		properties.validateInt(CONNECTOR_BULK_FLUSH_BACKOFF_MAX_RETRIES, true, 1);
		properties.validateLong(CONNECTOR_BULK_FLUSH_BACKOFF_DELAY, true, 0);
	}

	/** Checks the optional maximum retry timeout and the optional connection path prefix. */
	private static void checkConnectionSettings(DescriptorProperties properties) {
		properties.validateInt(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1);
		properties.validateString(CONNECTOR_CONNECTION_PATH_PREFIX, true);
	}
}
