blob: 7a807a4c2dfdbb948b181a142c15f96d44ef6243 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.external.input.stream.factory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.input.stream.TwitterFirehoseInputStream;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
/**
* Factory class for creating @see{TwitterFirehoseFeedAdapter}. The adapter
* simulates a twitter firehose with tweets being "pushed" into Asterix at a
* configurable rate measured in terms of TPS (tweets/second). The stream of
* tweets lasts for a configurable duration (measured in seconds).
*/
public class TwitterFirehoseStreamFactory implements IInputStreamFactory {
private static final long serialVersionUID = 1L;
/**
* Degree of parallelism for feed ingestion activity. Defaults to 1. This
* determines the count constraint for the ingestion operator.
**/
private static final String KEY_INGESTION_CARDINALITY = "ingestion-cardinality";
/**
* The absolute locations where ingestion operator instances will be placed.
**/
private static final String KEY_INGESTION_LOCATIONS = "ingestion-location";
private Map<String, String> configuration;
private transient IServiceContext serviceCtx;
@Override
public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
String ingestionCardinalityParam = configuration.get(KEY_INGESTION_CARDINALITY);
String ingestionLocationParam = configuration.get(KEY_INGESTION_LOCATIONS);
String[] locations = null;
if (ingestionLocationParam != null) {
locations = ingestionLocationParam.split(",");
}
int count = locations != null ? locations.length : 1;
if (ingestionCardinalityParam != null) {
count = Integer.parseInt(ingestionCardinalityParam);
}
ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
List<String> chosenLocations = new ArrayList<>();
String[] availableLocations = locations != null ? locations
: appCtx.getClusterStateManager().getParticipantNodes().toArray(new String[] {});
for (int i = 0, k = 0; i < count; i++, k = (k + 1) % availableLocations.length) {
chosenLocations.add(availableLocations[k]);
}
return new AlgebricksAbsolutePartitionConstraint(chosenLocations.toArray(new String[] {}));
}
@Override
public DataSourceType getDataSourceType() {
return DataSourceType.STREAM;
}
@Override
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory) {
this.serviceCtx = serviceCtx;
this.configuration = configuration;
}
@Override
public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
try {
return new TwitterFirehoseInputStream(configuration, partition);
} catch (IOException e) {
throw HyracksDataException.create(e);
}
}
}