/*
* Copyright 2009-2013 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package edu.uci.ics.asterix.metadata.feeds;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.commons.lang3.tuple.Pair;

import edu.uci.ics.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
import edu.uci.ics.asterix.metadata.entities.Feed;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.functions.ExternalLibraryManager;
import edu.uci.ics.asterix.om.types.ARecordType;

import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.StreamProjectRuntimeFactory;
import edu.uci.ics.hyracks.api.constraints.Constraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.job.JobSpecification;

/**
 * A utility class providing helper functions for feeds: checking whether a
 * feed is active, rewriting a compiled job specification to enforce a feed's
 * ingestion policy, and resolving a feed's adapter factory and output type.
 */
public class FeedUtil {
private static final Logger LOGGER = Logger.getLogger(FeedUtil.class.getName());
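
/**
 * Returns true if the given activity record represents a live feed, i.e. it
 * is non-null and its activity type is neither FEED_FAILURE nor FEED_END.
 *
 * @param feedActivity
 *            the most recent activity recorded for the feed, or null
 * @return true if the feed is considered active
 */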
public static boolean isFeedActive(FeedActivity feedActivity) {
if (feedActivity == null) {
return false;
}
FeedActivityType activityType = feedActivity.getActivityType();
return !(activityType.equals(FeedActivityType.FEED_FAILURE) || activityType.equals(FeedActivityType.FEED_END));
}
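
/**
 * A (partition, location) pair used to rebuild absolute location constraints
 * for the altered job specification.
 */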
private static class LocationConstraint {
int partition;
String location;
}
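
/**
 * Rewrites a compiled job specification so that the feed's ingestion policy
 * can be enforced at runtime. If the policy requires no statistics, fault
 * tolerance, or elasticity, the original specification is returned
 * unchanged. Otherwise a new specification is built in which the intake
 * operator is re-created, every other operator is wrapped in a
 * {@link FeedMetaOperatorDescriptor}, and the connectors, partition
 * constraints, scheduling policies, and roots are carried over.
 *
 * A typical invocation might look like the following (variable names are
 * illustrative only):
 *
 * <pre>{@code
 * JobSpecification compiled = ...; // spec produced by the compiler
 * FeedPolicy policy = ...;         // ingestion policy looked up from the metadata
 * JobSpecification runnable = FeedUtil.alterJobSpecificationForFeed(compiled, connectionId, policy);
 * }</pre>
 *
 * @param spec
 *            the job specification produced by the compiler
 * @param feedConnectionId
 *            identifies the feed-to-dataset connection
 * @param feedPolicy
 *            the ingestion policy associated with the connection
 * @return the original specification if no alteration is needed, else the
 *         altered specification
 */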
public static JobSpecification alterJobSpecificationForFeed(JobSpecification spec,
FeedConnectionId feedConnectionId, FeedPolicy feedPolicy) {
FeedPolicyAccessor fpa = new FeedPolicyAccessor(feedPolicy.getProperties());
boolean alterationRequired = (fpa.collectStatistics() || fpa.continueOnApplicationFailure()
|| fpa.continueOnHardwareFailure() || fpa.isElastic());
if (!alterationRequired) {
return spec;
}
JobSpecification altered = new JobSpecification();
Map<OperatorDescriptorId, IOperatorDescriptor> operatorMap = spec.getOperatorMap();
// copy operators
Map<OperatorDescriptorId, OperatorDescriptorId> oldNewOID = new HashMap<>();
for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorMap.entrySet()) {
IOperatorDescriptor opDesc = entry.getValue();
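// Re-create the intake operator in the altered job, preserving either
// its pre-built adapter factory or the library/class-name reference
// from which the factory is instantiated.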
if (opDesc instanceof FeedIntakeOperatorDescriptor) {
FeedIntakeOperatorDescriptor orig = (FeedIntakeOperatorDescriptor) opDesc;
FeedIntakeOperatorDescriptor fiop;
if (orig.getAdapterFactory() != null) {
fiop = new FeedIntakeOperatorDescriptor(altered, orig.getFeedId(), orig.getAdapterFactory(),
(ARecordType) orig.getOutputType(), orig.getRecordDescriptor(), orig.getFeedPolicy());
} else {
fiop = new FeedIntakeOperatorDescriptor(altered, orig.getFeedId(), orig.getAdapterLibraryName(),
orig.getAdapterFactoryClassName(), orig.getAdapterConfiguration(),
(ARecordType) orig.getOutputType(), orig.getRecordDescriptor(), orig.getFeedPolicy());
}
oldNewOID.put(opDesc.getOperatorId(), fiop.getOperatorId());
} else if (opDesc instanceof AsterixLSMTreeInsertDeleteOperatorDescriptor) {
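// Wrap the storage operator in a meta operator so that the feed
// runtime can apply the ingestion policy at the STORAGE stage.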
FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
feedPolicy, FeedRuntimeType.STORAGE);
oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
} else {
FeedRuntimeType runtimeType = null;
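// Infer the feed runtime stage from the first runtime factory of the
// Algebricks pipeline: an assign marks the COMPUTE stage, a stream
// project the COMMIT stage.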
if (opDesc instanceof AlgebricksMetaOperatorDescriptor) {
IPushRuntimeFactory runtimeFactory = ((AlgebricksMetaOperatorDescriptor) opDesc).getPipeline()
.getRuntimeFactories()[0];
if (runtimeFactory instanceof AssignRuntimeFactory) {
runtimeType = FeedRuntimeType.COMPUTE;
} else if (runtimeFactory instanceof StreamProjectRuntimeFactory) {
runtimeType = FeedRuntimeType.COMMIT;
}
}
FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
feedPolicy, runtimeType);
oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
}
}
// copy connectors
Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorMapping = new HashMap<>();
for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : spec.getConnectorMap().entrySet()) {
IConnectorDescriptor connDesc = entry.getValue();
ConnectorDescriptorId newConnId = altered.createConnectorDescriptor(connDesc);
connectorMapping.put(entry.getKey(), newConnId);
}
// make connections between operators
for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : spec
.getConnectorOperatorMap().entrySet()) {
IConnectorDescriptor connDesc = altered.getConnectorMap().get(connectorMapping.get(entry.getKey()));
Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft();
Pair<IOperatorDescriptor, Integer> rightOp = entry.getValue().getRight();
IOperatorDescriptor leftOpDesc = altered.getOperatorMap().get(
oldNewOID.get(leftOp.getLeft().getOperatorId()));
IOperatorDescriptor rightOpDesc = altered.getOperatorMap().get(
oldNewOID.get(rightOp.getLeft().getOperatorId()));
altered.connect(connDesc, leftOpDesc, leftOp.getRight(), rightOpDesc, rightOp.getRight());
}
// prepare for setting partition constraints
Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations = new HashMap<>();
Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<>();
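// Collect the partition-count and partition-location constraints of the
// original spec, keyed by the original operator ids; only these two
// constraint kinds are re-applied to the altered job below.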
for (Constraint constraint : spec.getUserConstraints()) {
LValueConstraintExpression lexpr = constraint.getLValue();
ConstraintExpression cexpr = constraint.getRValue();
OperatorDescriptorId opId;
switch (lexpr.getTag()) {
case PARTITION_COUNT:
opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
operatorCounts.put(opId, (int) ((ConstantExpression) cexpr).getValue());
break;
case PARTITION_LOCATION:
opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
// Key by the original operator id (as operatorCounts is); the id
// is remapped to the altered job's operator when the constraint
// is applied.
List<LocationConstraint> locations = operatorLocations.get(opId);
if (locations == null) {
locations = new ArrayList<>();
operatorLocations.put(opId, locations);
}
String location = (String) ((ConstantExpression) cexpr).getValue();
LocationConstraint lc = new LocationConstraint();
lc.location = location;
lc.partition = ((PartitionLocationExpression) lexpr).getPartition();
locations.add(lc);
break;
}
}
// set absolute location constraints
for (Entry<OperatorDescriptorId, List<LocationConstraint>> entry : operatorLocations.entrySet()) {
IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(entry.getKey()));
Collections.sort(entry.getValue(), new Comparator<LocationConstraint>() {
@Override
public int compare(LocationConstraint o1, LocationConstraint o2) {
return o1.partition - o2.partition;
}
});
String[] locations = new String[entry.getValue().size()];
for (int i = 0; i < locations.length; ++i) {
locations[i] = entry.getValue().get(i).location;
}
PartitionConstraintHelper.addAbsoluteLocationConstraint(altered, opDesc, locations);
}
// set count constraints
for (Entry<OperatorDescriptorId, Integer> entry : operatorCounts.entrySet()) {
IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(entry.getKey()));
if (!operatorLocations.containsKey(entry.getKey())) {
PartitionConstraintHelper.addPartitionCountConstraint(altered, opDesc, entry.getValue());
}
}
// useConnectorSchedulingPolicy
altered.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling());
// connectorAssignmentPolicy
altered.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
// roots
for (OperatorDescriptorId root : spec.getRoots()) {
altered.addRoot(altered.getOperatorMap().get(oldNewOID.get(root)));
}
// jobEventListenerFactory
altered.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("New Job Spec:" + altered);
}
return altered;
}
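
/**
 * Resolves the adapter factory for a feed and determines the record type of
 * the data it produces. The adapter is looked up first among the adapters
 * registered in the Metadata dataverse, then in the feed's own dataverse
 * (external, library-hosted adapters), and finally among the built-in
 * adapters known to {@link AqlMetadataProvider}.
 *
 * An illustrative use of the returned triple (field names follow the
 * Algebricks {@link Triple} utility):
 *
 * <pre>{@code
 * Triple<IAdapterFactory, ARecordType, AdapterType> p = FeedUtil.getFeedFactoryAndOutput(feed, mdTxnCtx);
 * IAdapterFactory factory = p.first;
 * ARecordType outputType = p.second;
 * }</pre>
 *
 * @param feed
 *            the feed whose adapter is to be instantiated
 * @param mdTxnCtx
 *            the metadata transaction context used for the lookups
 * @return a triple of the configured adapter factory, its output record
 *         type, and the adapter type (internal or external)
 * @throws AlgebricksException
 *             if the adapter factory cannot be created or configured
 */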
public static Triple<IAdapterFactory, ARecordType, AdapterType> getFeedFactoryAndOutput(Feed feed,
MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
String adapterName = null;
DatasourceAdapter adapterEntity = null;
String adapterFactoryClassname = null;
IAdapterFactory adapterFactory = null;
ARecordType adapterOutputType = null;
AdapterType adapterType = null;
Triple<IAdapterFactory, ARecordType, AdapterType> feedProps = null;
try {
adapterName = feed.getAdaptorName();
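// Look for the adapter first among those registered in the Metadata
// dataverse, then within the feed's own dataverse.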
adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
adapterName);
if (adapterEntity == null) {
adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, feed.getDataverseName(), adapterName);
}
if (adapterEntity != null) {
adapterFactoryClassname = adapterEntity.getClassname();
adapterType = adapterEntity.getType();
switch (adapterType) {
case INTERNAL:
adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
break;
case EXTERNAL:
String[] anameComponents = adapterName.split("#");
String libraryName = anameComponents[0];
ClassLoader cl = ExternalLibraryManager.getLibraryClassLoader(feed.getDataverseName(),
libraryName);
adapterFactory = (IAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
break;
}
} else {
adapterFactoryClassname = AqlMetadataProvider.adapterFactoryMapping.get(adapterName);
if (adapterFactoryClassname == null) {
adapterFactoryClassname = adapterName;
}
adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
// Built-in adapters resolved by class name have no metadata entry;
// treat them as internal so the triple below has a valid type.
adapterType = AdapterType.INTERNAL;
}
Map<String, String> configuration = feed.getAdaptorConfiguration();
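// A typed adapter dictates its own output record type; a generic
// adapter must be configured with the datatype named in the feed's
// configuration.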
switch (adapterFactory.getAdapterType()) {
case TYPED:
((ITypedAdapterFactory) adapterFactory).configure(configuration);
adapterOutputType = ((ITypedAdapterFactory) adapterFactory).getAdapterOutputType();
break;
case GENERIC:
String outputTypeName = configuration.get(IGenericAdapterFactory.KEY_TYPE_NAME);
if (outputTypeName == null) {
throw new IllegalArgumentException(
"You must specify the datatype associated with the incoming data. Datatype is specified by the "
+ IGenericAdapterFactory.KEY_TYPE_NAME + " configuration parameter");
}
adapterOutputType = (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
feed.getDataverseName(), outputTypeName).getDatatype();
((IGenericAdapterFactory) adapterFactory).configure(configuration, adapterOutputType);
break;
default:
throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
}
feedProps = new Triple<IAdapterFactory, ARecordType, AdapterType>(adapterFactory, adapterOutputType,
adapterType);
} catch (Exception e) {
throw new AlgebricksException("Unable to create adapter " + adapterName, e);
}
return feedProps;
}
}