blob: 79973fe147c53aac720dcf7a60c8416208d0a528 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.util.NiFiProperties;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
*
*/
public class StandardNiFiContentAccess implements ContentAccess {
public static final String CLIENT_ID_PARAM = "clientId";
private static final Pattern FLOWFILE_CONTENT_URI_PATTERN = Pattern
.compile("/flowfile-queues/([a-f0-9\\-]{36})/flowfiles/([a-f0-9\\-]{36})/content.*");
private static final Pattern PROVENANCE_CONTENT_URI_PATTERN = Pattern
.compile("/provenance-events/([0-9]+)/content/((?:input)|(?:output)).*");
private NiFiProperties properties;
private NiFiServiceFacade serviceFacade;
private ClusterCoordinator clusterCoordinator;
private RequestReplicator requestReplicator;
@Override
public DownloadableContent getContent(final ContentRequestContext request) {
// if clustered, send request to cluster manager
if (properties.isClustered() && clusterCoordinator != null && clusterCoordinator.isConnected()) {
// get the URI
URI dataUri;
try {
dataUri = new URI(request.getDataUri());
} catch (final URISyntaxException use) {
throw new ClusterRequestException(use);
}
// set the request parameters
final MultivaluedMap<String, String> parameters = new MultivaluedHashMap();
parameters.add(CLIENT_ID_PARAM, request.getClientId());
// set the headers
final Map<String, String> headers = new HashMap<>();
// ensure we were able to detect the cluster node id
if (request.getClusterNodeId() == null) {
throw new IllegalArgumentException("Unable to determine the which node has the content.");
}
// get the target node and ensure it exists
final NodeIdentifier nodeId = clusterCoordinator.getNodeIdentifier(request.getClusterNodeId());
// replicate the request to the cluster coordinator, indicating the target node
NodeResponse nodeResponse;
try {
headers.put(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, nodeId.getId());
final NodeIdentifier coordinatorNode = clusterCoordinator.getElectedActiveCoordinatorNode();
if (coordinatorNode == null) {
throw new NoClusterCoordinatorException();
}
final Set<NodeIdentifier> coordinatorNodes = Collections.singleton(coordinatorNode);
nodeResponse = requestReplicator.replicate(coordinatorNodes, HttpMethod.GET, dataUri, parameters, headers, false, true).awaitMergedResponse();
} catch (InterruptedException e) {
throw new IllegalClusterStateException("Interrupted while waiting for a response from node");
}
final Response clientResponse = nodeResponse.getClientResponse();
final MultivaluedMap<String, String> responseHeaders = clientResponse.getStringHeaders();
// ensure an appropriate response
if (Response.Status.NOT_FOUND.getStatusCode() == clientResponse.getStatusInfo().getStatusCode()) {
throw new ResourceNotFoundException(clientResponse.readEntity(String.class));
} else if (Response.Status.FORBIDDEN.getStatusCode() == clientResponse.getStatusInfo().getStatusCode()
|| Response.Status.UNAUTHORIZED.getStatusCode() == clientResponse.getStatusInfo().getStatusCode()) {
throw new AccessDeniedException(clientResponse.readEntity(String.class));
} else if (Response.Status.OK.getStatusCode() != clientResponse.getStatusInfo().getStatusCode()) {
throw new IllegalStateException(clientResponse.readEntity(String.class));
}
// get the file name
final String contentDisposition = responseHeaders.getFirst("Content-Disposition");
final String filename = StringUtils.substringBetween(contentDisposition, "filename=\"", "\"");
// get the content type
final String contentType = responseHeaders.getFirst("Content-Type");
// create the downloadable content
return new DownloadableContent(filename, contentType, nodeResponse.getInputStream());
} else {
// example URIs:
// http://localhost:8080/nifi-api/provenance/events/{id}/content/{input|output}
// http://localhost:8080/nifi-api/flowfile-queues/{uuid}/flowfiles/{uuid}/content
// get just the context path for comparison
final String dataUri = StringUtils.substringAfter(request.getDataUri(), "/nifi-api");
if (StringUtils.isBlank(dataUri)) {
throw new IllegalArgumentException("The specified data reference URI is not valid.");
}
// flowfile listing content
final Matcher flowFileMatcher = FLOWFILE_CONTENT_URI_PATTERN.matcher(dataUri);
if (flowFileMatcher.matches()) {
final String connectionId = flowFileMatcher.group(1);
final String flowfileId = flowFileMatcher.group(2);
return getFlowFileContent(connectionId, flowfileId, dataUri);
}
// provenance event content
final Matcher provenanceMatcher = PROVENANCE_CONTENT_URI_PATTERN.matcher(dataUri);
if (provenanceMatcher.matches()) {
try {
final Long eventId = Long.parseLong(provenanceMatcher.group(1));
final ContentDirection direction = ContentDirection.valueOf(provenanceMatcher.group(2).toUpperCase());
return getProvenanceEventContent(eventId, dataUri, direction);
} catch (final IllegalArgumentException iae) {
throw new IllegalArgumentException("The specified data reference URI is not valid.");
}
}
// invalid uri
throw new IllegalArgumentException("The specified data reference URI is not valid.");
}
}
private DownloadableContent getFlowFileContent(final String connectionId, final String flowfileId, final String dataUri) {
// user authorization is handled once we have the actual content so we can utilize the flow file attributes in the resource context
return serviceFacade.getContent(connectionId, flowfileId, dataUri);
}
private DownloadableContent getProvenanceEventContent(final Long eventId, final String dataUri, final ContentDirection direction) {
// user authorization is handled once we have the actual prov event so we can utilize the event attributes in the resource context
return serviceFacade.getContent(eventId, dataUri, direction);
}
public void setProperties(NiFiProperties properties) {
this.properties = properties;
}
public void setServiceFacade(NiFiServiceFacade serviceFacade) {
this.serviceFacade = serviceFacade;
}
public void setRequestReplicator(RequestReplicator requestReplicator) {
this.requestReplicator = requestReplicator;
}
public void setClusterCoordinator(ClusterCoordinator clusterCoordinator) {
this.clusterCoordinator = clusterCoordinator;
}
}