blob: 628c745666183c760b472d21f462b8e7e663a142 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.metadata.declared;
import java.util.ArrayList;
import java.util.List;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.entities.FeedConnection;
import org.apache.asterix.metadata.entities.FeedPolicyEntity;
import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
/**
 * A {@link DataSource} representing the output of a feed connection. During job generation,
 * a scan over this data source is compiled into a {@link FeedCollectOperatorDescriptor}
 * that subscribes to the feed's intake pipeline.
 * <p>
 * The schema produced by this source is laid out as: the payload record, followed by the
 * meta record (if any), followed by the primary-key fields (if any). Several methods below
 * ({@code getMetaVariable}, {@code getPkVars}) rely on that ordering.
 */
public class FeedDataSource extends DataSource implements IMutationDataSource {
    private final Feed feed;
    private final EntityId sourceFeedId;
    private final FeedRuntimeType location;
    private final String targetDataset;
    private final String[] locations;
    private final INodeDomain computationNodeDomain;
    private final List<IAType> pkTypes;
    private final List<ScalarFunctionCallExpression> keyAccessExpression;
    private final FeedConnection feedConnection;

    /**
     * @param feed                the feed entity this source reads from
     * @param id                  the data source id (dataverse + feed name)
     * @param targetDataset       name of the dataset the feed is connected to
     * @param itemType            payload record type
     * @param metaType            meta record type, or {@code null} if the feed has no meta part
     * @param pkTypes             primary-key field types, or {@code null} for insert-only feeds
     * @param keyAccessExpression expressions extracting the primary keys, or {@code null}
     * @param sourceFeedId        id of the (possibly upstream) source feed
     * @param location            runtime stage at which collection happens
     * @param locations           node controller locations for the collect operator
     * @param domain              computation node domain
     * @param feedConnection      the feed-to-dataset connection entity
     * @throws AlgebricksException if the data source cannot be initialized
     */
    public FeedDataSource(Feed feed, DataSourceId id, String targetDataset, IAType itemType, IAType metaType,
            List<IAType> pkTypes, List<ScalarFunctionCallExpression> keyAccessExpression, EntityId sourceFeedId,
            FeedRuntimeType location, String[] locations, INodeDomain domain, FeedConnection feedConnection)
            throws AlgebricksException {
        super(id, itemType, metaType, Type.FEED, domain);
        this.feed = feed;
        this.targetDataset = targetDataset;
        this.sourceFeedId = sourceFeedId;
        this.location = location;
        this.locations = locations;
        this.pkTypes = pkTypes;
        this.keyAccessExpression = keyAccessExpression;
        this.computationNodeDomain = domain;
        this.feedConnection = feedConnection;
        initFeedDataSource();
    }

    public Feed getFeed() {
        return feed;
    }

    @Override
    public IAType[] getSchemaTypes() {
        return schemaTypes;
    }

    public String getTargetDataset() {
        return targetDataset;
    }

    public EntityId getSourceFeedId() {
        return sourceFeedId;
    }

    public FeedRuntimeType getLocation() {
        return location;
    }

    public String[] getLocations() {
        return locations;
    }

    /**
     * Builds the schema type array in the fixed order: payload record, then meta record
     * (if present), then primary-key fields (if present).
     */
    private void initFeedDataSource() {
        int i = 0;
        // record + meta (if exists) + PKs (if exists)
        schemaTypes = new IAType[(1 + (metaItemType != null ? 1 : 0) + (pkTypes != null ? pkTypes.size() : 0))];
        schemaTypes[i++] = itemType;
        if (metaItemType != null) {
            schemaTypes[i++] = metaItemType;
        }
        if (pkTypes != null) {
            for (IAType type : pkTypes) {
                schemaTypes[i++] = type;
            }
        }
    }

    public List<IAType> getPkTypes() {
        return pkTypes;
    }

    @Override
    public List<ScalarFunctionCallExpression> getKeyAccessExpression() {
        return keyAccessExpression;
    }

