Merge branch 'master' into results
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
index 0467f6e..db4fc2d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
@@ -19,7 +19,8 @@
package org.apache.asterix.bad;
public interface BADConstants {
- String SubscriptionId = "subscriptionId";
+ String ChannelSubscriptionId = "channelSubId";
+ String BrokerSubscriptionId = "brokerSubId";
String BrokerName = "BrokerName";
String ChannelName = "ChannelName";
String ProcedureName = "ProcedureName";
@@ -29,9 +30,11 @@
String ResultId = "resultId";
String ChannelExecutionTime = "channelExecutionTime";
String ChannelSubscriptionsType = "ChannelSubscriptionsType";
+ String BrokerSubscriptionsType = "BrokerSubscriptionsType";
String ChannelResultsType = "ChannelResultsType";
String ResultsDatasetName = "ResultsDatasetName";
- String SubscriptionsDatasetName = "SubscriptionsDatasetName";
+ String ChannelSubscriptionsDatasetName = "ChannelSubscriptionsDatasetName";
+ String BrokerSubscriptionsDatasetName = "BrokerSubscriptionsDatasetName";
String CHANNEL_EXTENSION_NAME = "Channel";
String PROCEDURE_KEYWORD = "Procedure";
String BROKER_KEYWORD = "Broker";
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 8403fcc..5d9d971 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -142,7 +142,11 @@
((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
tempMdProvider.getLocks().reset();
dropStmt = new DropDatasetStatement(new Identifier(dataverse),
- new Identifier(channel.getSubscriptionsDataset()), true);
+ new Identifier(channel.getChannelSubscriptionsDataset()), true);
+ ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
+ tempMdProvider.getLocks().reset();
+ dropStmt = new DropDatasetStatement(new Identifier(dataverse),
+ new Identifier(channel.getBrokerSubscriptionsDataset()), true);
((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index 7583f0b..5803f60 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -138,7 +138,7 @@
throw new AsterixException("There is no broker with this name " + brokerName + ".");
}
- String subscriptionsDatasetName = channel.getSubscriptionsDataset();
+ String subscriptionsDatasetName = channel.getChannelSubscriptionsDataset();
if (argList.size() != channel.getFunction().getArity()) {
throw new AsterixException("Channel expected " + channel.getFunction().getArity()
@@ -157,7 +157,7 @@
fb.add(new FieldBinding(leftExpr, rightExpr));
if (subscriptionId != null) {
- leftExpr = new LiteralExpr(new StringLiteral(BADConstants.SubscriptionId));
+ leftExpr = new LiteralExpr(new StringLiteral(BADConstants.ChannelSubscriptionId));
List<Expression> UUIDList = new ArrayList<>();
UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
@@ -190,7 +190,8 @@
VariableExpr resultVar = new VariableExpr(new VarIdentifier("$result", 0));
VariableExpr useResultVar = new VariableExpr(new VarIdentifier("$result", 0));
useResultVar.setIsNewVar(false);
- FieldAccessor accessor = new FieldAccessor(useResultVar, new Identifier(BADConstants.SubscriptionId));
+ FieldAccessor accessor =
+ new FieldAccessor(useResultVar, new Identifier(BADConstants.ChannelSubscriptionId));
metadataProvider.setResultSetId(new ResultSetId(resultSetId));
boolean resultsAsync =
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index b23bf3b..e6ba69c 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -113,11 +113,11 @@
throw new AsterixException("There is no channel with this name " + channelName + ".");
}
- String subscriptionsDatasetName = channel.getSubscriptionsDataset();
+ String subscriptionsDatasetName = channel.getChannelSubscriptionsDataset();
//Need a condition to say subscription-id = sid
OperatorExpr condition = new OperatorExpr();
- FieldAccessor fa = new FieldAccessor(vars, new Identifier(BADConstants.SubscriptionId));
+ FieldAccessor fa = new FieldAccessor(vars, new Identifier(BADConstants.ChannelSubscriptionId));
condition.addOperand(fa);
condition.setCurrentop(true);
condition.addOperator("=");
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index a28666a..b2ffa1b 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -85,7 +85,8 @@
private Identifier dataverseName;
private String duration;
private String body;
- private String subscriptionsTableName;
+ private String channelSubscriptionsTableName;
+ private String brokerSubscriptionsTableName;
private String resultsTableName;
private String dataverse;
private final boolean push;
@@ -113,7 +114,7 @@
}
public String getSubscriptionsName() {
- return subscriptionsTableName;
+ return channelSubscriptionsTableName;
}
public String getDuration() {
@@ -159,29 +160,53 @@
private void createDatasets(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc) throws AsterixException, Exception {
+ //Create channel subscriptions dataset
Identifier subscriptionsTypeName = new Identifier(BADConstants.ChannelSubscriptionsType);
Identifier resultsTypeName = new Identifier(BADConstants.ChannelResultsType);
//Setup the subscriptions dataset
List<List<String>> partitionFields = new ArrayList<>();
List<Integer> keyIndicators = new ArrayList<>();
keyIndicators.add(0);
- List<String> fieldNames = new ArrayList<>();
- fieldNames.add(BADConstants.SubscriptionId);
- partitionFields.add(fieldNames);
- IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null);
- DatasetDecl createSubscriptionsDataset = new DatasetDecl(dataverseName, new Identifier(subscriptionsTableName),
- new Identifier(BADConstants.BAD_DATAVERSE_NAME), subscriptionsTypeName, null, null, null,
- new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true);
+ List<String> fieldName = new ArrayList<>();
+ fieldName.add(BADConstants.ChannelSubscriptionId);
+ partitionFields.add(fieldName);
+ IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, false, null);
+ DatasetDecl createChannelSubscriptionsDataset =
+ new DatasetDecl(dataverseName, new Identifier(channelSubscriptionsTableName),
+ new Identifier(BADConstants.BAD_DATAVERSE_NAME), subscriptionsTypeName, null, null, null,
+ new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true);
- ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
+ ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider,
+ createChannelSubscriptionsDataset, hcc, null);
+
+ //Create broker subscriptions dataset
+ Identifier brokerSubscriptionsTypeName = new Identifier(BADConstants.BrokerSubscriptionsType);
+ partitionFields = new ArrayList<>();
+ keyIndicators = new ArrayList<>();
+ keyIndicators.add(0);
+ keyIndicators.add(0);
+ fieldName = new ArrayList<>();
+ List<String> fieldName2 = new ArrayList<>();
+ fieldName.add(BADConstants.ChannelSubscriptionId);
+ fieldName2.add(BADConstants.BrokerSubscriptionId);
+ partitionFields.add(fieldName);
+ partitionFields.add(fieldName2);
+ idd = new InternalDetailsDecl(partitionFields, keyIndicators, false, null);
+ DatasetDecl createBrokerSubscriptionsDataset =
+ new DatasetDecl(dataverseName, new Identifier(brokerSubscriptionsTableName),
+ new Identifier(BADConstants.BAD_DATAVERSE_NAME), brokerSubscriptionsTypeName, null, null, null,
+ new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true);
+ metadataProvider.getLocks().reset();
+ ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider,
+ createBrokerSubscriptionsDataset,
hcc, null);
if (!push) {
//Setup the results dataset
partitionFields = new ArrayList<>();
- fieldNames = new ArrayList<>();
- fieldNames.add(BADConstants.ResultId);
- partitionFields.add(fieldNames);
+ fieldName = new ArrayList<>();
+ fieldName.add(BADConstants.ResultId);
+ partitionFields.add(fieldName); // NOTE(review): keyIndicators still has 2 entries here; confirm
idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null);
DatasetDecl createResultsDataset = new DatasetDecl(dataverseName, new Identifier(resultsTableName),
new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, new HashMap<>(),
@@ -225,9 +250,10 @@
builder.append("with " + BADConstants.ChannelExecutionTime + " as current_datetime() \n");
builder.append("select result, ");
builder.append(BADConstants.ChannelExecutionTime + ", ");
- builder.append("sub." + BADConstants.SubscriptionId + " as " + BADConstants.SubscriptionId + ",");
+ builder.append("sub." + BADConstants.ChannelSubscriptionId + " as " + BADConstants.ChannelSubscriptionId + ",");
builder.append("current_datetime() as " + BADConstants.DeliveryTime + "\n");
- builder.append("from " + dataverse + "." + subscriptionsTableName + " sub,\n");
+ builder.append("from " + dataverse + "." + channelSubscriptionsTableName + " sub,\n");
+ builder.append(dataverse + "." + brokerSubscriptionsTableName + " bs,\n");
builder.append(BADConstants.BAD_DATAVERSE_NAME + "." + BADConstants.BROKER_KEYWORD + " b, \n");
builder.append(function.getNamespace() + "." + function.getName() + "(");
int i = 0;
@@ -235,8 +261,6 @@
builder.append("sub.param" + i + ",");
}
builder.append("sub.param" + i + ") result \n");
- builder.append("where b." + BADConstants.BrokerName + " = sub." + BADConstants.BrokerName + "\n");
- builder.append("and b." + BADConstants.DataverseName + " = sub." + BADConstants.DataverseName + "\n");
if (!push) {
builder.append(")");
builder.append(" returning a");
@@ -269,7 +293,9 @@
dataverseName = new Identifier(((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName));
dataverse = dataverseName.getValue();
- subscriptionsTableName = channelName + BADConstants.subscriptionEnding;
+ channelSubscriptionsTableName =
+ channelName + BADConstants.CHANNEL_EXTENSION_NAME + BADConstants.subscriptionEnding;
+ brokerSubscriptionsTableName = channelName + BADConstants.BROKER_KEYWORD + BADConstants.subscriptionEnding;
resultsTableName = push ? "" : channelName + BADConstants.resultsEnding;
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
@@ -297,7 +323,10 @@
initialize(mdTxnCtx);
//check if names are available before creating anything
- if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsTableName) != null) {
+ if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, channelSubscriptionsTableName) != null) {
+ throw new AsterixException("The channel name:" + channelName + " is not available.");
+ }
+ if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, brokerSubscriptionsTableName) != null) {
throw new AsterixException("The channel name:" + channelName + " is not available.");
}
if (!push && MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsTableName) != null) {
@@ -323,7 +352,8 @@
BADJobService.setupExecutorJob(entityId, channeljobSpec, hcc, listener, metadataProvider.getTxnIdFactory(),
duration);
- channel = new Channel(dataverse, channelName.getValue(), subscriptionsTableName, resultsTableName, function,
+ channel = new Channel(dataverse, channelName.getValue(), channelSubscriptionsTableName,
+ brokerSubscriptionsTableName, resultsTableName, function,
duration, null, body);
MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
index cd2ff86..5be2e84 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
@@ -47,8 +47,12 @@
public static final Dataverse BAD_DATAVERSE = new Dataverse(BADConstants.BAD_DATAVERSE_NAME,
NonTaggedDataFormat.class.getName(), MetadataUtil.PENDING_NO_OP);
- public static final Datatype BAD_SUBSCRIPTION_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+ public static final Datatype BAD_CHANNEL_SUBSCRIPTION_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
BADConstants.ChannelSubscriptionsType, BADMetadataRecordTypes.channelSubscriptionsType, false);
+
+ public static final Datatype BAD_BROKER_SUBSCRIPTION_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+ BADConstants.BrokerSubscriptionsType, BADMetadataRecordTypes.brokerSubscriptionsType, false);
+
public static final Datatype BAD_RESULT_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
BADConstants.ChannelResultsType, BADMetadataRecordTypes.channelResultsType, false);
@@ -76,7 +80,6 @@
return new MetadataTupleTranslatorProvider();
}
- @SuppressWarnings("rawtypes")
@Override
public List<ExtensionMetadataDataset> getExtensionIndexes() {
try {
@@ -107,7 +110,8 @@
// MetadataManager.INSTANCE.addDataverse(mdTxnCtx, BAD_DATAVERSE);
// insert default data type
MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_RESULT_DATATYPE);
- MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_SUBSCRIPTION_DATATYPE);
+ MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_CHANNEL_SUBSCRIPTION_DATATYPE);
+ MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_BROKER_SUBSCRIPTION_DATATYPE);
MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_BROKER_DATATYPE);
MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_CHANNEL_DATATYPE);
MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_PROCEDURE_DATATYPE);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
index a764a5a..f6e2e1f 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
@@ -27,16 +27,24 @@
public class BADMetadataRecordTypes {
- // -------------------------------------- Subscriptions --------------------------------------//
- private static final String[] subTypeFieldNames =
- { BADConstants.DataverseName, BADConstants.BrokerName, BADConstants.SubscriptionId };
- private static final IAType[] subTypeFieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AUUID };
+ // -------------------------------------- Channel Subscriptions --------------------------------------//
+ private static final String[] channelSubTypeFieldNames = { BADConstants.ChannelSubscriptionId };
+ private static final IAType[] channelSubTypeFieldTypes = { BuiltinType.AUUID };
public static final ARecordType channelSubscriptionsType =
- new ARecordType(BADConstants.ChannelSubscriptionsType, subTypeFieldNames, subTypeFieldTypes, true);
+ new ARecordType(BADConstants.ChannelSubscriptionsType, channelSubTypeFieldNames, channelSubTypeFieldTypes,
+ true);
+
+ // -------------------------------------- Broker Subscriptions --------------------------------------//
+ private static final String[] brokerSubTypeFieldNames = { BADConstants.ChannelSubscriptionId,
+ BADConstants.BrokerSubscriptionId, BADConstants.DataverseName, BADConstants.BrokerName };
+ private static final IAType[] brokerSubTypeFieldTypes =
+ { BuiltinType.AUUID, BuiltinType.AUUID, BuiltinType.ASTRING, BuiltinType.ASTRING };
+ public static final ARecordType brokerSubscriptionsType = new ARecordType(BADConstants.BrokerSubscriptionsType,
+ brokerSubTypeFieldNames, brokerSubTypeFieldTypes, true);
// ---------------------------------------- Results --------------------------------------------//
private static final String[] resultTypeFieldNames = { BADConstants.ResultId, BADConstants.ChannelExecutionTime,
- BADConstants.SubscriptionId, BADConstants.DeliveryTime };
+ BADConstants.ChannelSubscriptionId, BADConstants.DeliveryTime };
private static final IAType[] resultTypeFieldTypes =
{ BuiltinType.AUUID, BuiltinType.ADATETIME, BuiltinType.AUUID, BuiltinType.ADATETIME };
public static final ARecordType channelResultsType =
@@ -45,25 +53,28 @@
//------------------------------------------ Channel ----------------------------------------//
public static final int CHANNEL_ARECORD_DATAVERSE_NAME_FIELD_INDEX = 0;
public static final int CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX = 1;
- public static final int CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX = 2;
+ public static final int CHANNEL_ARECORD_CHANNEL_SUBSCRIPTIONS_NAME_FIELD_INDEX = 2;
public static final int CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX = 3;
public static final int CHANNEL_ARECORD_FUNCTION_FIELD_INDEX = 4;
public static final int CHANNEL_ARECORD_DURATION_FIELD_INDEX = 5;
public static final int CHANNEL_ARECORD_DEPENDENCIES_FIELD_INDEX = 6;
public static final int CHANNEL_ARECORD_BODY_FIELD_INDEX = 7;
+ public static final int CHANNEL_ARECORD_BROKER_SUBSCRIPTIONS_NAME_FIELD_INDEX = 8;
public static final ARecordType CHANNEL_RECORDTYPE = MetadataRecordTypes.createRecordType(
// RecordTypeName
BADConstants.RECORD_TYPENAME_CHANNEL,
// FieldNames
- new String[] { BADConstants.DataverseName, BADConstants.ChannelName, BADConstants.SubscriptionsDatasetName,
+ new String[] { BADConstants.DataverseName, BADConstants.ChannelName,
+ BADConstants.ChannelSubscriptionsDatasetName,
BADConstants.ResultsDatasetName, BADConstants.Function, BADConstants.Duration,
- BADConstants.FIELD_NAME_DEPENDENCIES, BADConstants.FIELD_NAME_BODY },
+ BADConstants.FIELD_NAME_DEPENDENCIES, BADConstants.FIELD_NAME_BODY,
+ BADConstants.BrokerSubscriptionsDatasetName },
// FieldTypes
new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
new AOrderedListType(BuiltinType.ASTRING, null), BuiltinType.ASTRING,
new AOrderedListType(new AOrderedListType(new AOrderedListType(BuiltinType.ASTRING, null), null),
null),
- BuiltinType.ASTRING },
+ BuiltinType.ASTRING, BuiltinType.ASTRING },
//IsOpen?
true);
//------------------------------------------ Broker ----------------------------------------//
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
index ed9346c..61df9af 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
@@ -34,7 +34,8 @@
/** A unique identifier for the channel */
protected final EntityId channelId;
- private final String subscriptionsDatasetName;
+ private final String channelSubscriptionsDatasetName;
+ private final String brokerSubscriptionsDatasetName;
private final String resultsDatasetName;
private final String duration;
private final String channelBody;
@@ -49,13 +50,15 @@
*/
private final List<List<List<String>>> dependencies;
- public Channel(String dataverseName, String channelName, String subscriptionsDataset, String resultsDataset,
+ public Channel(String dataverseName, String channelName, String channelSubscriptionsDatasetName,
+ String brokerSubscriptionsDatasetName, String resultsDataset,
FunctionSignature function, String duration, List<List<List<String>>> dependencies, String channelBody) {
this.channelId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName);
this.function = function;
this.duration = duration;
this.resultsDatasetName = resultsDataset;
- this.subscriptionsDatasetName = subscriptionsDataset;
+ this.channelSubscriptionsDatasetName = channelSubscriptionsDatasetName;
+ this.brokerSubscriptionsDatasetName = brokerSubscriptionsDatasetName;
this.channelBody = channelBody;
if (this.function.getNamespace() == null) {
this.function.setNamespace(dataverseName);
@@ -67,7 +70,7 @@
this.dependencies.add(new ArrayList<>());
this.dependencies.add(new ArrayList<>());
List<String> resultsList = Arrays.asList(dataverseName, resultsDatasetName);
- List<String> subscriptionList = Arrays.asList(dataverseName, subscriptionsDatasetName);
+ List<String> subscriptionList = Arrays.asList(dataverseName, channelSubscriptionsDatasetName);
this.dependencies.get(0).add(resultsList);
this.dependencies.get(0).add(subscriptionList);
this.dependencies.get(1).add(functionAsPath);
@@ -84,8 +87,12 @@
return dependencies;
}
- public String getSubscriptionsDataset() {
- return subscriptionsDatasetName;
+ public String getChannelSubscriptionsDataset() {
+ return channelSubscriptionsDatasetName;
+ }
+
+ public String getBrokerSubscriptionsDataset() {
+ return brokerSubscriptionsDatasetName;
}
public String getResultsDatasetName() {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
index 175280e..6cf7f50 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
@@ -82,8 +82,9 @@
String channelName =
((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX))
.getStringValue();
- String subscriptionsName = ((AString) channelRecord
- .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX)).getStringValue();
+ String channelSubscriptionsName = ((AString) channelRecord
+ .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_SUBSCRIPTIONS_NAME_FIELD_INDEX))
+ .getStringValue();
String resultsName =
((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX))
.getStringValue();
@@ -127,10 +128,15 @@
((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_BODY_FIELD_INDEX))
.getStringValue();
+ String brokerSubscriptionsName = ((AString) channelRecord
+ .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_BROKER_SUBSCRIPTIONS_NAME_FIELD_INDEX))
+ .getStringValue();
+
FunctionSignature signature = new FunctionSignature(functionSignature.get(0), functionSignature.get(1),
Integer.parseInt(functionSignature.get(2)));
- channel = new Channel(dataverseName, channelName, subscriptionsName, resultsName, signature, duration,
+ channel = new Channel(dataverseName, channelName, channelSubscriptionsName, brokerSubscriptionsName,
+ resultsName, signature, duration,
dependencies, channelBody);
return channel;
}
@@ -164,9 +170,10 @@
// write field 2
fieldValue.reset();
- aString.setValue(channel.getSubscriptionsDataset());
+ aString.setValue(channel.getChannelSubscriptionsDataset());
stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX, fieldValue);
+ recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_SUBSCRIPTIONS_NAME_FIELD_INDEX,
+ fieldValue);
// write field 3
fieldValue.reset();
@@ -227,6 +234,13 @@
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_BODY_FIELD_INDEX, fieldValue);
+ // write field 8
+ fieldValue.reset();
+ aString.setValue(channel.getBrokerSubscriptionsDataset());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_BROKER_SUBSCRIPTIONS_NAME_FIELD_INDEX,
+ fieldValue);
+
// write record
recordBuilder.write(tupleBuilder.getDataOutput(), true);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 9ead7f0..4ec6d2f 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -28,9 +28,11 @@
import org.apache.asterix.bad.runtime.NotifyBrokerPOperator;
import org.apache.asterix.lang.common.util.FunctionUtil;
import org.apache.asterix.metadata.declared.DatasetDataSource;
+import org.apache.asterix.om.base.AInt32;
import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.constants.AsterixConstantValue;
import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.FunctionInfo;
import org.apache.asterix.om.types.IAType;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -47,6 +49,7 @@
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
@@ -58,6 +61,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -76,25 +80,19 @@
if (op1.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
return false;
}
- boolean push = false;
- AbstractLogicalOperator op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
- if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
- if (op.getOperatorTag() != LogicalOperatorTag.PROJECT) {
- return false;
- }
+ boolean push = false;
+ AbstractLogicalOperator op = findOp(op1, 4, "", "");
+ if (op == null) {
push = true;
}
+
DataSourceScanOperator subscriptionsScan;
String channelDataverse;
String channelName;
if (!push) {
- DelegateOperator eOp = (DelegateOperator) op;
- if (!(eOp.getDelegate() instanceof CommitOperator)) {
- return false;
- }
- AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
+ AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
if (descendantOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
return false;
}
@@ -113,15 +111,20 @@
//Now we know that we are inserting into results
channelName = datasetName.substring(0, datasetName.length() - 7);
- String subscriptionsName = channelName + "Subscriptions";
- subscriptionsScan = (DataSourceScanOperator) findOp(op, subscriptionsName);
+ String subscriptionsName = channelName + "ChannelSubscriptions";
+ subscriptionsScan =
+ (DataSourceScanOperator) findOp(op, 3, subscriptionsName, BADConstants.ChannelSubscriptionsType);
if (subscriptionsScan == null) {
return false;
}
} else {
+ op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
+ if (op.getOperatorTag() != LogicalOperatorTag.PROJECT) {
+ return false;
+ }
//if push, get the channel name here instead
- subscriptionsScan = (DataSourceScanOperator) findOp(op, "");
+ subscriptionsScan = (DataSourceScanOperator) findOp(op, 3, "", BADConstants.ChannelSubscriptionsType);
if (subscriptionsScan == null) {
return false;
}
@@ -131,15 +134,8 @@
- channelName = datasetName.substring(0, datasetName.length() - 13);
+ channelName = datasetName.substring(0, datasetName.length() - 20); // strip "ChannelSubscriptions"
}
- //Now we need to get the broker EndPoint
- LogicalVariable brokerEndpointVar = context.newVar();
- AbstractLogicalOperator opAboveBrokersScan = findOp(op, "brokers");
- if (opAboveBrokersScan == null) {
- return false;
- }
-
- //get subscriptionIdVar
- LogicalVariable subscriptionIdVar = subscriptionsScan.getVariables().get(0);
+ //get channelSubscriptionIdVar
+ LogicalVariable channelSubscriptionIdVar = subscriptionsScan.getVariables().get(0);
//The channelExecutionTime is created just before the scan
ILogicalOperator channelExecutionAssign = subscriptionsScan.getInputs().get(0).getValue();
@@ -155,25 +151,48 @@
((CommitOperator) ((DelegateOperator) op).getDelegate()).setSink(false);
}
- AssignOperator assignOp = createbrokerEndPointAssignOperator(brokerEndpointVar, opAboveBrokersScan);
- //now brokerNameVar holds the brokerName for use farther up in the plan
+ //move broker scan
+ AbstractLogicalOperator opAboveBrokersScan = findOp(op, 1, "", "");
+ if (opAboveBrokersScan == null) {
+ return false;
+ }
+ DataSourceScanOperator brokerScan =
+ moveScans(opAboveBrokersScan, op1, context, channelName + "BrokerSubscriptions");
- context.computeAndSetTypeEnvironmentForOperator(assignOp);
- context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan);
- context.computeAndSetTypeEnvironmentForOperator(op);
+ if (brokerScan == null) {
+ return false;
+ }
+ DataSourceScanOperator brokerSubscriptionScan =
+ (DataSourceScanOperator) brokerScan.getInputs().get(0).getValue();
- ProjectOperator badProject = (ProjectOperator) findOp(op1, "project");
- badProject.getVariables().add(subscriptionIdVar);
- badProject.getVariables().add(brokerEndpointVar);
+ //Add select to join subscriptions and broker and assign to get endpoint
+ LogicalVariable brokerDataverseVar = brokerScan.getVariables().get(0);
+ LogicalVariable brokerNameVar = brokerScan.getVariables().get(1);
+ LogicalVariable brokerVar = brokerScan.getVariables().get(2);
+ LogicalVariable brokerSubscriptionVar = brokerSubscriptionScan.getVariables().get(2);
+ LogicalVariable brokerSubscriptionIdVar = brokerSubscriptionScan.getVariables().get(1);
+ LogicalVariable brokerSubscriptionChannelIdVar = brokerSubscriptionScan.getVariables().get(0);
+
+ LogicalVariable brokerEndpointVar = context.newVar();
+ AssignOperator assign = createAssignAndSelect(brokerDataverseVar, brokerNameVar, brokerVar,
+ brokerSubscriptionVar, brokerSubscriptionIdVar, brokerEndpointVar, channelSubscriptionIdVar,
+ brokerSubscriptionChannelIdVar, context, op1);
+
+ context.computeAndSetTypeEnvironmentForOperator(op1);
+
+ //TODO: confirm whether an extra project is needed. NOTE(review): findOp result is not null-checked here
+ ProjectOperator badProject = (ProjectOperator) findOp(op1, 2, "", "");
badProject.getVariables().add(channelExecutionVar);
+ badProject.getVariables().add(channelSubscriptionIdVar);
context.computeAndSetTypeEnvironmentForOperator(badProject);
//Create my brokerNotify plan above the extension Operator
DelegateOperator dOp = push
? createNotifyBrokerPushPlan(brokerEndpointVar, badProject.getVariables().get(0), channelExecutionVar,
- context, op, (DistributeResultOperator) op1, channelDataverse, channelName)
- : createNotifyBrokerPullPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar, context, op,
+ context, assign, (DistributeResultOperator) op1, channelDataverse, channelName)
+ : createNotifyBrokerPullPlan(brokerEndpointVar, channelSubscriptionIdVar, channelExecutionVar, context,
+ assign,
(DistributeResultOperator) op1, channelDataverse, channelName);
opRef.setValue(dOp);
@@ -181,6 +200,98 @@
return true;
}
+ private AssignOperator createAssignAndSelect(LogicalVariable brokerDataverseVar, LogicalVariable brokerNameVar,
+ LogicalVariable brokerVar, LogicalVariable brokerSubscriptionVar, LogicalVariable brokerSubscriptionIdVar,
+ LogicalVariable brokerEndpointVar, LogicalVariable channelSubscriptionIdVar,
+ LogicalVariable brokerSubscriptionChannelIdVar, IOptimizationContext context, AbstractLogicalOperator op1)
+ throws AlgebricksException {
+
+ FunctionInfo finfoGetField =
+ (FunctionInfo) FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_INDEX);
+ FunctionInfo finfoGetEquality = (FunctionInfo) FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.EQ);
+ FunctionInfo finfoGetAnd = (FunctionInfo) FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.AND);
+
+ //Create Select Operator
+ //The operator matches (A) the broker dataverse and name between broker subscriptions and brokers
+ // and (B) the channel subscription Id between broker subscriptions and channel subscriptions
+ ScalarFunctionCallExpression getBrokerName = new ScalarFunctionCallExpression(finfoGetField,
+ new MutableObject<>(new VariableReferenceExpression(brokerSubscriptionVar)),
+ new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AInt32(3)))));
+ ScalarFunctionCallExpression getBrokerDataverse = new ScalarFunctionCallExpression(finfoGetField,
+ new MutableObject<>(new VariableReferenceExpression(brokerSubscriptionVar)),
+ new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AInt32(2)))));
+
+ VariableReferenceExpression BrokerBrokerNameReference = new VariableReferenceExpression(brokerNameVar);
+ VariableReferenceExpression BrokerBrokerDataverseReference =
+ new VariableReferenceExpression(brokerDataverseVar);
+ VariableReferenceExpression brokerSubscriptionChannelIdVarReference =
+ new VariableReferenceExpression(brokerSubscriptionChannelIdVar);
+
+ ScalarFunctionCallExpression brokerNameCheck = new ScalarFunctionCallExpression(finfoGetEquality,
+ new MutableObject<>(getBrokerName), new MutableObject<>(BrokerBrokerNameReference));
+ ScalarFunctionCallExpression brokerDataverseCheck = new ScalarFunctionCallExpression(finfoGetEquality,
+ new MutableObject<>(getBrokerDataverse), new MutableObject<>(BrokerBrokerDataverseReference));
+ ScalarFunctionCallExpression channelSubCheck = new ScalarFunctionCallExpression(finfoGetEquality,
+ new MutableObject<>(brokerSubscriptionChannelIdVarReference),
+ new MutableObject<>(new VariableReferenceExpression(channelSubscriptionIdVar)));
+
+ ScalarFunctionCallExpression andExpression =
+ new ScalarFunctionCallExpression(finfoGetAnd, new MutableObject<>(channelSubCheck),
+ new MutableObject<>(brokerDataverseCheck), new MutableObject<>(brokerNameCheck));
+
+ SelectOperator select = new SelectOperator(new MutableObject<>(andExpression), false, null);
+ select.getInputs().addAll(op1.getInputs());
+
+ //Create Assign Operator
+ ScalarFunctionCallExpression getEndPoint = new ScalarFunctionCallExpression(finfoGetField,
+ new MutableObject<>(new VariableReferenceExpression(brokerVar)),
+ new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AInt32(2)))));
+ AssignOperator assign = new AssignOperator(brokerEndpointVar, new MutableObject<>(getEndPoint));
+ assign.getInputs().add(new MutableObject<>(select));
+
+ op1.getInputs().set(0, new MutableObject<>(assign));
+ context.computeAndSetTypeEnvironmentForOperator(select);
+ context.computeAndSetTypeEnvironmentForOperator(assign);
+ context.computeAndSetTypeEnvironmentForOperator(op1);
+
+ return assign;
+
+ }
+
+ private DataSourceScanOperator moveScans(AbstractLogicalOperator opAboveScan, AbstractLogicalOperator op1,
+ IOptimizationContext context, String subscriptionsName) throws AlgebricksException {
+
+ DataSourceScanOperator brokerScan = null;
+ int i = 0;
+ for (Mutable<ILogicalOperator> subOp : opAboveScan.getInputs()) {
+ if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
+ brokerScan = (DataSourceScanOperator) subOp.getValue();
+ break;
+ }
+ i++;
+ }
+ if (brokerScan == null) {
+ return null;
+ }
+
+ AbstractLogicalOperator brokerSubcriptionsScan =
+ (AbstractLogicalOperator) brokerScan.getInputs().get(0).getValue();
+
+ if (!isSubscriptionsScan(brokerSubcriptionsScan, subscriptionsName, BADConstants.BrokerSubscriptionsType)) {
+ return null;
+ }
+ opAboveScan.getInputs().set(i, brokerSubcriptionsScan.getInputs().get(0));
+ context.computeAndSetTypeEnvironmentForOperator(opAboveScan);
+
+ brokerSubcriptionsScan.getInputs().set(0, op1.getInputs().get(0));
+ op1.getInputs().set(0, new MutableObject<>(brokerScan));
+ context.computeAndSetTypeEnvironmentForOperator(brokerSubcriptionsScan);
+ context.computeAndSetTypeEnvironmentForOperator(brokerScan);
+ context.computeAndSetTypeEnvironmentForOperator(op1);
+ return brokerScan;
+
+ }
+
private DelegateOperator createBrokerOp(LogicalVariable brokerEndpointVar, LogicalVariable sendVar,
LogicalVariable channelExecutionVar, String channelDataverse, String channelName, boolean push,
IAType resultType) {
@@ -310,49 +421,66 @@
}
/*This function is used to find specific operators within the plan, either
- * A. The brokers dataset scan
- * B. The subscriptions scan
- * C. The highest project of the plan
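+ * 1. The operator directly above the brokers dataset scan (searchId 1)
+ * 2. The highest project of the plan (searchId 2)
+ * 3. The subscriptions scan (searchId 3)
+ * 4. The delegate operator wrapping a commit operator (searchId 4)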
+ *
+ * param1 is the name of the expected subscriptions dataset when searching for a subscriptions scan
+ * ("" matches any name) and param2 is the item type name of the subscriptions dataset (channel or broker)
*/
- private AbstractLogicalOperator findOp(AbstractLogicalOperator op, String lookingForString) {
+ private AbstractLogicalOperator findOp(AbstractLogicalOperator op, int searchId, String param1, String param2) {
if (!op.hasInputs()) {
return null;
}
for (Mutable<ILogicalOperator> subOp : op.getInputs()) {
- if (lookingForString.equals("brokers")) {
+ if (searchId == 1) {
if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
return op;
} else {
AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
- lookingForString);
+ searchId, param1, param2);
if (nestedOp != null) {
return nestedOp;
}
}
- } else if (lookingForString.equals("project")) {
+ } else if (searchId == 2) {
if (subOp.getValue().getOperatorTag() == LogicalOperatorTag.PROJECT) {
return (AbstractLogicalOperator) subOp.getValue();
} else {
AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
- lookingForString);
+ searchId, param1, param2);
if (nestedOp != null) {
return nestedOp;
}
}
}
- else {
- if (isSubscriptionsScan((AbstractLogicalOperator) subOp.getValue(), lookingForString)) {
+ else if (searchId == 3) {
+ if (isSubscriptionsScan((AbstractLogicalOperator) subOp.getValue(), param1, param2)) {
return (AbstractLogicalOperator) subOp.getValue();
} else {
AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
- lookingForString);
+ searchId, param1, param2);
if (nestedOp != null) {
return nestedOp;
}
}
+ } else if (searchId == 4) {
+ if (subOp.getValue().getOperatorTag() == LogicalOperatorTag.DELEGATE_OPERATOR) {
+ DelegateOperator dOp = (DelegateOperator) subOp.getValue();
+ if (dOp.getDelegate() instanceof CommitOperator) {
+ return (AbstractLogicalOperator) subOp.getValue();
+ }
+ } else {
+ AbstractLogicalOperator nestedOp =
+ findOp((AbstractLogicalOperator) subOp.getValue(), searchId, param1, param2);
+ if (nestedOp != null) {
+ return nestedOp;
+ }
+ }
}
}
return null;
@@ -371,12 +499,12 @@
return false;
}
- private boolean isSubscriptionsScan(AbstractLogicalOperator op, String subscriptionsName) {
+ private boolean isSubscriptionsScan(AbstractLogicalOperator op, String subscriptionsName, String subscriptionType) {
if (op instanceof DataSourceScanOperator) {
if (((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource) {
DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
if (dds.getDataset().getItemTypeDataverseName().equals("Metadata")
- && dds.getDataset().getItemTypeName().equals("ChannelSubscriptionsType")) {
+ && dds.getDataset().getItemTypeName().equals(subscriptionType)) {
if (subscriptionsName.equals("") || dds.getDataset().getDatasetName().equals(subscriptionsName)) {
return true;
}