blob: 4f07a7846626e907c9ef6d9e2d9bae5ae87908f6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.external.input.stream.factory;
import java.net.InetAddress;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.api.INodeResolver;
import org.apache.asterix.external.api.INodeResolverFactory;
import org.apache.asterix.external.input.stream.LocalFSInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.FileSystemWatcher;
import org.apache.asterix.external.util.NodeResolverFactory;
import org.apache.asterix.runtime.utils.RuntimeUtils;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.io.UnmanagedFileSplit;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
 * {@link IInputStreamFactory} that reads files from the local file systems of node controllers.
 * <p>
 * The {@code path} configuration parameter ({@link ExternalDataConstants#KEY_PATH}) is a comma-separated
 * list of entries of the form {@code Host://Absolute File Path}. Each entry's host is resolved to a node
 * controller name through an {@link INodeResolver} in {@link #configure}, and the resulting set of nodes
 * becomes the absolute partition constraint. At runtime, {@link #createInputStream} only reads the
 * entries assigned to the node it runs on.
 */
public class LocalFSInputStreamFactory implements IInputStreamFactory {
    private static final long serialVersionUID = 1L;
    // Separator between the host part and the file path part of each "path" entry.
    private static final String HOST_PATH_SEPARATOR = "://";
    protected static final INodeResolver DEFAULT_NODE_RESOLVER = new NodeResolverFactory().createNodeResolver();
    protected static final Logger LOGGER = LogManager.getLogger();
    // Lazily initialized by getNodeResolver(). volatile is required for the double-checked
    // initialization there to safely publish the resolver to other threads.
    protected static volatile INodeResolver nodeResolver;
    protected Map<String, String> configuration;
    protected UnmanagedFileSplit[] inputFileSplits;
    protected boolean isFeed;
    protected String expression;
    // transient fields (They don't need to be serialized and transferred)
    private transient AlgebricksAbsolutePartitionConstraint constraints;
    private transient FileSystemWatcher watcher;

    /**
     * Creates an input stream over the file splits assigned to the node executing {@code ctx}.
     * <p>
     * On the first call, a {@link FileSystemWatcher} is built from the splits whose node name matches
     * this node's id. That watcher is reused for every stream this factory instance returns afterwards,
     * regardless of {@code partition}. The method is synchronized so the watcher is created only once.
     *
     * @param ctx       task context, used to look up the current node id
     * @param partition partition index (not used to select files here)
     * @return a {@link LocalFSInputStream} backed by the shared watcher
     * @throws HyracksDataException if the watcher or stream cannot be created
     */
    @Override
    public synchronized AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition)
            throws HyracksDataException {
        if (watcher == null) {
            String nodeName = ctx.getJobletContext().getServiceContext().getNodeId();
            ArrayList<Path> inputResources = new ArrayList<>();
            for (UnmanagedFileSplit split : inputFileSplits) {
                if (split.getNodeName().equals(nodeName)) {
                    inputResources.add(split.getFile().toPath());
                }
            }
            watcher = new FileSystemWatcher(inputResources, expression, isFeed);
        }
        return new LocalFSInputStream(watcher);
    }

    @Override
    public DataSourceType getDataSourceType() {
        return DataSourceType.STREAM;
    }

    /**
     * Stores the configuration, resolves the file splits (unless already set), and computes the
     * partition constraint, feed flag and file-name expression.
     *
     * @throws AsterixException if the {@code path} parameter is missing or contains an invalid entry,
     *                          or if a host cannot be resolved
     */
    @Override
    public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
            IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
            throws AsterixException {
        this.configuration = configuration;
        if (inputFileSplits == null) {
            // Only parse the path when the splits still need to be computed.
            String pathValue = configuration.get(ExternalDataConstants.KEY_PATH);
            if (pathValue == null) {
                throw new AsterixException("Missing required parameter: " + ExternalDataConstants.KEY_PATH);
            }
            configureFileSplits((ICcApplicationContext) serviceCtx.getApplicationContext(), pathValue.split(","));
        }
        configurePartitionConstraint();
        this.isFeed = ExternalDataUtils.isFeed(configuration) && ExternalDataUtils.keepDataSourceOpen(configuration);
        this.expression = configuration.get(ExternalDataConstants.KEY_EXPRESSION);
    }

    @Override
    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
        return constraints;
    }

    /**
     * Parses each {@code Host://Absolute File Path} entry and resolves its host to a node controller.
     * <p>
     * {@link #inputFileSplits} is only assigned once every entry has been parsed and resolved, so a
     * failure never leaves a partially filled array behind.
     *
     * @param appCtx CC application context used for node resolution
     * @param splits raw entries of the {@code path} parameter (untrimmed)
     * @throws AsterixException if an entry lacks the {@code ://} separator or has an empty path, or if
     *                          host resolution fails
     */
    private void configureFileSplits(ICcApplicationContext appCtx, String[] splits) throws AsterixException {
        INodeResolver resolver = getNodeResolver();
        Map<InetAddress, Set<String>> ncMap = RuntimeUtils.getForcedNodeControllerMap(appCtx);
        Set<String> ncs = ncMap.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
        UnmanagedFileSplit[] resolvedSplits = new UnmanagedFileSplit[splits.length];
        for (int i = 0; i < splits.length; i++) {
            String splitPath = splits[i];
            String trimmedValue = splitPath.trim();
            int separatorIndex = trimmedValue.indexOf(HOST_PATH_SEPARATOR);
            int pathStart = separatorIndex + HOST_PATH_SEPARATOR.length();
            // Reject entries without a separator or with nothing after it (e.g. "host://").
            if (separatorIndex < 0 || pathStart >= trimmedValue.length()) {
                throw new AsterixException(
                        "Invalid path: " + splitPath + "\nUsage- path=\"Host://Absolute File Path\"");
            }
            String host = trimmedValue.substring(0, separatorIndex);
            // Everything after the first separator is the path, even if it contains "://" itself.
            String path = trimmedValue.substring(pathStart);
            String node = resolver.resolveNode(appCtx, host, ncMap, ncs);
            resolvedSplits[i] = new UnmanagedFileSplit(node, path);
        }
        inputFileSplits = resolvedSplits;
    }

    /**
     * Builds the absolute partition constraint from the distinct node names of the file splits.
     * The names are collected in a {@link TreeSet}, so the locations are deduplicated and sorted.
     */
    private void configurePartitionConstraint() {
        Set<String> locs = new TreeSet<>();
        for (UnmanagedFileSplit split : inputFileSplits) {
            locs.add(split.getNodeName());
        }
        constraints = new AlgebricksAbsolutePartitionConstraint(locs.toArray(new String[0]));
    }

    /**
     * Returns the node resolver shared by all instances of this class, initializing it on first use.
     * Double-checked initialization guarded by a lock on {@link #DEFAULT_NODE_RESOLVER}; correctness
     * relies on {@link #nodeResolver} being volatile.
     */
    protected INodeResolver getNodeResolver() {
        if (nodeResolver == null) {
            synchronized (DEFAULT_NODE_RESOLVER) {
                if (nodeResolver == null) {
                    nodeResolver = initializeNodeResolver();
                }
            }
        }
        return nodeResolver;
    }

    /**
     * Creates the node resolver named by the {@link ExternalDataConstants#NODE_RESOLVER_FACTORY_PROPERTY}
     * system property. Falls back to {@link #DEFAULT_NODE_RESOLVER} when the property is unset or the
     * configured factory cannot be instantiated (the failure is logged with its cause).
     */
    private static INodeResolver initializeNodeResolver() {
        String configuredNodeResolverFactory = System.getProperty(ExternalDataConstants.NODE_RESOLVER_FACTORY_PROPERTY);
        if (configuredNodeResolverFactory == null) {
            return DEFAULT_NODE_RESOLVER;
        }
        try {
            // Class.newInstance() is deprecated; go through the no-arg constructor explicitly.
            INodeResolverFactory factory = (INodeResolverFactory) Class.forName(configuredNodeResolverFactory)
                    .getDeclaredConstructor().newInstance();
            return factory.createNodeResolver();
        } catch (Exception e) {
            if (LOGGER.isWarnEnabled()) {
                LOGGER.log(Level.WARN,
                        "Unable to create node resolver from the configured classname " + configuredNodeResolverFactory,
                        e);
            }
            return DEFAULT_NODE_RESOLVER;
        }
    }
}