blob: 546f56fe0f2f448a31318a2669739e7b33a7579e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.aws2.dynamodb;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Rule;
import org.testcontainers.containers.GenericContainer;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
/** A utility to generate test table and data for {@link DynamoDBIOTest}. */
class DynamoDBIOTestHelper implements Serializable {
@Rule
public static GenericContainer dynamoContainer =
new GenericContainer<>("amazon/dynamodb-local:latest").withExposedPorts(8000);
private static DynamoDbClient dynamoDBClient;
static final String ATTR_NAME_1 = "hashKey1";
static final String ATTR_NAME_2 = "rangeKey2";
static void startServerClient() {
dynamoContainer.start();
if (dynamoDBClient == null) {
dynamoDBClient = getDynamoDBClient();
}
}
static void stopServerClient(String tableName) {
if (dynamoDBClient != null) {
dynamoDBClient.close();
}
dynamoContainer.stop();
}
static DynamoDbClient getDynamoDBClient() {
// Note: each test case got to have their own dynamodb client obj, can't be shared
// Otherwise will run into connection pool issue
return DynamoDbClient.builder()
.endpointOverride(URI.create(getContainerEndpoint()))
.region(Region.US_EAST_1)
.credentialsProvider(
StaticCredentialsProvider.create(AwsBasicCredentials.create("accessKey", "secretKey")))
.build();
}
static List<Map<String, AttributeValue>> generateTestData(String tableName, int numOfItems) {
BatchWriteItemRequest batchWriteItemRequest =
generateBatchWriteItemRequest(tableName, numOfItems);
dynamoDBClient.batchWriteItem(batchWriteItemRequest);
ScanResponse scanResult =
dynamoDBClient.scan(ScanRequest.builder().tableName(tableName).build());
List<Map<String, AttributeValue>> items = scanResult.items();
Assert.assertEquals(numOfItems, items.size());
return items;
}
static BatchWriteItemRequest generateBatchWriteItemRequest(String tableName, int numOfItems) {
BatchWriteItemRequest batchWriteItemRequest =
BatchWriteItemRequest.builder()
.requestItems(ImmutableMap.of(tableName, generateWriteRequests(numOfItems)))
.build();
return batchWriteItemRequest;
}
static List<WriteRequest> generateWriteRequests(int numOfItem) {
List<WriteRequest> writeRequests = new ArrayList<>();
for (int i = 1; i <= numOfItem; i++) {
WriteRequest writeRequest =
WriteRequest.builder()
.putRequest(generatePutRequest("hashKeyDataStr_" + i, "1000" + i))
.build();
writeRequests.add(writeRequest);
}
return writeRequests;
}
private static PutRequest generatePutRequest(String hashKeyData, String rangeKeyData) {
ImmutableMap<String, AttributeValue> attrValueMap =
ImmutableMap.of(
ATTR_NAME_1, AttributeValue.builder().s(hashKeyData).build(),
ATTR_NAME_2, AttributeValue.builder().n(rangeKeyData).build());
PutRequest.Builder putRequestBuilder = PutRequest.builder();
putRequestBuilder.item(attrValueMap);
return putRequestBuilder.build();
}
static List<Map<String, AttributeValue>> readDataFromTable(String tableName) {
ScanRequest scanRequest = ScanRequest.builder().tableName(tableName).build();
ScanResponse scanResponse = dynamoDBClient.scan(scanRequest);
return scanResponse.items();
}
static void deleteTestTable(String tableName) {
DeleteTableRequest request = DeleteTableRequest.builder().tableName(tableName).build();
dynamoDBClient.deleteTable(request);
}
static void createTestTable(String tableName) {
CreateTableResponse res = createDynamoTable(tableName);
TableDescription tableDesc = res.tableDescription();
Assert.assertEquals(tableName, tableDesc.tableName());
Assert.assertTrue(tableDesc.keySchema().toString().contains(ATTR_NAME_1));
Assert.assertTrue(tableDesc.keySchema().toString().contains(ATTR_NAME_2));
Assert.assertEquals(tableDesc.provisionedThroughput().readCapacityUnits(), Long.valueOf(1000));
Assert.assertEquals(tableDesc.provisionedThroughput().writeCapacityUnits(), Long.valueOf(1000));
Assert.assertEquals(TableStatus.ACTIVE, tableDesc.tableStatus());
Assert.assertEquals(
"arn:aws:dynamodb:ddblocal:000000000000:table/" + tableName, tableDesc.tableArn());
ListTablesResponse tables = dynamoDBClient.listTables();
Assert.assertEquals(1, tables.tableNames().size());
}
private static CreateTableResponse createDynamoTable(String tableName) {
ImmutableList<AttributeDefinition> attributeDefinitions =
ImmutableList.of(
AttributeDefinition.builder()
.attributeName(ATTR_NAME_1)
.attributeType(ScalarAttributeType.S)
.build(),
AttributeDefinition.builder()
.attributeName(ATTR_NAME_2)
.attributeType(ScalarAttributeType.N)
.build());
ImmutableList<KeySchemaElement> ks =
ImmutableList.of(
KeySchemaElement.builder().attributeName(ATTR_NAME_1).keyType(KeyType.HASH).build(),
KeySchemaElement.builder().attributeName(ATTR_NAME_2).keyType(KeyType.RANGE).build());
ProvisionedThroughput provisionedthroughput =
ProvisionedThroughput.builder().readCapacityUnits(1000L).writeCapacityUnits(1000L).build();
CreateTableRequest request =
CreateTableRequest.builder()
.tableName(tableName)
.attributeDefinitions(attributeDefinitions)
.keySchema(ks)
.provisionedThroughput(provisionedthroughput)
.build();
return dynamoDBClient.createTable(request);
}
// This helper function is copied from localstack
private static String getContainerEndpoint() {
final String address = dynamoContainer.getContainerIpAddress();
String ipAddress = address;
try {
ipAddress = InetAddress.getByName(address).getHostAddress();
} catch (UnknownHostException ignored) {
}
ipAddress = ipAddress + ".nip.io";
while (true) {
try {
//noinspection ResultOfMethodCallIgnored
InetAddress.getAllByName(ipAddress);
break;
} catch (UnknownHostException ignored) {
}
}
return "http://" + ipAddress + ":" + dynamoContainer.getFirstMappedPort();
}
}