blob: 8128a17185e51457f413823ba22c30b87e1d6f47 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package com.datatorrent.contrib.aerospike;
import javax.annotation.Nonnull;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.query.Filter;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.Statement;
import com.aerospike.client.task.IndexTask;
import com.datatorrent.lib.db.TransactionableStore;
* <p>Provides transaction support to the operators by implementing TransactionableStore abstract methods. </p>
* @displayName Aerospike Transactional Store
* @category Output
* @tags store, transactional
* @since 1.0.4
public class AerospikeTransactionalStore extends AerospikeStore implements TransactionableStore {
public static String DEFAULT_APP_ID_COL = "dt_app_id";
public static String DEFAULT_OPERATOR_ID_COL = "dt_operator_id";
public static String DEFAULT_WINDOW_COL = "dt_window";
public static String DEFAULT_META_SET = "dt_meta";
protected String metaTableAppIdColumn;
protected String metaTableOperatorIdColumn;
protected String metaTableWindowColumn;
protected String metaSet;
protected String namespace;
private transient boolean inTransaction;
private transient Statement lastWindowFetchCommand;
public AerospikeTransactionalStore() {
metaTableAppIdColumn = DEFAULT_APP_ID_COL;
metaTableOperatorIdColumn = DEFAULT_OPERATOR_ID_COL;
metaTableWindowColumn = DEFAULT_WINDOW_COL;
inTransaction = false;
* Sets the name of the meta set.<br/>
* <b>Default:</b> {@value #DEFAULT_META_SET}
* @param metaSet meta set name.
public void setMetaSet(@Nonnull String metaSet) {
this.metaSet = metaSet;
* Sets the name of app id column.<br/>
* <b>Default:</b> {@value #DEFAULT_APP_ID_COL}
* @param appIdColumn application id column name.
public void setMetaTableAppIdColumn(@Nonnull String appIdColumn) {
this.metaTableAppIdColumn = appIdColumn;
* Sets the name of operator id column.<br/>
* <b>Default:</b> {@value #DEFAULT_OPERATOR_ID_COL}
* @param operatorIdColumn operator id column name.
public void setMetaTableOperatorIdColumn(@Nonnull String operatorIdColumn) {
this.metaTableOperatorIdColumn = operatorIdColumn;
* Sets the name of the window column.<br/>
* <b>Default:</b> {@value #DEFAULT_WINDOW_COL}
* @param windowColumn window column name.
public void setMetaTableWindowColumn(@Nonnull String windowColumn) {
this.metaTableWindowColumn = windowColumn;
* Sets the name of the namespace.<br/>
* @param namespace namespace.
public void setNamespace(@Nonnull String namespace) {
this.namespace = namespace;
public void connect() {
try {
lastWindowFetchCommand = new Statement();
catch (Exception e) {
throw new RuntimeException(e);
public void disconnect() {
public void beginTransaction() {
inTransaction = true;
public void commitTransaction() {
inTransaction = false;
public void rollbackTransaction() {
inTransaction = false;
public boolean isInTransaction() {
return inTransaction;
private void createIndexes() {
IndexTask task;
try {
task = client.createIndex(null, namespace, metaSet,
"operatorIdIndex", metaTableOperatorIdColumn, IndexType.NUMERIC);
task = client.createIndex(null, namespace, metaSet,
"appIdIndex", metaTableAppIdColumn, IndexType.STRING);
} catch (AerospikeException ex) {
throw new RuntimeException(ex);
public long getCommittedWindowId(String appId, int operatorId) {
try {
lastWindowFetchCommand.setFilters(Filter.equal(metaTableOperatorIdColumn, operatorId));
lastWindowFetchCommand.setFilters(Filter.equal(metaTableAppIdColumn, appId));
long lastWindow = -1;
RecordSet recordSet = client.query(null, lastWindowFetchCommand);
while( {
lastWindow = Long.parseLong(recordSet.getRecord().getValue(metaTableWindowColumn).toString());
return lastWindow;
catch (AerospikeException ex) {
throw new RuntimeException(ex);
public void storeCommittedWindowId(String appId, int operatorId, long windowId) {
try {
String keyString = appId + String.valueOf(operatorId);
Key key = new Key(namespace,metaSet,keyString.hashCode());
Bin bin1 = new Bin(metaTableAppIdColumn,appId);
Bin bin2 = new Bin(metaTableOperatorIdColumn,operatorId);
Bin bin3 = new Bin(metaTableWindowColumn,windowId);
client.put(null, key, bin1,bin2,bin3);
catch (AerospikeException e) {
throw new RuntimeException(e);
public void removeCommittedWindowId(String appId, int operatorId) {
try {
String keyString = appId + String.valueOf(operatorId);
Key key = new Key(namespace,metaSet,keyString.hashCode());
client.delete(null, key);
catch (AerospikeException e) {
throw new RuntimeException(e);