/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io.aws2.dynamodb;

import java.io.Serializable;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Rule;
import org.testcontainers.containers.GenericContainer;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;

/**
 * A utility to generate test table and data for {@link DynamoDBIOTest}.
 *
 * <p>All state is static: a single DynamoDB Local container and a single client connected to it.
 * Call {@link #startServerClient()} before using any of the table/data helpers and {@link
 * #stopServerClient(String)} once done.
 */
class DynamoDBIOTestHelper implements Serializable {

  /**
   * DynamoDB Local container exposing port 8000. Its lifecycle is driven explicitly through {@link
   * #startServerClient()} and {@link #stopServerClient(String)}.
   *
   * <p>NOTE(review): JUnit only applies {@code @Rule} to non-static fields of the running test
   * class, so the annotation presumably has no effect on this static helper field - confirm before
   * removing it.
   */
  @Rule
  public static GenericContainer<?> dynamoContainer =
      new GenericContainer<>("amazon/dynamodb-local:latest").withExposedPorts(8000);

  /** Shared client; created by {@link #startServerClient()}, cleared when the server stops. */
  private static DynamoDbClient dynamoDBClient;

  static final String ATTR_NAME_1 = "hashKey1";
  static final String ATTR_NAME_2 = "rangeKey2";

  /** Upper bound on DNS lookups of the container host name before giving up. */
  private static final int DNS_RESOLVE_MAX_ATTEMPTS = 30;

  /** Pause between two consecutive DNS lookups of the container host name. */
  private static final long DNS_RESOLVE_RETRY_DELAY_MILLIS = 1000L;

  /** Starts the DynamoDB Local container and creates the shared client if there is none yet. */
  static void startServerClient() {
    dynamoContainer.start();

    if (dynamoDBClient == null) {
      dynamoDBClient = getDynamoDBClient();
    }
  }

  /**
   * Closes the shared client and stops the container.
   *
   * <p>The client reference is cleared so that a later {@link #startServerClient()} builds a fresh
   * client for the newly started container instead of reusing a closed one.
   *
   * @param tableName unused; kept so existing callers keep compiling
   */
  static void stopServerClient(String tableName) {
    try {
      if (dynamoDBClient != null) {
        dynamoDBClient.close();
      }
    } finally {
      // Drop the reference even if close() failed, and always stop the container.
      dynamoDBClient = null;
      dynamoContainer.stop();
    }
  }

  /**
   * Builds a new client pointing at the running container, using static dummy credentials.
   *
   * @return a new {@link DynamoDbClient}; the caller owns it and is responsible for closing it
   */
  static DynamoDbClient getDynamoDBClient() {
    // Note: each test case must have its own DynamoDB client object; it can't be shared,
    // otherwise the tests run into connection pool issues.
    return DynamoDbClient.builder()
        .endpointOverride(URI.create(getContainerEndpoint()))
        .region(Region.US_EAST_1)
        .credentialsProvider(
            StaticCredentialsProvider.create(AwsBasicCredentials.create("accessKey", "secretKey")))
        .build();
  }

  /**
   * Writes {@code numOfItems} generated items into the table with a single batch write, scans the
   * table, and asserts that the scan returned exactly {@code numOfItems} items.
   *
   * @param tableName table to write to and scan
   * @param numOfItems number of items to generate
   * @return the items returned by the scan
   */
  static List<Map<String, AttributeValue>> generateTestData(String tableName, int numOfItems) {
    BatchWriteItemRequest batchWriteItemRequest =
        generateBatchWriteItemRequest(tableName, numOfItems);

    dynamoDBClient.batchWriteItem(batchWriteItemRequest);
    ScanResponse scanResult =
        dynamoDBClient.scan(ScanRequest.builder().tableName(tableName).build());

    List<Map<String, AttributeValue>> items = scanResult.items();
    Assert.assertEquals(numOfItems, items.size());
    return items;
  }

  /**
   * Builds a batch write request that puts {@code numOfItems} generated items into one table.
   *
   * @param tableName table the write requests are keyed by
   * @param numOfItems number of put requests to generate
   * @return the request; nothing is sent to DynamoDB
   */
  static BatchWriteItemRequest generateBatchWriteItemRequest(String tableName, int numOfItems) {
    return BatchWriteItemRequest.builder()
        .requestItems(ImmutableMap.of(tableName, generateWriteRequests(numOfItems)))
        .build();
  }

  /**
   * Generates put requests for items {@code 1..numOfItem}. Item {@code i} has hash key {@code
   * "hashKeyDataStr_" + i} and numeric range key {@code "1000" + i}.
   *
   * @param numOfItem number of write requests to generate
   * @return a mutable list of write requests, empty when {@code numOfItem} is not positive
   */
  static List<WriteRequest> generateWriteRequests(int numOfItem) {
    List<WriteRequest> writeRequests = new ArrayList<>(Math.max(numOfItem, 0));
    for (int i = 1; i <= numOfItem; i++) {
      writeRequests.add(
          WriteRequest.builder()
              .putRequest(generatePutRequest("hashKeyDataStr_" + i, "1000" + i))
              .build());
    }
    return writeRequests;
  }

  /**
   * Builds a put request for one item with a string hash key and a numeric range key.
   *
   * @param hashKeyData value stored as string attribute {@link #ATTR_NAME_1}
   * @param rangeKeyData value stored as number attribute {@link #ATTR_NAME_2}
   */
  private static PutRequest generatePutRequest(String hashKeyData, String rangeKeyData) {
    ImmutableMap<String, AttributeValue> attrValueMap =
        ImmutableMap.of(
            ATTR_NAME_1, AttributeValue.builder().s(hashKeyData).build(),
            ATTR_NAME_2, AttributeValue.builder().n(rangeKeyData).build());

    return PutRequest.builder().item(attrValueMap).build();
  }

