/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.utils;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.common.metadata.Namespace;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.feed.watch.FeedActivityDetails;
import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.IParser;
import org.apache.asterix.lang.common.base.IParserFactory;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.clause.WhereClause;
import org.apache.asterix.lang.common.expression.CallExpr;
import org.apache.asterix.lang.common.expression.LiteralExpr;
import org.apache.asterix.lang.common.expression.VariableExpr;
import org.apache.asterix.lang.common.literal.IntegerLiteral;
import org.apache.asterix.lang.common.literal.StringLiteral;
import org.apache.asterix.lang.common.statement.InsertStatement;
import org.apache.asterix.lang.common.statement.Query;
import org.apache.asterix.lang.common.statement.UpsertStatement;
import org.apache.asterix.lang.common.struct.VarIdentifier;
import org.apache.asterix.lang.common.util.FunctionUtil;
import org.apache.asterix.lang.sqlpp.clause.FromClause;
import org.apache.asterix.lang.sqlpp.clause.FromTerm;
import org.apache.asterix.lang.sqlpp.clause.SelectBlock;
import org.apache.asterix.lang.sqlpp.clause.SelectClause;
import org.apache.asterix.lang.sqlpp.clause.SelectElement;
import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
import org.apache.asterix.lang.sqlpp.parser.SqlppParserFactory;
import org.apache.asterix.lang.sqlpp.struct.SetOperationInput;
import org.apache.asterix.lang.sqlpp.util.SqlppVariableUtil;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.entities.FeedConnection;
import org.apache.asterix.metadata.entities.FeedPolicyEntity;
import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
import org.apache.asterix.metadata.feeds.LocationConstraint;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
import org.apache.asterix.runtime.job.listener.MultiTransactionJobletEventListenerFactory;
import org.apache.asterix.runtime.utils.RuntimeUtils;
import org.apache.asterix.translator.CompiledStatements;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Triple;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.constraints.Constraint;
import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
import org.apache.hyracks.api.constraints.expressions.ConstraintExpression;
import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor;
import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor;
/**
* Provides helper methods for building job specifications for feed operations:
* intake, connection, start, and storage removal.
*/
public class FeedOperations {
public static final String FEED_DATAFLOW_INTERMEIDATE_VAL_PREFIX = "val";
private FeedOperations() {
}
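/**
* Builds the intake half of a feed job: the feed adapter runtime connected to a null sink.
* The combined start-feed job later copies the intake operator out of this spec and
* re-wires it to a replicate operator, so the null sink here only serves as a placeholder
* root. The returned adapter factory carries the intake partition constraint that
* {@link #buildStartFeedJob} reuses when stitching the combined job together.
*/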
private static Pair<JobSpecification, ITypedAdapterFactory> buildFeedIntakeJobSpec(Feed feed,
MetadataProvider metadataProvider, FeedPolicyAccessor policyAccessor) throws Exception {
JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
spec.setFrameSize(metadataProvider.getApplicationContext().getCompilerProperties().getFrameSize());
ITypedAdapterFactory adapterFactory;
IOperatorDescriptor feedIngestor;
AlgebricksPartitionConstraint ingesterPc;
Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, ITypedAdapterFactory> t =
metadataProvider.getFeedIntakeRuntime(spec, feed, policyAccessor);
feedIngestor = t.first;
ingesterPc = t.second;
adapterFactory = t.third;
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedIngestor, ingesterPc);
NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, ingesterPc);
spec.connect(new OneToOneConnectorDescriptor(spec), feedIngestor, 0, nullSink, 0);
spec.addRoot(nullSink);
return Pair.of(spec, adapterFactory);
}
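/**
* Builds a job that removes the on-disk artifacts of a feed: a single
* {@link FileRemoveOperatorDescriptor} over the feed's storage splits, constrained to the
* partitions reported by the metadata provider.
*/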
public static JobSpecification buildRemoveFeedStorageJob(MetadataProvider metadataProvider, Feed feed)
throws AlgebricksException {
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
JobSpecification spec = RuntimeUtils.createJobSpecification(appCtx);
PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(feed);
FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec,
partitioningProperties.getSplitsProvider(), true, partitioningProperties.getComputeStorageMap());
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod,
partitioningProperties.getConstraints());
spec.addRoot(frod);
return spec;
}
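/**
* Wraps the given arguments as AST expressions: integers and strings become literals,
* dataverse names are passed in canonical form, and existing expressions are kept as-is.
*/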
private static List<Expression> addArgs(Object... args) {
List<Expression> argExprs = new ArrayList<>();
for (Object arg : args) {
if (arg instanceof Integer) {
argExprs.add(new LiteralExpr(new IntegerLiteral((Integer) arg)));
} else if (arg instanceof String) {
argExprs.add(new LiteralExpr(new StringLiteral((String) arg)));
} else if (arg instanceof Expression) {
argExprs.add((Expression) arg);
} else if (arg instanceof DataverseName) {
argExprs.add(new LiteralExpr(new StringLiteral(((DataverseName) arg).getCanonicalForm())));
} else {
throw new IllegalArgumentException();
}
}
return argExprs;
}
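/**
* Translates a feed connection into a SQL++ query over the feed-collect datasource
* function. Illustrative shape of the generated query, with placeholder identifiers
* rather than literal output:
*
* <pre>
* SELECT VALUE fn2(fn1(feedVar))
* FROM feed-collect("db", "dataverse", "feed", "feed", "INTAKE", "dataset", "outputType") AS feedVar
* WHERE where-clause-body
* </pre>
*
* fn1 and fn2 stand for the connection's applied functions, composed in order.
*/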
private static Query makeConnectionQuery(FeedConnection feedConnection, MetadataProvider md)
throws AlgebricksException {
// Construct from clause
VarIdentifier fromVarId = SqlppVariableUtil.toInternalVariableIdentifier(feedConnection.getFeedName());
VariableExpr fromTermLeftExpr = new VariableExpr(fromVarId);
// TODO: remove target feedid from args list (xikui)
// TODO: Get rid of this INTAKE
List<Expression> exprList = addArgs(feedConnection.getDatabaseName(), feedConnection.getDataverseName(),
feedConnection.getFeedId().getEntityName(), feedConnection.getFeedId().getEntityName(),
FeedRuntimeType.INTAKE.toString(), feedConnection.getDatasetName(), feedConnection.getOutputType());
CallExpr datasourceCallFunction = new CallExpr(new FunctionSignature(BuiltinFunctions.FEED_COLLECT), exprList);
FromTerm fromTerm = new FromTerm(datasourceCallFunction, fromTermLeftExpr, null, null);
FromClause fromClause = new FromClause(Arrays.asList(fromTerm));
WhereClause whereClause = null;
if (!feedConnection.getWhereClauseBody().isEmpty()) {
String whereClauseExpr = feedConnection.getWhereClauseBody() + ";";
IParserFactory sqlppParserFactory = new SqlppParserFactory(md.getNamespaceResolver());
IParser sqlppParser = sqlppParserFactory.createParser(whereClauseExpr);
List<Statement> stmts = sqlppParser.parse();
if (stmts.size() != 1) {
throw new CompilationException("Expected exactly one statement in the feed connection's where clause.");
}
Query whereClauseQuery = (Query) stmts.get(0);
whereClause = new WhereClause(whereClauseQuery.getBody());
}
// Attaching functions
Expression previousVarExpr = fromTermLeftExpr;
for (FunctionSignature functionSignature : feedConnection.getAppliedFunctions()) {
CallExpr functionCallExpr = new CallExpr(functionSignature, addArgs(previousVarExpr));
previousVarExpr = functionCallExpr;
}
// Constructing select clause
SelectElement selectElement = new SelectElement(previousVarExpr);
SelectClause selectClause = new SelectClause(selectElement, null, false);
SelectBlock selectBlock = new SelectBlock(selectClause, fromClause,
whereClause != null ? Collections.singletonList(whereClause) : null, null, null);
SelectSetOperation selectSetOperation = new SelectSetOperation(new SetOperationInput(selectBlock, null), null);
SelectExpression body = new SelectExpression(null, selectSetOperation, null, null, true);
return new Query(false, true, body, 0);
}
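/**
* Compiles a single feed connection into a job specification by wrapping the connection
* query in an INSERT or UPSERT statement on the target dataset and handing it to the
* statement executor for rewriting and compilation.
*/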
private static JobSpecification getConnectionJob(MetadataProvider metadataProvider, FeedConnection feedConn,
IStatementExecutor statementExecutor, IHyracksClientConnection hcc, boolean insertFeed)
throws AlgebricksException, RemoteException, ACIDException {
metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME, feedConn.getPolicyName());
Query feedConnQuery = makeConnectionQuery(feedConn, metadataProvider);
CompiledStatements.ICompiledDmlStatement clfrqs;
String feedDatabaseName = feedConn.getDatabaseName();
DataverseName feedDataverseName = feedConn.getDataverseName();
if (insertFeed) {
InsertStatement stmtInsert = new InsertStatement(new Namespace(feedDatabaseName, feedDataverseName),
feedConn.getDatasetName(), feedConnQuery, -1, null, null);
clfrqs = new CompiledStatements.CompiledInsertStatement(feedDatabaseName, feedDataverseName,
feedConn.getDatasetName(), feedConnQuery, stmtInsert.getVarCounter(), null, null);
} else {
UpsertStatement stmtUpsert = new UpsertStatement(new Namespace(feedDatabaseName, feedDataverseName),
feedConn.getDatasetName(), feedConnQuery, -1, null, null);
clfrqs = new CompiledStatements.CompiledUpsertStatement(feedDatabaseName, feedDataverseName,
feedConn.getDatasetName(), feedConnQuery, stmtUpsert.getVarCounter(), null, null);
}
return statementExecutor.rewriteCompileQuery(hcc, metadataProvider, feedConnQuery, clfrqs, null, null);
}
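/**
* Fuses the intake job and the per-connection jobs into one combined job: the intake
* operator feeds a {@link ReplicateOperatorDescriptor} with one output branch per
* connection, and each connection job's operators, connectors, partition constraints,
* and roots are copied into the combined specification. Storage and message-crossing
* compute operators are wrapped in {@link FeedMetaOperatorDescriptor}s, and per-dataset
* transaction ids are collected for a single multi-transaction job event listener.
*/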
private static JobSpecification combineIntakeCollectJobs(MetadataProvider metadataProvider, Feed feed,
JobSpecification intakeJob, List<JobSpecification> jobsList, List<FeedConnection> feedConnections,
String[] intakeLocations) throws AlgebricksException, HyracksDataException {
JobSpecification jobSpec = new JobSpecification(intakeJob.getFrameSize());
// copy ingestor
FeedIntakeOperatorDescriptor firstOp =
(FeedIntakeOperatorDescriptor) intakeJob.getOperatorMap().get(new OperatorDescriptorId(0));
FeedIntakeOperatorDescriptor ingestionOp;
if (firstOp.getAdaptorFactory() == null) {
ingestionOp = new FeedIntakeOperatorDescriptor(jobSpec, feed, firstOp.getAdaptorLibraryDataverse(),
firstOp.getAdaptorLibraryName(), firstOp.getAdaptorFactoryClassName(),
firstOp.getAdapterOutputType(), firstOp.getPolicyAccessor(),
firstOp.getOutputRecordDescriptors()[0]);
} else {
ingestionOp = new FeedIntakeOperatorDescriptor(jobSpec, feed, firstOp.getAdaptorFactory(),
firstOp.getAdapterOutputType(), firstOp.getPolicyAccessor(),
firstOp.getOutputRecordDescriptors()[0]);
}
// create replicator
ReplicateOperatorDescriptor replicateOp =
new ReplicateOperatorDescriptor(jobSpec, ingestionOp.getOutputRecordDescriptors()[0], jobsList.size());
jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), ingestionOp, 0, replicateOp, 0);
PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, ingestionOp, intakeLocations);
PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, replicateOp, intakeLocations);
// Loop over the jobs to copy operators and connections
Map<OperatorDescriptorId, OperatorDescriptorId> operatorIdMapping = new HashMap<>();
Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorIdMapping = new HashMap<>();
Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations = new HashMap<>();
Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<>();
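// dataset id -> transaction id, collected per connection and fed to the
// MultiTransactionJobletEventListenerFactory installed at the end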
Map<Integer, TxnId> txnIdMap = new HashMap<>();
FeedMetaOperatorDescriptor metaOp;
for (int iter1 = 0; iter1 < jobsList.size(); iter1++) {
FeedConnection curFeedConnection = feedConnections.get(iter1);
JobSpecification subJob = jobsList.get(iter1);
operatorIdMapping.clear();
Map<OperatorDescriptorId, IOperatorDescriptor> operatorsMap = subJob.getOperatorMap();
String datasetName = feedConnections.get(iter1).getDatasetName();
FeedConnectionId feedConnectionId = new FeedConnectionId(ingestionOp.getEntityId(), datasetName);
FeedPolicyEntity feedPolicyEntity = FeedMetadataUtil.validateIfPolicyExists(
curFeedConnection.getDatabaseName(), curFeedConnection.getDataverseName(),
curFeedConnection.getPolicyName(), metadataProvider.getMetadataTxnContext());
for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorsMap.entrySet()) {
IOperatorDescriptor opDesc = entry.getValue();
OperatorDescriptorId oldId = opDesc.getOperatorId();
OperatorDescriptorId opId = null;
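// a primary-index LSM storage operator is wrapped in a feed meta operator (STORE) so that
// the connection's feed policy properties are attached to the storage runtime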
if (opDesc instanceof LSMTreeInsertDeleteOperatorDescriptor
&& ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).isPrimary()) {
metaOp = new FeedMetaOperatorDescriptor(jobSpec, feedConnectionId, opDesc,
feedPolicyEntity.getProperties(), FeedRuntimeType.STORE, true);
opId = metaOp.getOperatorId();
opDesc.setOperatorId(opId);
} else {
if (opDesc instanceof AlgebricksMetaOperatorDescriptor) {
AlgebricksMetaOperatorDescriptor algOp = (AlgebricksMetaOperatorDescriptor) opDesc;
IPushRuntimeFactory[] runtimeFactories = algOp.getPipeline().getRuntimeFactories();
// Tweak AssignOp to work with messages
if (runtimeFactories[0] instanceof AssignRuntimeFactory && runtimeFactories.length > 1) {
IConnectorDescriptor connectorDesc =
subJob.getOperatorInputMap().get(opDesc.getOperatorId()).get(0);
// anything on the network interface needs to be message compatible
if (connectorDesc instanceof MToNPartitioningConnectorDescriptor) {
metaOp = new FeedMetaOperatorDescriptor(jobSpec, feedConnectionId, opDesc,
feedPolicyEntity.getProperties(), FeedRuntimeType.COMPUTE, true);
opId = metaOp.getOperatorId();
opDesc.setOperatorId(opId);
}
}
}
if (opId == null) {
opId = jobSpec.createOperatorDescriptorId(opDesc);
}
}
operatorIdMapping.put(oldId, opId);
}
// copy connectors
connectorIdMapping.clear();
subJob.getConnectorMap().forEach((key, connDesc) -> {
ConnectorDescriptorId newConnId;
if (connDesc instanceof MToNPartitioningConnectorDescriptor) {
MToNPartitioningConnectorDescriptor m2nConn = (MToNPartitioningConnectorDescriptor) connDesc;
connDesc = new MToNPartitioningWithMessageConnectorDescriptor(jobSpec,
m2nConn.getTuplePartitionComputerFactory());
newConnId = connDesc.getConnectorId();
} else {
newConnId = jobSpec.createConnectorDescriptor(connDesc);
}
connectorIdMapping.put(key, newConnId);
});
// make connections between operators
for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : subJob
.getConnectorOperatorMap().entrySet()) {
ConnectorDescriptorId newId = connectorIdMapping.get(entry.getKey());
IConnectorDescriptor connDesc = jobSpec.getConnectorMap().get(newId);
Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft();
Pair<IOperatorDescriptor, Integer> rightOp = entry.getValue().getRight();
IOperatorDescriptor leftOpDesc = jobSpec.getOperatorMap().get(leftOp.getLeft().getOperatorId());
IOperatorDescriptor rightOpDesc = jobSpec.getOperatorMap().get(rightOp.getLeft().getOperatorId());
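// a collect operator consumes its branch of the replicated intake output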
if (leftOp.getLeft() instanceof FeedCollectOperatorDescriptor) {
jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), replicateOp, iter1, leftOpDesc,
leftOp.getRight());
}
jobSpec.connect(connDesc, leftOpDesc, leftOp.getRight(), rightOpDesc, rightOp.getRight());
}
// prepare for setting partition constraints
operatorLocations.clear();
operatorCounts.clear();
for (Constraint constraint : subJob.getUserConstraints()) {
LValueConstraintExpression lexpr = constraint.getLValue();
ConstraintExpression cexpr = constraint.getRValue();
OperatorDescriptorId opId;
switch (lexpr.getTag()) {
case PARTITION_COUNT:
opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
operatorCounts.put(operatorIdMapping.get(opId), (int) ((ConstantExpression) cexpr).getValue());
break;
case PARTITION_LOCATION:
opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
IOperatorDescriptor opDesc = jobSpec.getOperatorMap().get(operatorIdMapping.get(opId));
List<LocationConstraint> locations = operatorLocations.get(opDesc.getOperatorId());
if (locations == null) {
locations = new ArrayList<>();
operatorLocations.put(opDesc.getOperatorId(), locations);
}
String location = (String) ((ConstantExpression) cexpr).getValue();
LocationConstraint lc =
new LocationConstraint(location, ((PartitionLocationExpression) lexpr).getPartition());
locations.add(lc);
break;
default:
break;
}
}
// set absolute location constraints
for (Entry<OperatorDescriptorId, List<LocationConstraint>> entry : operatorLocations.entrySet()) {
IOperatorDescriptor opDesc = jobSpec.getOperatorMap().get(entry.getKey());
// sort by partition so that locations[j] below lines up with partition j
// when the absolute location constraint is applied
entry.getValue().sort((o1, o2) -> o1.partition - o2.partition);
String[] locations = new String[entry.getValue().size()];
for (int j = 0; j < locations.length; ++j) {
locations[j] = entry.getValue().get(j).location;
}
PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, opDesc, locations);
}
// set count constraints
operatorCounts.forEach((key, value) -> {
IOperatorDescriptor opDesc = jobSpec.getOperatorMap().get(key);
if (!operatorLocations.containsKey(key)) {
PartitionConstraintHelper.addPartitionCountConstraint(jobSpec, opDesc, value);
}
});
// roots
for (OperatorDescriptorId root : subJob.getRoots()) {
jobSpec.addRoot(jobSpec.getOperatorMap().get(operatorIdMapping.get(root)));
}
int datasetId = metadataProvider.findDataset(curFeedConnection.getDatabaseName(),
curFeedConnection.getDataverseName(), curFeedConnection.getDatasetName()).getDatasetId();
TxnId txnId = ((JobEventListenerFactory) subJob.getJobletEventListenerFactory()).getTxnId(datasetId);
txnIdMap.put(datasetId, txnId);
}
// jobEventListenerFactory
jobSpec.setJobletEventListenerFactory(new MultiTransactionJobletEventListenerFactory(txnIdMap, true));
// useConnectorSchedulingPolicy
jobSpec.setUseConnectorPolicyForScheduling(jobsList.get(0).isUseConnectorPolicyForScheduling());
// connectorAssignmentPolicy
jobSpec.setConnectorPolicyAssignmentPolicy(jobsList.get(0).getConnectorPolicyAssignmentPolicy());
return jobSpec;
}
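/**
* Entry point for starting a feed: builds the intake job, compiles one connection job per
* feed connection (INSERT or UPSERT depending on the feed configuration), and combines
* them into a single job specification. A minimal usage sketch, assuming a metadata
* transaction is already active on the metadata provider:
*
* <pre>
* Pair&lt;JobSpecification, AlgebricksAbsolutePartitionConstraint&gt; p =
*         FeedOperations.buildStartFeedJob(metadataProvider, feed, connections, executor, hcc);
* JobSpecification feedJob = p.getLeft();
* </pre>
*/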
public static Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> buildStartFeedJob(
MetadataProvider metadataProvider, Feed feed, List<FeedConnection> feedConnections,
IStatementExecutor statementExecutor, IHyracksClientConnection hcc) throws Exception {
FeedPolicyAccessor fpa = new FeedPolicyAccessor(new HashMap<>());
Pair<JobSpecification, ITypedAdapterFactory> intakeInfo = buildFeedIntakeJobSpec(feed, metadataProvider, fpa);
List<JobSpecification> jobsList = new ArrayList<>();
// TODO: Figure out a better way to handle insert/upsert per conn instead of per feed
boolean insertFeed = ExternalDataUtils.isInsertFeed(feed.getConfiguration());
// Construct the ingestion Job
JobSpecification intakeJob = intakeInfo.getLeft();
ITypedAdapterFactory ingestionAdaptorFactory = intakeInfo.getRight();
String[] ingestionLocations = ingestionAdaptorFactory.getPartitionConstraint().getLocations();
// Add metadata configs
metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, Boolean.TRUE.toString());
metadataProvider.getConfig().put(FeedActivityDetails.COLLECT_LOCATIONS,
StringUtils.join(ingestionLocations, ','));
// Add connection job
for (FeedConnection feedConnection : feedConnections) {
JobSpecification connectionJob =
getConnectionJob(metadataProvider, feedConnection, statementExecutor, hcc, insertFeed);
jobsList.add(connectionJob);
}
return Pair.of(combineIntakeCollectJobs(metadataProvider, feed, intakeJob, jobsList, feedConnections,
ingestionLocations), intakeInfo.getRight().getPartitionConstraint());
}
}