blob: 8c5a93b21376ec3e008aaef9005fa48568bc2770 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.api.http.server;
import static org.apache.asterix.common.functions.ExternalFunctionLanguage.JAVA;
import static org.apache.asterix.common.functions.ExternalFunctionLanguage.PYTHON;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.concurrent.ConcurrentMap;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
import org.apache.asterix.common.metadata.Namespace;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.IFormattedException;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.common.work.SynchronizableWork;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.AbstractServlet;
import org.apache.hyracks.http.server.utils.HttpUtil;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpScheme;
import io.netty.handler.codec.http.multipart.FileUpload;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.handler.codec.http.multipart.MixedAttribute;
public abstract class AbstractNCUdfServlet extends AbstractServlet {
INcApplicationContext appCtx;
INCServiceContext srvCtx;
protected final IApplicationContext plainAppCtx;
private final HttpScheme httpServerProtocol;
private final int httpServerPort;
public static final String GET_UDF_DIST_ENDPOINT = "/dist";
public static final String TYPE_PARAMETER = "type";
public static final String DATA_PARAMETER = "data";
public static final String NAME_KEY = "name";
public static final String DATAVERSE_KEY = "dataverse";
protected static final class LibraryUploadData {
final ExternalFunctionLanguage type;
final boolean replaceIfExists;
final FileUpload fileUpload;
private LibraryUploadData(MixedAttribute type, boolean replaceIfExists, InterfaceHttpData fileUpload)
throws IOException {
this.type = type != null ? getLanguageByTypeParameter(type.getValue()) : null;
this.replaceIfExists = replaceIfExists;
this.fileUpload = (FileUpload) fileUpload;
}
public static LibraryUploadData libraryCreationUploadData(MixedAttribute type, InterfaceHttpData fileUpload)
throws IOException {
//POST imples replaceIfExists
return new LibraryUploadData(type, true, fileUpload);
}
}
public AbstractNCUdfServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx,
HttpScheme httpServerProtocol, int httpServerPort) {
super(ctx, paths);
this.plainAppCtx = appCtx;
this.httpServerProtocol = httpServerProtocol;
this.httpServerPort = httpServerPort;
}
void readFromFile(Path filePath, IServletResponse response, String contentType, OpenOption opt) throws Exception {
class InputStreamGetter extends SynchronizableWork {
private InputStream is;
@Override
protected void doRun() throws Exception {
if (opt != null) {
is = Files.newInputStream(filePath, opt);
} else {
is = Files.newInputStream(filePath);
}
}
}
InputStreamGetter r = new InputStreamGetter();
((NodeControllerService) srvCtx.getControllerService()).getWorkQueue().scheduleAndSync(r);
if (r.is == null) {
response.setStatus(HttpResponseStatus.NOT_FOUND);
return;
}
try {
response.setStatus(HttpResponseStatus.OK);
HttpUtil.setContentType(response, contentType);
IOUtils.copyLarge(r.is, response.outputStream());
} finally {
r.is.close();
}
}
protected String getDataverseKey() {
return DATAVERSE_KEY;
}
URI createDownloadURI(Path file) throws Exception {
String host = appCtx.getServiceContext().getAppConfig().getString(NCConfig.Option.PUBLIC_ADDRESS);
String path = paths[0].substring(0, servletPathLengths[0]) + GET_UDF_DIST_ENDPOINT + '/' + file.getFileName();
return new URI(httpServerProtocol.toString(), null, host, httpServerPort, path, null, null);
}
private boolean isNotAttribute(InterfaceHttpData field) {
return field == null || !field.getHttpDataType().equals(InterfaceHttpData.HttpDataType.Attribute);
}
protected Pair<Namespace, String> decodeDvAndLibFromLocalPath(String localPath)
throws RuntimeDataException, AlgebricksException {
String[] pathSegments = StringUtils.split(localPath, '/');
if (pathSegments.length != 2) {
throw RuntimeDataException.create(ErrorCode.PARAMETERS_REQUIRED,
"The URL-encoded " + getDataverseKey() + " name and library name in the request path");
}
String namespaceStr = ServletUtil.decodeUriSegment(pathSegments[0]);
Namespace namespace = plainAppCtx.getNamespaceResolver().resolve(namespaceStr);
String libName = ServletUtil.decodeUriSegment(pathSegments[1]);
return new Pair<>(namespace, libName);
}
protected LibraryUploadData decodeMultiPartLibraryOptions(HttpPostRequestDecoder requestDecoder)
throws IOException {
InterfaceHttpData typeAttribute = requestDecoder.getBodyHttpData(TYPE_PARAMETER);
if (typeAttribute == null) {
throw RuntimeDataException.create(ErrorCode.PARAMETERS_REQUIRED, TYPE_PARAMETER);
}
else if (!isNotAttribute(typeAttribute)) {
InterfaceHttpData libraryData = requestDecoder.getBodyHttpData(DATA_PARAMETER);
if (libraryData == null) {
throw RuntimeDataException.create(ErrorCode.PARAMETERS_REQUIRED, DATA_PARAMETER);
} else if (!libraryData.getHttpDataType().equals(InterfaceHttpData.HttpDataType.FileUpload)) {
throw RuntimeDataException.create(ErrorCode.INVALID_REQ_PARAM_VAL, DATA_PARAMETER,
libraryData.getHttpDataType());
}
LibraryUploadData uploadData =
LibraryUploadData.libraryCreationUploadData((MixedAttribute) typeAttribute, libraryData);
if (uploadData.type == null) {
throw RuntimeDataException.create(ErrorCode.LIBRARY_EXTERNAL_FUNCTION_UNSUPPORTED_KIND,
((MixedAttribute) typeAttribute).getValue());
}
return uploadData;
} else {
throw RuntimeDataException.create(ErrorCode.PARAMETERS_REQUIRED, TYPE_PARAMETER);
}
}
static ExternalFunctionLanguage getLanguageByTypeParameter(String lang) {
if (lang.equalsIgnoreCase(JAVA.name())) {
return JAVA;
} else if (lang.equalsIgnoreCase(PYTHON.name())) {
return PYTHON;
} else {
return null;
}
}
HttpResponseStatus toHttpErrorStatus(Exception e) {
if (IFormattedException.matchesAny(e, ErrorCode.UNKNOWN_DATAVERSE, ErrorCode.UNKNOWN_LIBRARY)) {
return HttpResponseStatus.NOT_FOUND;
}
if (IFormattedException.matchesAny(e, ErrorCode.LIBRARY_EXTERNAL_FUNCTION_UNKNOWN_KIND,
ErrorCode.LIBRARY_EXTERNAL_FUNCTION_UNSUPPORTED_KIND, ErrorCode.INVALID_REQ_PARAM_VAL,
ErrorCode.PARAMETERS_REQUIRED, ErrorCode.INVALID_DATABASE_OBJECT_NAME)) {
return HttpResponseStatus.BAD_REQUEST;
}
if (e instanceof AlgebricksException) {
return HttpResponseStatus.BAD_REQUEST;
}
return HttpResponseStatus.INTERNAL_SERVER_ERROR;
}
}