blob: 949856299b8851a1ee995776fc26765126059b2b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.contrib.aerospike;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.Statement;
import com.datatorrent.api.Attribute.AttributeMap;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import java.util.ArrayList;
import java.util.List;
import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
/**
* Utility class encapsulating code used by several tests
*/
public class AerospikeTestUtils {
private static final Logger LOG = LoggerFactory.getLogger(AerospikeTestUtils.class);
public static final String NODE = "127.0.0.1";
public static final String NAMESPACE = "test";
public static final int PORT = 3000;
public static final String SET_NAME = "test_event_set";
public static final int OPERATOR_ID = 0;
public static final int NUM_TUPLES = 10;
// removes all records from set SET_NAME in namespace NAMESPACE
static void cleanTable() {
AerospikeClient client = null;
try {
client = new AerospikeClient(NODE, PORT);
Statement stmnt = new Statement();
stmnt.setNamespace(NAMESPACE);
stmnt.setSetName(SET_NAME);
RecordSet rs = client.query(null, stmnt);
while(rs.next()){
client.delete(null, rs.getKey());
}
}
catch (AerospikeException e) {
LOG.error("cleanTable failed: {}", e);
throw e;
}
finally {
if (null != client) client.close();
}
}
// removes all records from set AerospikeTransactionalStore.DEFAULT_META_SET (used to store
// committed window ids) in namespace NAMESPACE
//
static void cleanMetaTable() {
AerospikeClient client = null;
try {
client = new AerospikeClient(NODE, PORT);
Statement stmnt = new Statement();
stmnt.setNamespace(NAMESPACE);
stmnt.setSetName(AerospikeTransactionalStore.DEFAULT_META_SET);
RecordSet rs = client.query(null, stmnt);
while(rs.next()){
client.delete(null, rs.getKey());
}
}
catch (AerospikeException e) {
LOG.error("cleanMetaTable failed: {}", e);
throw e;
}
finally {
if (null != client) client.close();
}
}
// returns the number of records in set SET_NAME in namespace NAMESPACE
static long getNumOfEventsInStore() {
AerospikeClient client = null;
try {
long count = 0;
client = new AerospikeClient(NODE, PORT);
Statement stmnt = new Statement();
stmnt.setNamespace(NAMESPACE);
stmnt.setSetName(SET_NAME);
RecordSet rs = client.query(null, stmnt);
while(rs.next()){
count++;
}
return count;
}
catch (AerospikeException e) {
LOG.error("getNumOfEventsInStore failed: {}", e);
throw e;
}
finally {
if (null != client) client.close();
}
}
static AerospikeStore getStore()
{
AerospikeStore result = new AerospikeStore();
result.setNode(NODE);
result.setPort(PORT);
//result.setNamespace(NAMESPACE); // add if needed
return result;
}
static AerospikeTransactionalStore getTransactionalStore()
{
AerospikeTransactionalStore result = new AerospikeTransactionalStore();
result.setNode(NODE);
result.setPort(PORT);
result.setNamespace(NAMESPACE); // used by AerospikeTransactionalStore.createIndexes()
return result;
}
static OperatorContext getOperatorContext(final String app_id)
{
AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
attributeMap.put(DAG.APPLICATION_ID, app_id);
return mockOperatorContext(OPERATOR_ID, attributeMap);
}
static ArrayList<String> getExpressions()
{
ArrayList<String> result = new ArrayList<String>();
result.add("getKey()");
result.add("getBins()");
return result;
}
static List<TestPOJO> getEvents()
{
List<TestPOJO> result = new ArrayList<TestPOJO>();
for (int i = 0; i < NUM_TUPLES; i++) {
result.add(new TestPOJO(i));
}
return result;
}
static boolean checkEvents()
{
long count = 0;
AerospikeClient client = null;
try {
client = new AerospikeClient(NODE, PORT);
Statement stmnt = new Statement();
stmnt.setNamespace(NAMESPACE);
stmnt.setSetName(SET_NAME);
RecordSet rs = client.query(null, stmnt);
while ( rs.next() ) {
Record record = rs.getRecord();
Key key = rs.getKey();
if (! TestPOJO.check(key, record)) return false;
count++;
}
}
catch (AerospikeException e) {
throw new RuntimeException("Error fetching records: ", e);
}
finally {
if (null != client) client.close();
}
return NUM_TUPLES == count;
}
// needs to be public for PojoUtils to work
public static class TestPOJO {
public static final String ID = "ID", VAL = "VALUE";
final int id, value;
TestPOJO(int i) {
id = i; value = id * id;
}
public Key getKey() {
try {
Key key = new Key(NAMESPACE, SET_NAME, String.valueOf(id));
return key;
}
catch (AerospikeException e) {
throw new RuntimeException("getKey failed: ", e);
}
}
public List<Bin> getBins() {
List<Bin> list = new ArrayList<Bin>();
list.add(new Bin(ID, id));
list.add(new Bin(VAL, value));
return list;
}
// check record key and values
public static boolean check(Key key, Record record) {
final int binId = record.getInt(ID), binVal = record.getInt(VAL);
final Key rKey = new Key(NAMESPACE, SET_NAME, String.valueOf(binId));
LOG.debug("Checking id = {}", binId);
return binVal == binId * binId && rKey.equals(key);
}
}
}