  /**
   * Scans the given table once and returns the items of that scan response.
   *
   * @param tableName table to scan
   * @return the items contained in the scan response
   */
  static List<Map<String, AttributeValue>> readDataFromTable(String tableName) {
    ScanRequest scanRequest = ScanRequest.builder().tableName(tableName).build();
    return dynamoDBClient.scan(scanRequest).items();
  }

  /**
   * Deletes the given table.
   *
   * @param tableName table to delete
   */
  static void deleteTestTable(String tableName) {
    DeleteTableRequest request = DeleteTableRequest.builder().tableName(tableName).build();
    dynamoDBClient.deleteTable(request);
  }

  /**
   * Creates the test table and asserts on the returned description (name, key schema, provisioned
   * throughput, status, ARN) and that it is the only table known to the server.
   *
   * @param tableName name of the table to create
   */
  static void createTestTable(String tableName) {
    CreateTableResponse res = createDynamoTable(tableName);

    TableDescription tableDesc = res.tableDescription();

    Assert.assertEquals(tableName, tableDesc.tableName());
    Assert.assertTrue(tableDesc.keySchema().toString().contains(ATTR_NAME_1));
    Assert.assertTrue(tableDesc.keySchema().toString().contains(ATTR_NAME_2));

    // Expected value goes first so that failure messages read correctly.
    Assert.assertEquals(Long.valueOf(1000), tableDesc.provisionedThroughput().readCapacityUnits());
    Assert.assertEquals(Long.valueOf(1000), tableDesc.provisionedThroughput().writeCapacityUnits());
    Assert.assertEquals(TableStatus.ACTIVE, tableDesc.tableStatus());
    Assert.assertEquals(
        "arn:aws:dynamodb:ddblocal:000000000000:table/" + tableName, tableDesc.tableArn());

    ListTablesResponse tables = dynamoDBClient.listTables();
    Assert.assertEquals(1, tables.tableNames().size());
  }

  /**
   * Sends a create-table request with {@link #ATTR_NAME_1} (string) as hash key, {@link
   * #ATTR_NAME_2} (number) as range key, and 1000 read / 1000 write capacity units.
   *
   * @param tableName name of the table to create
   * @return the response of the create-table call
   */
  private static CreateTableResponse createDynamoTable(String tableName) {

    ImmutableList<AttributeDefinition> attributeDefinitions =
        ImmutableList.of(
            AttributeDefinition.builder()
                .attributeName(ATTR_NAME_1)
                .attributeType(ScalarAttributeType.S)
                .build(),
            AttributeDefinition.builder()
                .attributeName(ATTR_NAME_2)
                .attributeType(ScalarAttributeType.N)
                .build());

    ImmutableList<KeySchemaElement> ks =
        ImmutableList.of(
            KeySchemaElement.builder().attributeName(ATTR_NAME_1).keyType(KeyType.HASH).build(),
            KeySchemaElement.builder().attributeName(ATTR_NAME_2).keyType(KeyType.RANGE).build());

    ProvisionedThroughput provisionedthroughput =
        ProvisionedThroughput.builder().readCapacityUnits(1000L).writeCapacityUnits(1000L).build();
    CreateTableRequest request =
        CreateTableRequest.builder()
            .tableName(tableName)
            .attributeDefinitions(attributeDefinitions)
            .keySchema(ks)
            .provisionedThroughput(provisionedthroughput)
            .build();

    return dynamoDBClient.createTable(request);
  }

  /**
   * Builds the HTTP endpoint of the container as {@code http://<ip>.nip.io:<mapped port>}.
   *
   * <p>The host name is looked up until it resolves, at most {@link #DNS_RESOLVE_MAX_ATTEMPTS}
   * times with {@link #DNS_RESOLVE_RETRY_DELAY_MILLIS} ms between attempts, so that an unreachable
   * DNS fails the test with a clear error instead of hanging it.
   *
   * <p>This helper function is copied from localstack.
   *
   * @return the endpoint URL as a string
   * @throws IllegalStateException if the host name cannot be resolved within the attempt budget,
   *     or if the thread is interrupted while waiting between attempts
   */
  private static String getContainerEndpoint() {
    final String address = dynamoContainer.getContainerIpAddress();
    String ipAddress = address;
    try {
      ipAddress = InetAddress.getByName(address).getHostAddress();
    } catch (UnknownHostException ignored) {
      // Keep the address exactly as reported by the container.
    }
    final String hostName = ipAddress + ".nip.io";

    UnknownHostException lastFailure = null;
    for (int attempt = 1; attempt <= DNS_RESOLVE_MAX_ATTEMPTS; attempt++) {
      try {
        //noinspection ResultOfMethodCallIgnored
        InetAddress.getAllByName(hostName);
        return "http://" + hostName + ":" + dynamoContainer.getFirstMappedPort();
      } catch (UnknownHostException e) {
        lastFailure = e;
      }
      if (attempt < DNS_RESOLVE_MAX_ATTEMPTS) {
        try {
          Thread.sleep(DNS_RESOLVE_RETRY_DELAY_MILLIS);
        } catch (InterruptedException e) {
          // Preserve the interrupt for the caller and stop waiting.
          Thread.currentThread().interrupt();
          throw new IllegalStateException(
              "Interrupted while waiting for " + hostName + " to resolve", e);
        }
      }
    }
    throw new IllegalStateException(
        "Unable to resolve " + hostName + " after " + DNS_RESOLVE_MAX_ATTEMPTS + " attempts",
        lastFailure);
  }
}