    /**
     * Returns the meta variable, which by the schema layout convention is at index 1
     * (right after the payload record), or {@code null} if the feed has no meta part.
     */
    @Override
    public LogicalVariable getMetaVariable(List<LogicalVariable> dataScanVariables) {
        return metaItemType == null ? null : dataScanVariables.get(1);
    }

    /** The payload record is always the first scan variable. */
    @Override
    public LogicalVariable getDataRecordVariable(List<LogicalVariable> dataScanVariables) {
        return dataScanVariables.get(0);
    }

    /** A feed with primary keys carries change (upsert/delete) semantics. */
    @Override
    public boolean isChange() {
        return pkTypes != null;
    }

    /**
     * Returns the primary-key variables, which follow the payload (and meta, if present)
     * variables per the schema layout convention, or {@code null} for insert-only feeds.
     */
    @Override
    public List<LogicalVariable> getPkVars(List<LogicalVariable> allVars) {
        if (pkTypes == null) {
            return null;
        }
        if (metaItemType != null) {
            return allVars.subList(2, allVars.size());
        } else {
            return allVars.subList(1, allVars.size());
        }
    }

    /**
     * Compiles a scan over this feed into a {@link FeedCollectOperatorDescriptor} constrained
     * to the feed's collect locations. Tuple filters and output limits are not supported.
     *
     * @throws AlgebricksException if a tuple filter or limit is requested, if no feed policy
     *                             is configured, or if runtime construction fails
     */
    @Override
    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDatasourceScanRuntime(
            MetadataProvider metadataProvider, IDataSource<DataSourceId> dataSource,
            List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
            List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars,
            ITupleFilterFactory tupleFilterFactory, long outputLimit, IOperatorSchema opSchema,
            IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig,
            IProjectionFiltrationInfo projectionFiltrationInfo) throws AlgebricksException {
        try {
            if (tupleFilterFactory != null || outputLimit >= 0) {
                throw CompilationException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
                        "Tuple filter and limit are not supported by FeedDataSource");
            }
            ARecordType feedOutputType = (ARecordType) itemType;
            ISerializerDeserializer payloadSerde =
                    metadataProvider.getDataFormat().getSerdeProvider().getSerializerDeserializer(feedOutputType);
            // Output record descriptor mirrors the schema layout: payload, meta (opt), PKs (opt).
            ArrayList<ISerializerDeserializer> serdes = new ArrayList<>();
            serdes.add(payloadSerde);
            if (metaItemType != null) {
                serdes.add(SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaItemType));
            }
            if (pkTypes != null) {
                for (IAType type : pkTypes) {
                    serdes.add(SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(type));
                }
            }
            RecordDescriptor feedDesc =
                    new RecordDescriptor(serdes.toArray(new ISerializerDeserializer[serdes.size()]));
            FeedPolicyEntity feedPolicy =
                    (FeedPolicyEntity) getProperties().get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
            if (feedPolicy == null) {
                throw new AlgebricksException("Feed not configured with a policy");
            }
            feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
            FeedConnectionId feedConnectionId = new FeedConnectionId(getId().getDatabaseName(),
                    getId().getDataverseName(), getId().getDatasourceName(), getTargetDataset());
            FeedCollectOperatorDescriptor feedCollector = new FeedCollectOperatorDescriptor(jobSpec, feedConnectionId,
                    feedOutputType, feedDesc, feedPolicy.getProperties(), getLocation());
            return new Pair<>(feedCollector, new AlgebricksAbsolutePartitionConstraint(getLocations()));
        } catch (AlgebricksException e) {
            // Rethrow as-is: wrapping would hide the error code of CompilationExceptions
            // (e.g. COMPILATION_ILLEGAL_STATE above) behind a generic cause chain.
            throw e;
        } catch (Exception e) {
            throw new AlgebricksException(e);
        }
    }

    /** Feed scans are always leaves of the access path. */
    @Override
    public boolean isScanAccessPathALeaf() {
        return true;
    }

    public FeedConnection getFeedConnection() {
        return feedConnection;
    }

    public INodeDomain getComputationNodeDomain() {
        return computationNodeDomain;
    }
}