Merge branch 'master' into results
diff --git a/asterix-bad/pom.xml b/asterix-bad/pom.xml
index b7d775a..cc8204e 100644
--- a/asterix-bad/pom.xml
+++ b/asterix-bad/pom.xml
@@ -276,6 +276,16 @@
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-util</artifactId>
+ <version>${hyracks.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>algebricks-data</artifactId>
+ <version>${hyracks.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-control-common</artifactId>
<version>${hyracks.version}</version>
</dependency>
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
index 69145d9..c48ec54 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
@@ -108,9 +108,8 @@
public static boolean runDeployedJobSpecCheckPeriod(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
Map<byte[], byte[]> jobParameters, long period, EntityId entityId, ITxnIdFactory txnIdFactory,
DeployedJobSpecEventListener listener) throws Exception {
- long executionMilliseconds =
- runDeployedJobSpec(distributedId, hcc, null, jobParameters, entityId, txnIdFactory, null, listener,
- null);
+ long executionMilliseconds = runDeployedJobSpec(distributedId, hcc, null, jobParameters, entityId, txnIdFactory,
+ null, listener, null);
if (executionMilliseconds > period) {
LOGGER.log(Level.SEVERE,
"Periodic job for " + entityId.getExtensionName() + " " + entityId.getDataverse() + "."
@@ -153,7 +152,6 @@
}
-
public static long findPeriod(String duration) {
//TODO: Allow Repetitive Channels to use YMD durations
String hoursMinutesSeconds = "";
@@ -189,7 +187,8 @@
metadataProvider.setMetadataTxnContext(mdTxnCtx);
JobSpecification jobSpec = null;
try {
- jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null);
+ jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null, null,
+ null);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
} catch (Exception e) {
@@ -230,7 +229,7 @@
jobSpec = compilePushChannel(badStatementExecutor, metadataProvider, hcc, (Query) fStatements.get(1));
} else {
jobSpec = badStatementExecutor.handleInsertUpsertStatement(metadataProvider, fStatements.get(1), hcc,
- null, null, null, null, true, null);
+ null, null, null, null, true, null, null, null);
}
} else {
//Procedures
@@ -263,7 +262,7 @@
metadataProvider.setMetadataTxnContext(mdTxnCtx);
JobSpecification jobSpec;
try {
- jobSpec = statementExecutor.rewriteCompileQuery(hcc, metadataProvider, q, null);
+ jobSpec = statementExecutor.rewriteCompileQuery(hcc, metadataProvider, q, null, null, null);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
@@ -277,14 +276,15 @@
IStatementExecutor.Stats stats, Statement procedureStatement) throws Exception {
if (procedureStatement.getKind() == Statement.Kind.INSERT) {
return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
- procedureStatement, hcc, hdc, IStatementExecutor.ResultDelivery.ASYNC, null, stats, true, null);
+ procedureStatement, hcc, hdc, IStatementExecutor.ResultDelivery.ASYNC, null, stats, true, null,
+ null, null);
} else if (procedureStatement.getKind() == Statement.Kind.QUERY) {
return compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) procedureStatement);
} else {
SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
procedureStatement.accept(visitor, null);
return ((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, procedureStatement,
- hcc, true);
+ hcc, true, null, null);
}
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
index 2a3d4fb..99f0d66 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java
@@ -18,17 +18,11 @@
*/
package org.apache.asterix.bad.lang;
-import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslatorFactory;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.compiler.provider.IRuleSetFactory;
-import org.apache.asterix.lang.common.base.IAstPrintVisitorFactory;
+import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
import org.apache.asterix.lang.common.base.IParserFactory;
-import org.apache.asterix.lang.common.base.IRewriterFactory;
-import org.apache.asterix.lang.sqlpp.rewrites.SqlppRewriterFactory;
-import org.apache.asterix.lang.sqlpp.visitor.SqlppAstPrintVisitorFactory;
-import org.apache.asterix.translator.SqlppExpressionToPlanTranslatorFactory;
-public class BADCompilationProvider implements ILangCompilationProvider {
+public class BADCompilationProvider extends SqlppCompilationProvider {
@Override
public IParserFactory getParserFactory() {
@@ -36,21 +30,6 @@
}
@Override
- public IRewriterFactory getRewriterFactory() {
- return new SqlppRewriterFactory();
- }
-
- @Override
- public IAstPrintVisitorFactory getAstPrintVisitorFactory() {
- return new SqlppAstPrintVisitorFactory();
- }
-
- @Override
- public ILangExpressionToPlanTranslatorFactory getExpressionToPlanTranslatorFactory() {
- return new SqlppExpressionToPlanTranslatorFactory();
- }
-
- @Override
public IRuleSetFactory getRuleSetFactory() {
return new BADRuleSetFactory();
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index 2f23a9c..3ea2c0d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -334,7 +334,7 @@
}
}
}
- final IRequestParameters requestParameters = new RequestParameters(null, null, null, null, null, null);
+ final IRequestParameters requestParameters = new RequestParameters(null, null, null, null, null, null, null);
for (Channel channel : channels) {
if (!channel.getChannelId().getDataverse().equals(dvId.getValue())) {
continue;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index cb2498c..5803f60 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -208,13 +208,13 @@
InsertStatement insert = new InsertStatement(new Identifier(dataverse),
new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, resultVar, accessor);
((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, insert, hcc, hdc,
- resultDelivery, null, stats, false, null);
+ resultDelivery, null, stats, false, null, null, null);
} else {
//To update an existing subscription
UpsertStatement upsert = new UpsertStatement(new Identifier(dataverse),
new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, null, null);
((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, upsert, hcc, hdc,
- resultDelivery, null, stats, false, null);
+ resultDelivery, null, stats, false, null, null, null);
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index 03a3f1d..e6ba69c 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -139,7 +139,8 @@
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
metadataProvider.getDefaultDataverse());
tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
- ((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false);
+ ((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false, null,
+ null);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
QueryTranslator.abort(e, e, mdTxnCtx);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 1551a7c..b2ffa1b 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -277,7 +277,7 @@
(Query) fStatements.get(1));
}
return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1),
- hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null);
+ hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null, null);
}
@Override
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index be5bedb..aaf2bfa 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -176,7 +176,7 @@
private Pair<JobSpecification, PrecompiledType> createProcedureJob(IStatementExecutor statementExecutor,
MetadataProvider metadataProvider, IHyracksClientConnection hcc, Stats stats)
- throws Exception {
+ throws Exception {
if (getProcedureBodyStatement().getKind() == Statement.Kind.INSERT) {
if (!varList.isEmpty()) {
throw new CompilationException("Insert procedures cannot have parameters");
@@ -185,9 +185,8 @@
dependencies.get(0).add(Arrays.asList(
((QueryTranslator) statementExecutor).getActiveDataverse(insertStatement.getDataverseName()),
insertStatement.getDatasetName().getValue()));
- return new Pair<>(
- ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
- getProcedureBodyStatement(), hcc, null, ResultDelivery.ASYNC, null, stats, true, null),
+ return new Pair<>(((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
+ getProcedureBodyStatement(), hcc, null, ResultDelivery.ASYNC, null, stats, true, null, null, null),
PrecompiledType.INSERT);
} else if (getProcedureBodyStatement().getKind() == Statement.Kind.QUERY) {
SqlppRewriterFactory fact = new SqlppRewriterFactory();
@@ -209,7 +208,7 @@
metadataProvider);
Pair<JobSpecification, PrecompiledType> pair =
new Pair<>(((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider,
- getProcedureBodyStatement(), hcc, true), PrecompiledType.DELETE);
+ getProcedureBodyStatement(), hcc, true, null, null), PrecompiledType.DELETE);
return pair;
} else {
throw new CompilationException("Procedure can only execute a single delete, insert, or query");
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
index d34d170..89d940e 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java
@@ -122,7 +122,7 @@
listener.suspend();
activeEventHandler.registerListener(listener);
BADJobService.redeployJobSpec(entityId, channel.getChannelBody(), metadataProvider, badStatementExecutor,
- hcc, new RequestParameters(null, null, null, null, null, null), true);
+ hcc, new RequestParameters(null, null, null, null, null, null, null), true);
ScheduledExecutorService ses = BADJobService.startRepetitiveDeployedJobSpec(listener.getDeployedJobSpecId(),
hcc,
@@ -149,7 +149,7 @@
new HyracksDataset(hcc, appCtx.getCompilerProperties().getFrameSize(),
ResultReader.NUM_READERS),
new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
- new IStatementExecutor.Stats(), null, null, null),
+ new IStatementExecutor.Stats(), null, null, null, null),
true);
metadataProvider.getLocks().unlock();
//Log that the procedure stopped by cluster restart. Procedure is available again now.
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
index 6ffb244..8e07af2 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -19,34 +19,33 @@
package org.apache.asterix.bad.runtime;
+import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.AOrderedlistPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ARecordPrinterFactory;
import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
import org.apache.asterix.om.base.ADateTime;
-import org.apache.asterix.om.base.AOrderedList;
-import org.apache.asterix.om.base.ARecord;
-import org.apache.asterix.om.base.AUUID;
import org.apache.asterix.om.types.AOrderedListType;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.data.IPrinter;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
@@ -58,15 +57,19 @@
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.util.string.UTF8StringReader;
+import org.apache.hyracks.util.string.UTF8StringWriter;
public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
private static final Logger LOGGER = Logger.getLogger(NotifyBrokerRuntime.class.getName());
private final ByteBufferInputStream bbis = new ByteBufferInputStream();
private final DataInputStream di = new DataInputStream(bbis);
- private final AOrderedListSerializerDeserializer subSerDes =
- new AOrderedListSerializerDeserializer(new AOrderedListType(BuiltinType.AUUID, null));
- private final ARecordSerializerDeserializer recordSerDes;
+ private static final AStringSerializerDeserializer stringSerDes =
+ new AStringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader());
+
+ private final IPrinter recordPrinterFactory;
+ private final IPrinter subscriptionIdListPrinterFactory;
private IPointable inputArg0 = new VoidPointable();
private IPointable inputArg1 = new VoidPointable();
@@ -74,14 +77,14 @@
private IScalarEvaluator eval0;
private IScalarEvaluator eval1;
private IScalarEvaluator eval2;
- private final ActiveManager activeManager;
private final EntityId entityId;
private final boolean push;
- private AOrderedList pushList;
- private ARecord pushRecord;
- private final IAType recordType;
- private final Map<String, HashSet<String>> sendData = new HashMap<>();
+ private final Map<String, String> sendData = new HashMap<>();
+ private final Map<String, ByteArrayOutputStream> sendbaos = new HashMap<>();
+ private final Map<String, PrintStream> sendStreams = new HashMap<>();
private String executionTimeString;
+ private boolean firstResult = true;
+ String endpoint;
public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brokerEvalFactory,
IScalarEvaluatorFactory pushListEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
@@ -90,14 +93,11 @@
eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
eval1 = pushListEvalFactory.createScalarEvaluator(ctx);
eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
- this.activeManager = (ActiveManager) ((INcApplicationContext) ctx.getJobletContext().getServiceContext()
- .getApplicationContext()).getActiveManager();
this.entityId = activeJobId;
this.push = push;
- this.pushList = null;
- this.pushRecord = null;
- this.recordType = recordType;
- recordSerDes = new ARecordSerializerDeserializer((ARecordType) recordType);
+ recordPrinterFactory = new ARecordPrinterFactory((ARecordType) recordType).createPrinter();
+ subscriptionIdListPrinterFactory =
+ new AOrderedlistPrinterFactory(new AOrderedListType(BuiltinType.AUUID, null)).createPrinter();
executionTimeString = null;
}
@@ -106,28 +106,18 @@
return;
}
- private void addSubscriptions(String endpoint, AOrderedList subscriptionIds) {
- for (int i = 0; i < subscriptionIds.size(); i++) {
- AUUID subId = (AUUID) subscriptionIds.getItem(i);
- String subscriptionString = subId.toString();
- //Broker code currently cannot handle the "uuid {}" part of the string, so we parse just the value
- subscriptionString = subscriptionString.substring(8, subscriptionString.length() - 2);
- subscriptionString = "\"" + subscriptionString + "\"";
- sendData.get(endpoint).add(subscriptionString);
- }
- }
-
public String createData(String endpoint) {
- String JSON = "{ \"dataverseName\":\"" + entityId.getDataverse() + "\", \"channelName\":\""
- + entityId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\""
- + executionTimeString + "\", \"subscriptionIds\":[";
- for (String value : sendData.get(endpoint)) {
- JSON += value;
- JSON += ",";
+ String resultTitle = "\"subscriptionIds";
+ if (push) {
+ resultTitle = "\"results\"";
}
- JSON = JSON.substring(0, JSON.length() - 1);
- JSON += "]}";
- return JSON;
+ String jsonStr = "{ \"dataverseName\":\"" + entityId.getDataverse() + "\", \"channelName\":\""
+ + entityId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\""
+ + executionTimeString + "\", " + resultTitle + ":[";
+ jsonStr += sendData.get(endpoint);
+ jsonStr = jsonStr.substring(0, jsonStr.length());
+ jsonStr += "]}";
+ return jsonStr;
}
@@ -172,6 +162,11 @@
eval1.evaluate(tRef, inputArg1);
eval2.evaluate(tRef, inputArg2);
+ /*The incoming tuples have three fields:
+ 1. eval0 will get the serialized broker endpoint string
+ 2. eval1 will get the payload (either the subscriptionIds or entire results)
+ 3. eval2 will get the channel execution time stamp (the same for all tuples)
+ */
if (executionTimeString == null) {
int resultSetOffset = inputArg2.getStartOffset();
bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), resultSetOffset + 1);
@@ -185,34 +180,51 @@
int serBrokerOffset = inputArg0.getStartOffset();
bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serBrokerOffset + 1);
- String endpoint = AStringSerializerDeserializer.INSTANCE.deserialize(di).getStringValue();
- sendData.putIfAbsent(endpoint, new HashSet<>());
+ endpoint = stringSerDes.deserialize(di).getStringValue();
+ sendbaos.putIfAbsent(endpoint, new ByteArrayOutputStream());
+ try {
+ sendStreams.putIfAbsent(endpoint,
+ new PrintStream(sendbaos.get(endpoint), true, StandardCharsets.UTF_8.name()));
+ } catch (UnsupportedEncodingException e) {
+ throw new HyracksDataException(e.getMessage());
+ }
if (push) {
int pushOffset = inputArg1.getStartOffset();
bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), pushOffset + 1);
- //TODO: Right now this creates an object per channel result. Need to find a better way to deserialize
- pushRecord = recordSerDes.deserialize(di);
- sendData.get(endpoint).add(pushRecord.toString());
+ if (!firstResult) {
+ sendStreams.get(endpoint).append(',');
+ }
+ recordPrinterFactory.print(inputArg1.getByteArray(), inputArg1.getStartOffset(), inputArg1.getLength(),
+ sendStreams.get(endpoint));
} else {
- int serSubOffset = inputArg1.getStartOffset();
- bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serSubOffset + 1);
- pushList = subSerDes.deserialize(di);
- addSubscriptions(endpoint, pushList);
+ if (!firstResult) {
+ sendStreams.get(endpoint).append(',');
+ }
+ subscriptionIdListPrinterFactory.print(inputArg1.getByteArray(), inputArg1.getStartOffset(),
+ inputArg1.getLength(),
+ sendStreams.get(endpoint));
}
+ firstResult = false;
}
}
@Override
public void close() throws HyracksDataException {
- for (String endpoint : sendData.keySet()) {
- if (sendData.get(endpoint).size() > 0) {
- sendGroupOfResults(endpoint);
- sendData.get(endpoint).clear();
+ for (String endpoint : sendStreams.keySet()) {
+ sendData.put(endpoint, new String(sendbaos.get(endpoint).toByteArray(), StandardCharsets.UTF_8));
+ sendGroupOfResults(endpoint);
+ sendStreams.get(endpoint).close();
+ try {
+ sendbaos.get(endpoint).close();
+ } catch (IOException e) {
+ throw new HyracksDataException(e.getMessage());
}
+
}
+
return;
}
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced-index-only.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced-index-only.plan
index d0b3087..308c8b1 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced-index-only.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced-index-only.plan
@@ -1,17 +1,17 @@
-- NOTIFY_BROKERS |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$134, $$channelExecutionTime] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$141, $$channelExecutionTime] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$134(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$134, $$channelExecutionTime] |PARTITIONED|
+ -- STABLE_SORT [$$141(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$141, $$channelExecutionTime] |PARTITIONED|
-- PRE_SORTED_DISTINCT_BY |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$109(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$109] |PARTITIONED|
+ -- STABLE_SORT [$$116(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$116] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- COMMIT |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
@@ -21,7 +21,7 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- INSERT_DELETE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$104] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$111] |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
@@ -33,12 +33,12 @@
-- STREAM_SELECT |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- HYBRID_HASH_JOIN [$$119][$$118] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$119] |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$126][$$125] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$126] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- HYBRID_HASH_JOIN [$$117, $$115][$$110, $$111] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$117, $$115] |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$124, $$122][$$117, $$118] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$124, $$122] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
@@ -46,24 +46,24 @@
-- BROADCAST_EXCHANGE |PARTITIONED|
-- ASSIGN |UNPARTITIONED|
-- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$110, $$111] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$117, $$118] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- DATASOURCE_SCAN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- EMPTY_TUPLE_SOURCE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$118] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$125] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- HYBRID_HASH_JOIN [$$112][$$128] |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$119][$$135] |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_SELECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- BTREE_SEARCH |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$138(ASC)] |PARTITIONED|
+ -- STABLE_SORT [$$145(ASC)] |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
@@ -71,7 +71,7 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- EMPTY_TUPLE_SOURCE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$128] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$135] |PARTITIONED|
-- NESTED_LOOP |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- UNION_ALL |PARTITIONED|
@@ -127,4 +127,4 @@
-- BTREE_SEARCH |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- ASSIGN |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced.plan
index a4370db..59805cc 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced.plan
@@ -1,17 +1,17 @@
-- NOTIFY_BROKERS |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$134, $$channelExecutionTime] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$141, $$channelExecutionTime] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$134(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$134, $$channelExecutionTime] |PARTITIONED|
+ -- STABLE_SORT [$$141(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$141, $$channelExecutionTime] |PARTITIONED|
-- PRE_SORTED_DISTINCT_BY |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$109(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$109] |PARTITIONED|
+ -- STABLE_SORT [$$116(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$116] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- COMMIT |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
@@ -21,7 +21,7 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- INSERT_DELETE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$104] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$111] |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
@@ -33,12 +33,12 @@
-- STREAM_SELECT |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- HYBRID_HASH_JOIN [$$119][$$118] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$119] |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$126][$$125] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$126] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- HYBRID_HASH_JOIN [$$117, $$115][$$110, $$111] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$117, $$115] |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$124, $$122][$$117, $$118] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$124, $$122] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
@@ -46,24 +46,24 @@
-- BROADCAST_EXCHANGE |PARTITIONED|
-- ASSIGN |UNPARTITIONED|
-- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$110, $$111] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$117, $$118] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- DATASOURCE_SCAN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- EMPTY_TUPLE_SOURCE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$118] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$125] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- HYBRID_HASH_JOIN [$$112][$$128] |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$119][$$135] |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_SELECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- BTREE_SEARCH |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$138(ASC)] |PARTITIONED|
+ -- STABLE_SORT [$$145(ASC)] |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
@@ -71,7 +71,7 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- EMPTY_TUPLE_SOURCE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$128] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$135] |PARTITIONED|
-- NESTED_LOOP |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
@@ -79,7 +79,7 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- BTREE_SEARCH |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$142(ASC)] |PARTITIONED|
+ -- STABLE_SORT [$$149(ASC)] |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
@@ -94,11 +94,11 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- BTREE_SEARCH |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$145(ASC)] |PARTITIONED|
+ -- STABLE_SORT [$$152(ASC)] |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- BTREE_SEARCH |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- ASSIGN |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
index 3e201a7..a080c79 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
@@ -1,17 +1,17 @@
-- NOTIFY_BROKERS |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$87, $$channelExecutionTime] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$94, $$channelExecutionTime] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$87(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$87, $$channelExecutionTime] |PARTITIONED|
+ -- STABLE_SORT [$$94(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$94, $$channelExecutionTime] |PARTITIONED|
-- PRE_SORTED_DISTINCT_BY |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$72(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$72] |PARTITIONED|
+ -- STABLE_SORT [$$79(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$79] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- COMMIT |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
@@ -21,7 +21,7 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- INSERT_DELETE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$66] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$73] |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
@@ -35,8 +35,8 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- HYBRID_HASH_JOIN [$$79, $$77][$$73, $$74] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$79, $$77] |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$86, $$84][$$80, $$81] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$86, $$84] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
@@ -44,7 +44,7 @@
-- BROADCAST_EXCHANGE |PARTITIONED|
-- ASSIGN |UNPARTITIONED|
-- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$73, $$74] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$80, $$81] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
@@ -58,4 +58,4 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- DATASOURCE_SCAN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
index 653e42c..19dac15 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
@@ -1,17 +1,17 @@
-- NOTIFY_BROKERS |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$87, $$channelExecutionTime] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$94, $$channelExecutionTime] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$87(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$87, $$channelExecutionTime] |PARTITIONED|
+ -- STABLE_SORT [$$94(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$94, $$channelExecutionTime] |PARTITIONED|
-- PRE_SORTED_DISTINCT_BY |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$72(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$72] |PARTITIONED|
+ -- STABLE_SORT [$$79(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$79] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- COMMIT |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
@@ -21,7 +21,7 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- INSERT_DELETE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$66] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$73] |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
@@ -35,8 +35,8 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- HYBRID_HASH_JOIN [$$79, $$77][$$73, $$74] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$79, $$77] |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$86, $$84][$$80, $$81] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$86, $$84] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
@@ -44,7 +44,7 @@
-- BROADCAST_EXCHANGE |PARTITIONED|
-- ASSIGN |UNPARTITIONED|
-- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$73, $$74] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$80, $$81] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
@@ -65,11 +65,11 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- INSERT_DELETE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$4] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$12] |PARTITIONED|
-- ASSIGN |UNPARTITIONED|
-- STREAM_PROJECT |UNPARTITIONED|
-- ASSIGN |UNPARTITIONED|
-- STREAM_PROJECT |UNPARTITIONED|
-- ASSIGN |UNPARTITIONED|
-- ASSIGN |UNPARTITIONED|
- -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
\ No newline at end of file
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
index b15c0d6..a847ef0 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
@@ -1,17 +1,17 @@
-- NOTIFY_BROKERS |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$87, $$channelExecutionTime] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$94, $$channelExecutionTime] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$87(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$87, $$channelExecutionTime] |PARTITIONED|
+ -- STABLE_SORT [$$94(ASC), $$channelExecutionTime(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$94, $$channelExecutionTime] |PARTITIONED|
-- PRE_SORTED_DISTINCT_BY |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$72(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$72] |PARTITIONED|
+ -- STABLE_SORT [$$79(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$79] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- COMMIT |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
@@ -21,7 +21,7 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- INSERT_DELETE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$66] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$73] |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
@@ -35,8 +35,8 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- HYBRID_HASH_JOIN [$$79, $$77][$$73, $$74] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$79, $$77] |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$86, $$84][$$80, $$81] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$86, $$84] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
@@ -44,7 +44,7 @@
-- BROADCAST_EXCHANGE |PARTITIONED|
-- ASSIGN |UNPARTITIONED|
-- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$73, $$74] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$80, $$81] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
@@ -72,4 +72,4 @@
-- BTREE_SEARCH |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- ASSIGN |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|