| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.web.api; |
| |
| import com.google.common.cache.Cache; |
| import com.google.common.cache.CacheBuilder; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.nifi.authorization.AuthorizableLookup; |
| import org.apache.nifi.authorization.AuthorizeAccess; |
| import org.apache.nifi.authorization.AuthorizeControllerServiceReference; |
| import org.apache.nifi.authorization.AuthorizeParameterReference; |
| import org.apache.nifi.authorization.Authorizer; |
| import org.apache.nifi.authorization.ComponentAuthorizable; |
| import org.apache.nifi.authorization.ProcessGroupAuthorizable; |
| import org.apache.nifi.authorization.RequestAction; |
| import org.apache.nifi.authorization.SnippetAuthorizable; |
| import org.apache.nifi.authorization.resource.Authorizable; |
| import org.apache.nifi.authorization.user.NiFiUser; |
| import org.apache.nifi.authorization.user.NiFiUserUtils; |
| import org.apache.nifi.cluster.coordination.ClusterCoordinator; |
| import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; |
| import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; |
| import org.apache.nifi.cluster.manager.NodeResponse; |
| import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; |
| import org.apache.nifi.cluster.manager.exception.UnknownNodeException; |
| import org.apache.nifi.cluster.protocol.NodeIdentifier; |
| import org.apache.nifi.controller.FlowController; |
| import org.apache.nifi.remote.HttpRemoteSiteListener; |
| import org.apache.nifi.remote.VersionNegotiator; |
| import org.apache.nifi.remote.exception.BadRequestException; |
| import org.apache.nifi.remote.exception.HandshakeException; |
| import org.apache.nifi.remote.exception.NotAuthorizedException; |
| import org.apache.nifi.remote.protocol.ResponseCode; |
| import org.apache.nifi.remote.protocol.http.HttpHeaders; |
| import org.apache.nifi.util.ComponentIdGenerator; |
| import org.apache.nifi.util.NiFiProperties; |
| import org.apache.nifi.web.NiFiServiceFacade; |
| import org.apache.nifi.web.Revision; |
| import org.apache.nifi.web.api.dto.RevisionDTO; |
| import org.apache.nifi.web.api.entity.ComponentEntity; |
| import org.apache.nifi.web.api.entity.Entity; |
| import org.apache.nifi.web.api.entity.TransactionResultEntity; |
| import org.apache.nifi.web.security.ProxiedEntitiesUtils; |
| import org.apache.nifi.web.security.jwt.NiFiBearerTokenResolver; |
| import org.apache.nifi.web.security.util.CacheKey; |
| import org.apache.nifi.web.util.WebUtils; |
| import org.eclipse.jetty.http.HttpCookie; |
| import org.eclipse.jetty.http.HttpHeader; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.servlet.ServletContext; |
| import javax.servlet.http.Cookie; |
| import javax.servlet.http.HttpServletRequest; |
| import javax.servlet.http.HttpServletResponse; |
| import javax.ws.rs.core.CacheControl; |
| import javax.ws.rs.core.Context; |
| import javax.ws.rs.core.MediaType; |
| import javax.ws.rs.core.MultivaluedHashMap; |
| import javax.ws.rs.core.MultivaluedMap; |
| import javax.ws.rs.core.Response; |
| import javax.ws.rs.core.Response.ResponseBuilder; |
| import javax.ws.rs.core.UriBuilder; |
| import javax.ws.rs.core.UriBuilderException; |
| import javax.ws.rs.core.UriInfo; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.Collections; |
| import java.util.Enumeration; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.UUID; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.BiFunction; |
| import java.util.function.Consumer; |
| import java.util.function.Function; |
| |
| import static javax.ws.rs.core.Response.Status.NOT_FOUND; |
| import static org.apache.commons.lang3.StringUtils.isEmpty; |
| import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME; |
| import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE; |
| import static org.apache.nifi.web.util.WebUtils.FORWARDED_HOST_HTTP_HEADER; |
| import static org.apache.nifi.web.util.WebUtils.FORWARDED_PORT_HTTP_HEADER; |
| import static org.apache.nifi.web.util.WebUtils.FORWARDED_PROTO_HTTP_HEADER; |
| import static org.apache.nifi.web.util.WebUtils.PROXY_HOST_HTTP_HEADER; |
| import static org.apache.nifi.web.util.WebUtils.PROXY_PORT_HTTP_HEADER; |
| import static org.apache.nifi.web.util.WebUtils.PROXY_SCHEME_HTTP_HEADER; |
| |
| /** |
| * Base class for controllers. |
| */ |
| public abstract class ApplicationResource { |
| |
| public static final String VERSION = "version"; |
| public static final String CLIENT_ID = "clientId"; |
| public static final String DISCONNECTED_NODE_ACKNOWLEDGED = "disconnectedNodeAcknowledged"; |
| static final String LOGIN_ERROR_TITLE = "Unable to continue login sequence"; |
| static final String LOGOUT_ERROR_TITLE = "Unable to continue logout sequence"; |
| private static final int VALID_FOR_SESSION_ONLY = -1; |
| |
| protected static final String NON_GUARANTEED_ENDPOINT = "Note: This endpoint is subject to change as NiFi and it's REST API evolve."; |
| |
| private static final Logger logger = LoggerFactory.getLogger(ApplicationResource.class); |
| |
| public static final String NODEWISE = "false"; |
| |
| @Context |
| protected HttpServletRequest httpServletRequest; |
| |
| @Context |
| protected UriInfo uriInfo; |
| |
| protected NiFiProperties properties; |
| private RequestReplicator requestReplicator; |
| private ClusterCoordinator clusterCoordinator; |
| private FlowController flowController; |
| |
| private static final int MAX_CACHE_SOFT_LIMIT = 500; |
| private final Cache<CacheKey, Request<? extends Entity>> twoPhaseCommitCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build(); |
| |
| protected void forwardToLoginMessagePage(final HttpServletRequest httpServletRequest, final HttpServletResponse httpServletResponse, final String message) throws Exception { |
| forwardToMessagePage(httpServletRequest, httpServletResponse, LOGIN_ERROR_TITLE, message); |
| } |
| |
| protected void forwardToLogoutMessagePage(final HttpServletRequest httpServletRequest, final HttpServletResponse httpServletResponse, final String message) throws Exception { |
| forwardToMessagePage(httpServletRequest, httpServletResponse, LOGOUT_ERROR_TITLE, message); |
| } |
| |
| protected void forwardToMessagePage(final HttpServletRequest httpServletRequest, final HttpServletResponse httpServletResponse, |
| final String title, final String message) throws Exception { |
| httpServletRequest.setAttribute("title", title); |
| httpServletRequest.setAttribute("messages", message); |
| |
| final ServletContext uiContext = httpServletRequest.getServletContext().getContext("/nifi"); |
| uiContext.getRequestDispatcher("/WEB-INF/pages/message-page.jsp").forward(httpServletRequest, httpServletResponse); |
| } |
| |
| /** |
| * Generate a resource uri based off of the specified parameters. |
| * |
| * @param path path |
| * @return resource uri |
| */ |
| protected String generateResourceUri(final String... path) { |
| URI uri = buildResourceUri(path); |
| return uri.toString(); |
| } |
| |
| private URI buildResourceUri(final String... path) { |
| final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder(); |
| uriBuilder.segment(path); |
| URI uri = uriBuilder.build(); |
| try { |
| |
| // check for proxy settings |
| |
| final String scheme = getFirstHeaderValue(PROXY_SCHEME_HTTP_HEADER, FORWARDED_PROTO_HTTP_HEADER); |
| final String hostHeaderValue = getFirstHeaderValue(PROXY_HOST_HTTP_HEADER, FORWARDED_HOST_HTTP_HEADER); |
| final String portHeaderValue = getFirstHeaderValue(PROXY_PORT_HTTP_HEADER, FORWARDED_PORT_HTTP_HEADER); |
| |
| final String host = WebUtils.determineProxiedHost(hostHeaderValue); |
| final String port = WebUtils.determineProxiedPort(hostHeaderValue, portHeaderValue); |
| |
| // Catch header poisoning |
| String allowedContextPaths = properties.getAllowedContextPaths(); |
| String resourcePath = WebUtils.getResourcePath(uri, httpServletRequest, allowedContextPaths); |
| |
| // determine the port uri |
| int uriPort = uri.getPort(); |
| if (port != null) { |
| if (StringUtils.isWhitespace(port)) { |
| uriPort = -1; |
| } else { |
| try { |
| uriPort = Integer.parseInt(port); |
| } catch (final NumberFormatException nfe) { |
| logger.warn(String.format("Unable to parse proxy port HTTP header '%s'. Using port from request URI '%s'.", port, uriPort)); |
| } |
| } |
| } |
| |
| // construct the URI |
| uri = new URI( |
| (StringUtils.isBlank(scheme)) ? uri.getScheme() : scheme, |
| uri.getUserInfo(), |
| (StringUtils.isBlank(host)) ? uri.getHost() : host, |
| uriPort, |
| resourcePath, |
| uri.getQuery(), |
| uri.getFragment()); |
| |
| } catch (final URISyntaxException use) { |
| throw new UriBuilderException(use); |
| } |
| return uri; |
| } |
| |
| /** |
| * Edit the response headers to indicating no caching. |
| * |
| * @param response response |
| * @return builder |
| */ |
| protected ResponseBuilder noCache(final ResponseBuilder response) { |
| final CacheControl cacheControl = new CacheControl(); |
| cacheControl.setPrivate(true); |
| cacheControl.setNoCache(true); |
| cacheControl.setNoStore(true); |
| return response.cacheControl(cacheControl); |
| } |
| |
| protected String generateUuid() { |
| final Optional<String> seed = getIdGenerationSeed(); |
| UUID uuid; |
| if (seed.isPresent()) { |
| try { |
| UUID seedId = UUID.fromString(seed.get()); |
| uuid = new UUID(seedId.getMostSignificantBits(), seed.get().hashCode()); |
| } catch (Exception e) { |
| logger.warn("Provided 'seed' does not represent UUID. Will not be able to extract most significant bits for ID generation."); |
| uuid = UUID.nameUUIDFromBytes(seed.get().getBytes(StandardCharsets.UTF_8)); |
| } |
| } else { |
| uuid = ComponentIdGenerator.generateId(); |
| } |
| |
| return uuid.toString(); |
| } |
| |
| protected Optional<String> getIdGenerationSeed() { |
| final String idGenerationSeed = httpServletRequest.getHeader(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER); |
| if (StringUtils.isBlank(idGenerationSeed)) { |
| return Optional.empty(); |
| } |
| |
| return Optional.of(idGenerationSeed); |
| } |
| |
| /** |
| * Generates an Ok response with no content. |
| * |
| * @return an Ok response with no content |
| */ |
| protected ResponseBuilder generateOkResponse() { |
| return noCache(Response.ok()); |
| } |
| |
| /** |
| * Generates an Ok response with the specified content. |
| * |
| * @param entity The entity |
| * @return The response to be built |
| */ |
| protected ResponseBuilder generateOkResponse(final Object entity) { |
| final ResponseBuilder response = Response.ok(entity); |
| return noCache(response); |
| } |
| |
| /** |
| * Generates a 201 Created response with the specified content. |
| * |
| * @param uri The URI |
| * @param entity entity |
| * @return The response to be built |
| */ |
| protected ResponseBuilder generateCreatedResponse(final URI uri, final Object entity) { |
| // generate the response builder |
| return Response.created(uri).entity(entity); |
| } |
| |
| /** |
| * Generates a 401 Not Authorized response with no content. |
| * |
| * @return The response to be built |
| */ |
| protected ResponseBuilder generateNotAuthorizedResponse() { |
| // generate the response builder |
| return Response.status(HttpServletResponse.SC_UNAUTHORIZED); |
| } |
| |
| /** |
| * Generates a 202 Accepted (Node Continue) response to be used within the cluster request handshake. |
| * |
| * @return a 202 Accepted (Node Continue) response to be used within the cluster request handshake |
| */ |
| protected ResponseBuilder generateContinueResponse() { |
| return Response.status(RequestReplicator.NODE_CONTINUE_STATUS_CODE); |
| } |
| |
| protected URI getAbsolutePath() { |
| return uriInfo.getAbsolutePath(); |
| } |
| |
| protected URI getRequestUri() { |
| return uriInfo.getRequestUri(); |
| } |
| |
| protected MultivaluedMap<String, String> getRequestParameters() { |
| final MultivaluedMap<String, String> entity = new MultivaluedHashMap(); |
| |
| for (final Map.Entry<String, String[]> entry : httpServletRequest.getParameterMap().entrySet()) { |
| if (entry.getValue() == null) { |
| entity.add(entry.getKey(), null); |
| } else { |
| for (final String aValue : entry.getValue()) { |
| entity.add(entry.getKey(), aValue); |
| } |
| } |
| } |
| |
| return entity; |
| } |
| |
| protected Map<String, String> getHeaders() { |
| return getHeaders(new HashMap<String, String>()); |
| } |
| |
| protected Map<String, String> getHeaders(final Map<String, String> overriddenHeaders) { |
| |
| final Map<String, String> result = new HashMap<>(); |
| final Map<String, String> overriddenHeadersIgnoreCaseMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); |
| if (overriddenHeaders != null) { |
| overriddenHeadersIgnoreCaseMap.putAll(overriddenHeaders); |
| } |
| |
| final Enumeration<String> headerNames = httpServletRequest.getHeaderNames(); |
| while (headerNames.hasMoreElements()) { |
| |
| final String headerName = headerNames.nextElement(); |
| if (!overriddenHeadersIgnoreCaseMap.isEmpty() && headerName.equalsIgnoreCase("content-length")) { |
| continue; |
| } |
| if (overriddenHeadersIgnoreCaseMap.containsKey(headerName)) { |
| result.put(headerName, overriddenHeadersIgnoreCaseMap.get(headerName)); |
| } else { |
| result.put(headerName, httpServletRequest.getHeader(headerName)); |
| } |
| } |
| |
| // if the scheme is not set by the client, include the details from this request but don't override |
| final String proxyScheme = getFirstHeaderValue(PROXY_SCHEME_HTTP_HEADER, FORWARDED_PROTO_HTTP_HEADER); |
| if (proxyScheme == null) { |
| result.put(PROXY_SCHEME_HTTP_HEADER, httpServletRequest.getScheme()); |
| } |
| |
| // if the host is not set by the client, include the details from this request but don't override |
| final String proxyHost = getFirstHeaderValue(PROXY_HOST_HTTP_HEADER, FORWARDED_HOST_HTTP_HEADER); |
| if (proxyHost == null) { |
| result.put(PROXY_HOST_HTTP_HEADER, httpServletRequest.getServerName()); |
| } |
| |
| // if the port is not set by the client, include the details from this request but don't override |
| final String proxyPort = getFirstHeaderValue(PROXY_PORT_HTTP_HEADER, FORWARDED_PORT_HTTP_HEADER); |
| if (proxyPort == null) { |
| result.put(PROXY_PORT_HTTP_HEADER, String.valueOf(httpServletRequest.getServerPort())); |
| } |
| |
| return result; |
| } |
| |
| /** |
| * Returns the value for the first key discovered when inspecting the current request. Will |
| * return null if there are no keys specified or if none of the specified keys are found. |
| * |
| * @param keys http header keys |
| * @return the value for the first key found |
| */ |
| private String getFirstHeaderValue(final String... keys) { |
| return WebUtils.getFirstHeaderValue(httpServletRequest, keys); |
| } |
| |
| /** |
| * Checks whether the request is part of a two-phase commit style request (either phase 1 or phase 2) |
| * |
| * @param httpServletRequest the request |
| * @return <code>true</code> if the request represents a two-phase commit style request |
| */ |
| protected boolean isTwoPhaseRequest(final HttpServletRequest httpServletRequest) { |
| final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER); |
| return transactionId != null && isConnectedToCluster(); |
| } |
| |
| /** |
| * When a two-phase commit style request is used, the first phase (generally referred to |
| * as the "commit-request stage") is intended to validate that the request can be completed. |
| * In NiFi, we use this phase to validate that the request can complete. This method determines |
| * whether or not the request is the first phase of a two-phase commit. |
| * |
| * @param httpServletRequest the request |
| * @return <code>true</code> if the request represents a two-phase commit style request and is the |
| * first of the two phases. |
| */ |
| protected boolean isValidationPhase(final HttpServletRequest httpServletRequest) { |
| return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER) != null; |
| } |
| |
| protected boolean isExecutionPhase(final HttpServletRequest httpServletRequest) { |
| return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_EXECUTION_HTTP_HEADER) != null; |
| } |
| |
| protected boolean isCancellationPhase(final HttpServletRequest httpServletRequest) { |
| return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER) != null; |
| } |
| |
| /** |
| * Checks whether or not the request should be replicated to the cluster |
| * |
| * @return <code>true</code> if the request should be replicated, <code>false</code> otherwise |
| */ |
| boolean isReplicateRequest() { |
| // If not a node in a cluster, we do not replicate |
| if (!properties.isNode()) { |
| return false; |
| } |
| |
| ensureFlowInitialized(); |
| |
| // If not connected to the cluster, we do not replicate |
| if (!isConnectedToCluster()) { |
| return false; |
| } |
| |
| // Check if the X-Request-Replicated header is set. If so, the request has already been replicated, |
| // so we need to service the request locally. If not, then replicate the request to the entire cluster. |
| final String header = httpServletRequest.getHeader(RequestReplicator.REPLICATION_INDICATOR_HEADER); |
| return header == null; |
| } |
| |
| /** |
| * Converts a Revision DTO and an associated Component ID into a Revision object |
| * |
| * @param revisionDto the Revision DTO |
| * @param componentId the ID of the component that the Revision DTO belongs to |
| * @return a Revision that has the same client ID and Version as the Revision DTO and the Component ID specified |
| */ |
| protected Revision getRevision(final RevisionDTO revisionDto, final String componentId) { |
| return new Revision(revisionDto.getVersion(), revisionDto.getClientId(), componentId); |
| } |
| |
| /** |
| * Extracts a Revision object from the Revision DTO and ID provided by the Component Entity |
| * |
| * @param entity the ComponentEntity that contains the Revision DTO & ID |
| * @return the Revision specified in the ComponentEntity |
| */ |
| protected Revision getRevision(final ComponentEntity entity, final String componentId) { |
| return getRevision(entity.getRevision(), componentId); |
| } |
| |
| /** |
| * Authorize any restrictions for the specified ComponentAuthorizable. |
| * |
| * @param authorizer authorizer |
| * @param authorizable component authorizable |
| */ |
| protected void authorizeRestrictions(final Authorizer authorizer, final ComponentAuthorizable authorizable) { |
| authorizable.getRestrictedAuthorizables().forEach(restrictionAuthorizable -> restrictionAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser())); |
| } |
| |
| /** |
| * Authorizes the specified process group. |
| * |
| * @param processGroupAuthorizable process group |
| * @param authorizer authorizer |
| * @param lookup lookup |
| * @param action action |
| * @param authorizeReferencedServices whether to authorize referenced services |
| * @param authorizeTemplates whether to authorize templates |
| * @param authorizeControllerServices whether to authorize controller services |
| * @param authorizeTransitiveServices whether to authorize transitive services |
| * @param authorizeParameterReferences whether to authorize parameter references |
| */ |
| protected void authorizeProcessGroup(final ProcessGroupAuthorizable processGroupAuthorizable, final Authorizer authorizer, final AuthorizableLookup lookup, final RequestAction action, |
| final boolean authorizeReferencedServices, final boolean authorizeTemplates, |
| final boolean authorizeControllerServices, final boolean authorizeTransitiveServices, |
| final boolean authorizeParameterReferences) { |
| |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| final Consumer<Authorizable> authorize = authorizable -> authorizable.authorize(authorizer, action, user); |
| |
| // authorize the process group |
| authorize.accept(processGroupAuthorizable.getAuthorizable()); |
| |
| // authorize the contents of the group - these methods return all encapsulated components (recursive) |
| processGroupAuthorizable.getEncapsulatedProcessors().forEach(processorAuthorizable -> { |
| // authorize the processor |
| authorize.accept(processorAuthorizable.getAuthorizable()); |
| |
| // authorize any referenced services if necessary |
| if (authorizeReferencedServices) { |
| AuthorizeControllerServiceReference.authorizeControllerServiceReferences(processorAuthorizable, authorizer, lookup, authorizeTransitiveServices); |
| } |
| |
| // authorize any referenced parameters if necessary |
| if (authorizeParameterReferences) { |
| AuthorizeParameterReference.authorizeParameterReferences(processorAuthorizable, authorizer, processorAuthorizable.getParameterContext(), user); |
| } |
| }); |
| processGroupAuthorizable.getEncapsulatedConnections().stream().map(connection -> connection.getAuthorizable()).forEach(authorize); |
| processGroupAuthorizable.getEncapsulatedInputPorts().forEach(authorize); |
| processGroupAuthorizable.getEncapsulatedOutputPorts().forEach(authorize); |
| processGroupAuthorizable.getEncapsulatedFunnels().forEach(authorize); |
| processGroupAuthorizable.getEncapsulatedLabels().forEach(authorize); |
| processGroupAuthorizable.getEncapsulatedProcessGroups().stream().map(group -> group.getAuthorizable()).forEach(authorize); |
| processGroupAuthorizable.getEncapsulatedRemoteProcessGroups().forEach(authorize); |
| |
| // authorize templates if necessary |
| if (authorizeTemplates) { |
| processGroupAuthorizable.getEncapsulatedTemplates().forEach(authorize); |
| } |
| |
| // authorize controller services if necessary |
| if (authorizeControllerServices) { |
| processGroupAuthorizable.getEncapsulatedControllerServices().forEach(controllerServiceAuthorizable -> { |
| // authorize the controller service |
| authorize.accept(controllerServiceAuthorizable.getAuthorizable()); |
| |
| // authorize any referenced services if necessary |
| if (authorizeReferencedServices) { |
| AuthorizeControllerServiceReference.authorizeControllerServiceReferences(controllerServiceAuthorizable, authorizer, lookup, authorizeTransitiveServices); |
| } |
| |
| if (authorizeParameterReferences) { |
| AuthorizeParameterReference.authorizeParameterReferences(controllerServiceAuthorizable, authorizer, controllerServiceAuthorizable.getParameterContext(), user); |
| } |
| }); |
| } |
| } |
| |
| /** |
| * Authorizes the specified Snippet with the specified request action. |
| * |
| * @param authorizer authorizer |
| * @param lookup lookup |
| * @param action action |
| */ |
| protected void authorizeSnippet(final SnippetAuthorizable snippet, final Authorizer authorizer, final AuthorizableLookup lookup, final RequestAction action, |
| final boolean authorizeReferencedServices, final boolean authorizeTransitiveServices, final boolean authorizeParameterReferences) { |
| |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| final Consumer<Authorizable> authorize = authorizable -> authorizable.authorize(authorizer, action, user); |
| |
| // authorize each component in the specified snippet |
| snippet.getSelectedProcessGroups().forEach(processGroupAuthorizable -> { |
| // note - we are not authorizing templates or controller services as they are not considered when using this snippet. however, |
| // referenced services are considered so those are explicitly authorized when authorizing a processor |
| authorizeProcessGroup(processGroupAuthorizable, authorizer, lookup, action, authorizeReferencedServices, |
| false, false, authorizeTransitiveServices, authorizeParameterReferences); |
| }); |
| snippet.getSelectedRemoteProcessGroups().forEach(authorize); |
| snippet.getSelectedProcessors().forEach(processorAuthorizable -> { |
| // authorize the processor |
| authorize.accept(processorAuthorizable.getAuthorizable()); |
| |
| // authorize any referenced services if necessary |
| if (authorizeReferencedServices) { |
| AuthorizeControllerServiceReference.authorizeControllerServiceReferences(processorAuthorizable, authorizer, lookup, authorizeTransitiveServices); |
| } |
| |
| // authorize any parameter usage |
| if (authorizeParameterReferences) { |
| AuthorizeParameterReference.authorizeParameterReferences(processorAuthorizable, authorizer, processorAuthorizable.getParameterContext(), user); |
| } |
| }); |
| snippet.getSelectedInputPorts().forEach(authorize); |
| snippet.getSelectedOutputPorts().forEach(authorize); |
| snippet.getSelectedConnections().forEach(connAuth -> authorize.accept(connAuth.getAuthorizable())); |
| snippet.getSelectedFunnels().forEach(authorize); |
| snippet.getSelectedLabels().forEach(authorize); |
| } |
| |
| /** |
| * Executes an action through the service facade using the specified revision. |
| * |
| * @param serviceFacade service facade |
| * @param revision revision |
| * @param authorizer authorizer |
| * @param verifier verifier |
| * @param action executor |
| * @return the response |
| */ |
| protected <T extends Entity> Response withWriteLock(final NiFiServiceFacade serviceFacade, final T entity, final Revision revision, final AuthorizeAccess authorizer, |
| final Runnable verifier, final BiFunction<Revision, T, Response> action) { |
| |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| |
| if (isTwoPhaseRequest(httpServletRequest)) { |
| if (isValidationPhase(httpServletRequest)) { |
| // authorize access |
| serviceFacade.authorizeAccess(authorizer); |
| serviceFacade.verifyRevision(revision, user); |
| |
| // verify if necessary |
| if (verifier != null) { |
| verifier.run(); |
| } |
| |
| // store the request |
| phaseOneStoreTransaction(entity, revision, null); |
| |
| return generateContinueResponse().build(); |
| } else if (isExecutionPhase(httpServletRequest)) { |
| // get the original request and run the action |
| final Request<T> phaseOneRequest = phaseTwoVerifyTransaction(); |
| return action.apply(phaseOneRequest.getRevision(), phaseOneRequest.getRequest()); |
| } else if (isCancellationPhase(httpServletRequest)) { |
| cancelTransaction(); |
| return generateOkResponse().build(); |
| } else { |
| throw new IllegalStateException("This request does not appear to be part of the two phase commit."); |
| } |
| } else { |
| // authorize access and run the action |
| serviceFacade.authorizeAccess(authorizer); |
| serviceFacade.verifyRevision(revision, user); |
| |
| // verify if necessary |
| if (verifier != null) { |
| verifier.run(); |
| } |
| |
| return action.apply(revision, entity); |
| } |
| } |
| |
| /** |
| * Executes an action through the service facade using the specified revision. |
| * |
| * @param serviceFacade service facade |
| * @param revisions revisions |
| * @param authorizer authorizer |
| * @param verifier verifier |
| * @param action executor |
| * @return the response |
| */ |
| protected <T extends Entity> Response withWriteLock(final NiFiServiceFacade serviceFacade, final T entity, final Set<Revision> revisions, final AuthorizeAccess authorizer, |
| final Runnable verifier, final BiFunction<Set<Revision>, T, Response> action) { |
| |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| |
| if (isTwoPhaseRequest(httpServletRequest)) { |
| if (isValidationPhase(httpServletRequest)) { |
| // authorize access |
| serviceFacade.authorizeAccess(authorizer); |
| serviceFacade.verifyRevisions(revisions, user); |
| |
| // verify if necessary |
| if (verifier != null) { |
| verifier.run(); |
| } |
| |
| // store the request |
| phaseOneStoreTransaction(entity, null, revisions); |
| |
| return generateContinueResponse().build(); |
| } else if (isExecutionPhase(httpServletRequest)) { |
| // get the original request and run the action |
| final Request<T> phaseOneRequest = phaseTwoVerifyTransaction(); |
| return action.apply(phaseOneRequest.getRevisions(), phaseOneRequest.getRequest()); |
| } else if (isCancellationPhase(httpServletRequest)) { |
| cancelTransaction(); |
| return generateOkResponse().build(); |
| } else { |
| throw new IllegalStateException("This request does not appear to be part of the two phase commit."); |
| } |
| } else { |
| // authorize access and run the action |
| serviceFacade.authorizeAccess(authorizer); |
| serviceFacade.verifyRevisions(revisions, user); |
| |
| // verify if necessary |
| if (verifier != null) { |
| verifier.run(); |
| } |
| |
| return action.apply(revisions, entity); |
| } |
| } |
| |
| /** |
| * Executes an action through the service facade. |
| * |
| * @param serviceFacade service facade |
| * @param authorizer authorizer |
| * @param verifier verifier |
| * @param action the action to execute |
| * @return the response |
| */ |
| protected <T extends Entity> Response withWriteLock(final NiFiServiceFacade serviceFacade, final T entity, final AuthorizeAccess authorizer, |
| final Runnable verifier, final Function<T, Response> action) { |
| |
| if (isTwoPhaseRequest(httpServletRequest)) { |
| if (isValidationPhase(httpServletRequest)) { |
| // authorize access |
| serviceFacade.authorizeAccess(authorizer); |
| |
| // verify if necessary |
| if (verifier != null) { |
| verifier.run(); |
| } |
| |
| // store the request |
| phaseOneStoreTransaction(entity, null, null); |
| |
| return generateContinueResponse().build(); |
| } else if (isExecutionPhase(httpServletRequest)) { |
| // get the original request and run the action |
| final Request<T> phaseOneRequest = phaseTwoVerifyTransaction(); |
| return action.apply(phaseOneRequest.getRequest()); |
| } else if (isCancellationPhase(httpServletRequest)) { |
| cancelTransaction(); |
| return generateOkResponse().build(); |
| } else { |
| throw new IllegalStateException("This request does not appear to be part of the two phase commit."); |
| } |
| } else { |
| // authorize access |
| serviceFacade.authorizeAccess(authorizer); |
| |
| // verify if necessary |
| if (verifier != null) { |
| verifier.run(); |
| } |
| |
| // run the action |
| return action.apply(entity); |
| } |
| } |
| |
| private <T extends Entity> void phaseOneStoreTransaction(final T requestEntity, final Revision revision, final Set<Revision> revisions) { |
| if (twoPhaseCommitCache.size() > MAX_CACHE_SOFT_LIMIT) { |
| throw new IllegalStateException("The maximum number of requests are in progress."); |
| } |
| |
| // get the transaction id |
| final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER); |
| if (StringUtils.isBlank(transactionId)) { |
| throw new IllegalArgumentException("Two phase commit Transaction Id missing."); |
| } |
| |
| synchronized (twoPhaseCommitCache) { |
| final CacheKey key = new CacheKey(transactionId); |
| if (twoPhaseCommitCache.getIfPresent(key) != null) { |
| throw new IllegalStateException("Transaction " + transactionId + " is already in progress."); |
| } |
| |
| // store the entry for the second phase |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| final Request<T> request = new Request<>(ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user), getAbsolutePath().toString(), revision, revisions, requestEntity); |
| twoPhaseCommitCache.put(key, request); |
| } |
| } |
| |
| private <T extends Entity> Request<T> phaseTwoVerifyTransaction() { |
| // get the transaction id |
| final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER); |
| if (StringUtils.isBlank(transactionId)) { |
| throw new IllegalArgumentException("Two phase commit Transaction Id missing."); |
| } |
| |
| // get the entry for the second phase |
| final Request<T> request; |
| synchronized (twoPhaseCommitCache) { |
| final CacheKey key = new CacheKey(transactionId); |
| request = (Request<T>) twoPhaseCommitCache.getIfPresent(key); |
| if (request == null) { |
| throw new IllegalArgumentException("The request from phase one is missing."); |
| } |
| |
| twoPhaseCommitCache.invalidate(key); |
| } |
| final String phaseOneChain = request.getUserChain(); |
| |
| // build the chain for the current request |
| final NiFiUser user = NiFiUserUtils.getNiFiUser(); |
| final String phaseTwoChain = ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user); |
| |
| if (phaseOneChain == null || !phaseOneChain.equals(phaseTwoChain)) { |
| throw new IllegalArgumentException("The same user must issue the request for phase one and two."); |
| } |
| |
| final String phaseOneUri = request.getUri(); |
| if (phaseOneUri == null || !phaseOneUri.equals(getAbsolutePath().toString())) { |
| throw new IllegalArgumentException("The URI must be the same for phase one and two."); |
| } |
| |
| return request; |
| } |
| |
| private void cancelTransaction() { |
| // get the transaction id |
| final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER); |
| if (StringUtils.isBlank(transactionId)) { |
| throw new IllegalArgumentException("Two phase commit Transaction Id missing."); |
| } |
| |
| synchronized (twoPhaseCommitCache) { |
| final CacheKey key = new CacheKey(transactionId); |
| twoPhaseCommitCache.invalidate(key); |
| } |
| } |
| |
| private final class Request<T extends Entity> { |
| final String userChain; |
| final String uri; |
| final Revision revision; |
| final Set<Revision> revisions; |
| final T request; |
| |
| public Request(String userChain, String uri, Revision revision, Set<Revision> revisions, T request) { |
| this.userChain = userChain; |
| this.uri = uri; |
| this.revision = revision; |
| this.revisions = revisions; |
| this.request = request; |
| } |
| |
| public String getUserChain() { |
| return userChain; |
| } |
| |
| public String getUri() { |
| return uri; |
| } |
| |
| public Revision getRevision() { |
| return revision; |
| } |
| |
| public Set<Revision> getRevisions() { |
| return revisions; |
| } |
| |
| public T getRequest() { |
| return request; |
| } |
| } |
| |
| /** |
| * Replicates the request to the given node |
| * |
| * @param method the HTTP method |
| * @param nodeUuid the UUID of the node to replicate the request to |
| * @return the response from the node |
| * @throws UnknownNodeException if the nodeUuid given does not map to any node in the cluster |
| */ |
| protected Response replicate(final String method, final String nodeUuid) { |
| return replicate(method, getRequestParameters(), nodeUuid); |
| } |
| |
| private void ensureFlowInitialized() { |
| if (!flowController.isInitialized()) { |
| throw new IllegalClusterStateException("The Flow Controller is initializing the Data Flow."); |
| } |
| } |
| |
| protected Response replicate(final String method, final Object entity, final String nodeUuid, final Map<String, String> headersToOverride) { |
| final URI path = getAbsolutePath(); |
| return replicate(path, method, entity, nodeUuid, headersToOverride); |
| } |
| |
| /** |
| * Replicates the request to the given node |
| * |
| * @param method the HTTP method |
| * @param entity the Entity to replicate |
| * @param nodeUuid the UUID of the node to replicate the request to |
| * @return the response from the node |
| * @throws UnknownNodeException if the nodeUuid given does not map to any node in the cluster |
| */ |
| protected Response replicate(final String method, final Object entity, final String nodeUuid) { |
| return replicate(method, entity, nodeUuid, null); |
| } |
| |
| /** |
| * Replicates the request to the given node |
| * |
| * @param method the HTTP method |
| * @param entity the Entity to replicate |
| * @param nodeUuid the UUID of the node to replicate the request to |
| * @return the response from the node |
| * @throws UnknownNodeException if the nodeUuid given does not map to any node in the cluster |
| */ |
| protected Response replicate(final URI path, final String method, final Object entity, final String nodeUuid, final Map<String, String> headersToOverride) { |
| // since we're cluster we must specify the cluster node identifier |
| if (nodeUuid == null) { |
| throw new IllegalArgumentException("The cluster node identifier must be specified."); |
| } |
| |
| final NodeIdentifier nodeId = clusterCoordinator.getNodeIdentifier(nodeUuid); |
| if (nodeId == null) { |
| throw new UnknownNodeException("Cannot replicate request " + method + " " + getAbsolutePath() + " to node with ID " + nodeUuid + " because the specified node does not exist."); |
| } |
| |
| ensureFlowInitialized(); |
| |
| try { |
| final Map<String, String> headers = headersToOverride == null ? getHeaders() : getHeaders(headersToOverride); |
| |
| // Determine if we should replicate to the node directly or if we should replicate to the Cluster Coordinator, |
| // and have it replicate the request on our behalf. |
| if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { |
| // If we are to replicate directly to the nodes, we need to indicate that the replication source is |
| // the cluster coordinator so that the node knows to service the request. |
| final Set<NodeIdentifier> targetNodes = Collections.singleton(nodeId); |
| return requestReplicator.replicate(targetNodes, method, path, entity, headers, true, true).awaitMergedResponse().getResponse(); |
| } else { |
| headers.put(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, nodeId.getId()); |
| return requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, path, entity, headers).awaitMergedResponse().getResponse(); |
| } |
| } catch (final InterruptedException ie) { |
| return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + path + " was interrupted").type("text/plain").build(); |
| } |
| } |
| |
| protected NodeIdentifier getClusterCoordinatorNode() { |
| final NodeIdentifier activeClusterCoordinator = clusterCoordinator.getElectedActiveCoordinatorNode(); |
| if (activeClusterCoordinator != null) { |
| return activeClusterCoordinator; |
| } |
| |
| throw new NoClusterCoordinatorException(); |
| } |
| |
| protected ReplicationTarget getReplicationTarget() { |
| return clusterCoordinator.isActiveClusterCoordinator() ? ReplicationTarget.CLUSTER_NODES : ReplicationTarget.CLUSTER_COORDINATOR; |
| } |
| |
| |
| protected Response replicate(final String method, final NodeIdentifier targetNode) { |
| return replicate(method, targetNode, getRequestParameters()); |
| } |
| |
| protected Response replicate(final String method, final NodeIdentifier targetNode, final Object entity) { |
| ensureFlowInitialized(); |
| |
| try { |
| // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly |
| // to the cluster nodes themselves. |
| if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { |
| final Set<NodeIdentifier> nodeIds = Collections.singleton(targetNode); |
| return getRequestReplicator().replicate(nodeIds, method, getAbsolutePath(), entity, getHeaders(), true, true).awaitMergedResponse().getResponse(); |
| } else { |
| final Map<String, String> headers = getHeaders(Collections.singletonMap(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, targetNode.getId())); |
| return requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, getAbsolutePath(), entity, headers).awaitMergedResponse().getResponse(); |
| } |
| } catch (final InterruptedException ie) { |
| return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + getAbsolutePath() + " was interrupted").type("text/plain").build(); |
| } |
| } |
| |
| protected Response replicateToCoordinator(final String method, final Object entity) { |
| ensureFlowInitialized(); |
| |
| try { |
| final NodeIdentifier coordinatorNode = getClusterCoordinatorNode(); |
| final Set<NodeIdentifier> coordinatorNodes = Collections.singleton(coordinatorNode); |
| return getRequestReplicator().replicate(coordinatorNodes, method, getAbsolutePath(), entity, getHeaders(), true, false).awaitMergedResponse().getResponse(); |
| } catch (final InterruptedException ie) { |
| return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + getAbsolutePath() + " was interrupted").type("text/plain").build(); |
| } |
| } |
| |
| /** |
| * Convenience method for calling {@link #replicate(String, Object)} with an entity of |
| * {@link #getRequestParameters() getRequestParameters(true)} |
| * |
| * @param method the HTTP method to use |
| * @return the response from the request |
| */ |
| protected Response replicate(final String method) { |
| return replicate(method, getRequestParameters()); |
| } |
| |
| /** |
| * Convenience method for calling {@link #replicateNodeResponse(String, Object, Map)} with an entity of |
| * {@link #getRequestParameters() getRequestParameters(true)} and overriding no headers |
| * |
| * @param method the HTTP method to use |
| * @return the response from the request |
| * @throws InterruptedException if interrupted while replicating the request |
| */ |
| protected NodeResponse replicateNodeResponse(final String method) throws InterruptedException { |
| return replicateNodeResponse(method, getRequestParameters(), null); |
| } |
| |
| /** |
| * Replicates the request to all nodes in the cluster using the provided method and entity. The headers |
| * used will be those provided by the {@link #getHeaders()} method. The URI that will be used will be |
| * that provided by the {@link #getAbsolutePath()} method |
| * |
| * @param method the HTTP method to use |
| * @param entity the entity to replicate |
| * @return the response from the request |
| */ |
| protected Response replicate(final String method, final Object entity) { |
| return replicate(method, entity, (Map<String, String>) null); |
| } |
| |
| /** |
| * Replicates the request to all nodes in the cluster using the provided method and entity. The headers |
| * used will be those provided by the {@link #getHeaders()} method. The URI that will be used will be |
| * that provided by the {@link #getAbsolutePath()} method |
| * |
| * @param method the HTTP method to use |
| * @param entity the entity to replicate |
| * @param headersToOverride the headers to override |
| * @return the response from the request |
| * @see #replicateNodeResponse(String, Object, Map) |
| */ |
| protected Response replicate(final String method, final Object entity, final Map<String, String> headersToOverride) { |
| try { |
| return replicateNodeResponse(method, entity, headersToOverride).getResponse(); |
| } catch (final InterruptedException ie) { |
| return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + getAbsolutePath() + " was interrupted").type("text/plain").build(); |
| } |
| } |
| |
| |
| protected NodeResponse replicateNodeResponse(final String method, final Object entity, final Map<String, String> headersToOverride) throws InterruptedException { |
| final URI path = getAbsolutePath(); |
| return replicateNodeResponse(path, method, entity, headersToOverride); |
| } |
| |
| /** |
| * Replicates the request to all nodes in the cluster using the provided method and entity. The headers |
| * used will be those provided by the {@link #getHeaders()} method. The URI that will be used will be |
| * that provided by the {@link #getAbsolutePath()} method. This method returns the NodeResponse, |
| * rather than a Response object. |
| * |
| * @param method the HTTP method to use |
| * @param entity the entity to replicate |
| * @param headersToOverride the headers to override |
| * @return the response from the request |
| * @throws InterruptedException if interrupted while replicating the request |
| * @see #replicate(String, Object, Map) |
| */ |
| protected NodeResponse replicateNodeResponse(final URI path, final String method, final Object entity, final Map<String, String> headersToOverride) throws InterruptedException { |
| ensureFlowInitialized(); |
| |
| final Map<String, String> headers = headersToOverride == null ? getHeaders() : getHeaders(headersToOverride); |
| |
| // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly |
| // to the cluster nodes themselves. |
| final long replicateStart = System.nanoTime(); |
| String action = null; |
| try { |
| if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { |
| action = "Replicate Request " + method + " " + path; |
| return requestReplicator.replicate(method, path, entity, headers).awaitMergedResponse(); |
| } else { |
| action = "Forward Request " + method + " " + path + " to Coordinator"; |
| return requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, path, entity, headers).awaitMergedResponse(); |
| } |
| } finally { |
| final long replicateNanos = System.nanoTime() - replicateStart; |
| final String transactionId = headers.get(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER); |
| final String requestId = transactionId == null ? "Request with no ID" : transactionId; |
| logger.debug("Took a total of {} millis to {} for {}", TimeUnit.NANOSECONDS.toMillis(replicateNanos), action, requestId); |
| } |
| } |
| |
| /** |
| * @return <code>true</code> if connected to a cluster, <code>false</code> |
| * if running in standalone mode or disconnected from cluster |
| */ |
| boolean isConnectedToCluster() { |
| return isClustered() && clusterCoordinator.isConnected(); |
| } |
| |
| boolean isClustered() { |
| return clusterCoordinator != null; |
| } |
| |
| boolean isDisconnectedFromCluster() { |
| return isClustered() && !clusterCoordinator.isConnected(); |
| } |
| |
| void verifyDisconnectedNodeModification(final Boolean disconnectionAcknowledged) { |
| if (!Boolean.TRUE.equals(disconnectionAcknowledged)) { |
| throw new IllegalArgumentException("This node is disconnected from its configured cluster. The requested change " |
| + "will only be allowed if the flag to acknowledge the disconnected node is set."); |
| } |
| } |
| |
| public void setRequestReplicator(final RequestReplicator requestReplicator) { |
| this.requestReplicator = requestReplicator; |
| } |
| |
| protected RequestReplicator getRequestReplicator() { |
| ensureFlowInitialized(); |
| |
| return requestReplicator; |
| } |
| |
| public void setProperties(final NiFiProperties properties) { |
| this.properties = properties; |
| } |
| |
| public void setClusterCoordinator(final ClusterCoordinator clusterCoordinator) { |
| this.clusterCoordinator = clusterCoordinator; |
| } |
| |
| protected ClusterCoordinator getClusterCoordinator() { |
| return clusterCoordinator; |
| } |
| |
| public void setFlowController(final FlowController flowController) { |
| this.flowController = flowController; |
| } |
| |
| protected NiFiProperties getProperties() { |
| return properties; |
| } |
| |
/**
 * Where a request should be sent when it has to be replicated.
 */
public enum ReplicationTarget {
    /** Replicate directly to the cluster nodes. */
    CLUSTER_NODES,

    /** Forward to the cluster coordinator. */
    CLUSTER_COORDINATOR
}
| |
| // ----------------- |
| // HTTP site to site |
| // ----------------- |
| |
| protected Integer negotiateTransportProtocolVersion(final HttpServletRequest req, final VersionNegotiator transportProtocolVersionNegotiator) throws BadRequestException { |
| String protocolVersionStr = req.getHeader(HttpHeaders.PROTOCOL_VERSION); |
| if (isEmpty(protocolVersionStr)) { |
| throw new BadRequestException("Protocol version was not specified."); |
| } |
| |
| final Integer requestedProtocolVersion; |
| try { |
| requestedProtocolVersion = Integer.valueOf(protocolVersionStr); |
| } catch (NumberFormatException e) { |
| throw new BadRequestException("Specified protocol version was not in a valid number format: " + protocolVersionStr); |
| } |
| |
| Integer protocolVersion; |
| if (transportProtocolVersionNegotiator.isVersionSupported(requestedProtocolVersion)) { |
| return requestedProtocolVersion; |
| } else { |
| protocolVersion = transportProtocolVersionNegotiator.getPreferredVersion(requestedProtocolVersion); |
| } |
| |
| if (protocolVersion == null) { |
| throw new BadRequestException("Specified protocol version is not supported: " + protocolVersionStr); |
| } |
| return protocolVersion; |
| } |
| |
| protected Response.ResponseBuilder setCommonHeaders(final Response.ResponseBuilder builder, final Integer transportProtocolVersion, final HttpRemoteSiteListener transactionManager) { |
| return builder.header(HttpHeaders.PROTOCOL_VERSION, transportProtocolVersion) |
| .header(HttpHeaders.SERVER_SIDE_TRANSACTION_TTL, transactionManager.getTransactionTtlSec()); |
| } |
| |
| protected class ResponseCreator { |
| |
| public Response nodeTypeErrorResponse(String errMsg) { |
| return noCache(Response.status(Response.Status.FORBIDDEN)).type(MediaType.TEXT_PLAIN).entity(errMsg).build(); |
| } |
| |
| public Response httpSiteToSiteIsNotEnabledResponse() { |
| return noCache(Response.status(Response.Status.FORBIDDEN)).type(MediaType.TEXT_PLAIN).entity("HTTP(S) Site-to-Site is not enabled on this host.").build(); |
| } |
| |
| public Response wrongPortTypeResponse(String portType, String portId) { |
| logger.debug("Port type was wrong. portType={}, portId={}", portType, portId); |
| TransactionResultEntity entity = new TransactionResultEntity(); |
| entity.setResponseCode(ResponseCode.ABORT.getCode()); |
| entity.setMessage("Port was not found."); |
| entity.setFlowFileSent(0); |
| return Response.status(NOT_FOUND).entity(entity).type(MediaType.APPLICATION_JSON_TYPE).build(); |
| } |
| |
| public Response transactionNotFoundResponse(String portId, String transactionId) { |
| logger.debug("Transaction was not found. portId={}, transactionId={}", portId, transactionId); |
| TransactionResultEntity entity = new TransactionResultEntity(); |
| entity.setResponseCode(ResponseCode.ABORT.getCode()); |
| entity.setMessage("Transaction was not found."); |
| entity.setFlowFileSent(0); |
| return Response.status(NOT_FOUND).entity(entity).type(MediaType.APPLICATION_JSON_TYPE).build(); |
| } |
| |
| public Response unexpectedErrorResponse(String portId, Exception e) { |
| logger.error("Unexpected exception occurred. portId={}", portId); |
| logger.error("Exception detail:", e); |
| TransactionResultEntity entity = new TransactionResultEntity(); |
| entity.setResponseCode(ResponseCode.ABORT.getCode()); |
| entity.setMessage("Server encountered an exception."); |
| entity.setFlowFileSent(0); |
| return Response.serverError().entity(entity).type(MediaType.APPLICATION_JSON_TYPE).build(); |
| } |
| |
| public Response unexpectedErrorResponse(String portId, String transactionId, Exception e) { |
| logger.error("Unexpected exception occurred. portId={}, transactionId={}", portId, transactionId); |
| logger.error("Exception detail:", e); |
| TransactionResultEntity entity = new TransactionResultEntity(); |
| entity.setResponseCode(ResponseCode.ABORT.getCode()); |
| entity.setMessage("Server encountered an exception."); |
| entity.setFlowFileSent(0); |
| return Response.serverError().entity(entity).type(MediaType.APPLICATION_JSON_TYPE).build(); |
| } |
| |
| public Response unauthorizedResponse(NotAuthorizedException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Client request was not authorized. {}", e.getMessage()); |
| } |
| TransactionResultEntity entity = new TransactionResultEntity(); |
| entity.setResponseCode(ResponseCode.UNAUTHORIZED.getCode()); |
| entity.setMessage(e.getMessage()); |
| entity.setFlowFileSent(0); |
| return Response.status(Response.Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON_TYPE).entity(e.getMessage()).build(); |
| } |
| |
| public Response badRequestResponse(Exception e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Client sent a bad request. {}", e.getMessage()); |
| } |
| TransactionResultEntity entity = new TransactionResultEntity(); |
| entity.setResponseCode(ResponseCode.ABORT.getCode()); |
| entity.setMessage(e.getMessage()); |
| entity.setFlowFileSent(0); |
| return Response.status(Response.Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON_TYPE).entity(entity).build(); |
| } |
| |
| public Response handshakeExceptionResponse(HandshakeException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Handshake failed, {}", e.getMessage()); |
| } |
| ResponseCode handshakeRes = e.getResponseCode(); |
| Response.Status statusCd; |
| TransactionResultEntity entity = new TransactionResultEntity(); |
| entity.setResponseCode(handshakeRes != null ? handshakeRes.getCode() : ResponseCode.ABORT.getCode()); |
| entity.setMessage(e.getMessage()); |
| entity.setFlowFileSent(0); |
| switch (handshakeRes) { |
| case PORT_NOT_IN_VALID_STATE: |
| case PORTS_DESTINATION_FULL: |
| return Response.status(Response.Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON_TYPE).entity(entity).build(); |
| case UNAUTHORIZED: |
| statusCd = Response.Status.UNAUTHORIZED; |
| break; |
| case UNKNOWN_PORT: |
| statusCd = NOT_FOUND; |
| break; |
| default: |
| statusCd = Response.Status.BAD_REQUEST; |
| } |
| return Response.status(statusCd).type(MediaType.APPLICATION_JSON_TYPE).entity(entity).build(); |
| } |
| |
| public Response acceptedResponse(final HttpRemoteSiteListener transactionManager, final Object entity, final Integer protocolVersion) { |
| return noCache(setCommonHeaders(Response.status(Response.Status.ACCEPTED), protocolVersion, transactionManager)) |
| .entity(entity).build(); |
| } |
| |
| public Response locationResponse(UriInfo uriInfo, String portType, String portId, String transactionId, Object entity, |
| Integer protocolVersion, final HttpRemoteSiteListener transactionManager) { |
| |
| final URI transactionUri = buildResourceUri("data-transfer", portType, portId, "transactions", transactionId); |
| |
| return noCache(setCommonHeaders(Response.created(transactionUri), protocolVersion, transactionManager) |
| .header(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE)) |
| .entity(entity).build(); |
| } |
| |
| } |
| |
| protected Response generateTokenResponse(ResponseBuilder builder, String token) { |
| // currently there is no way to use javax.servlet-api to set SameSite=Strict, so we do this using Jetty |
| HttpCookie jwtCookie = new HttpCookie(NiFiBearerTokenResolver.JWT_COOKIE_NAME, token, null, "/", VALID_FOR_SESSION_ONLY, true, true, null, 0, HttpCookie.SameSite.STRICT); |
| return builder.header(HttpHeader.SET_COOKIE.asString(), jwtCookie.getRFC6265SetCookie()).build(); |
| } |
| |
| protected void removeCookie(final HttpServletResponse httpServletResponse, final String cookieName) { |
| final Cookie cookie = new Cookie(cookieName, null); |
| cookie.setPath("/"); |
| cookie.setHttpOnly(true); |
| cookie.setMaxAge(0); |
| cookie.setSecure(true); |
| httpServletResponse.addCookie(cookie); |
| } |
| |
| protected String getNiFiUri() { |
| final String nifiApiUrl = generateResourceUri(); |
| final String baseUrl = StringUtils.substringBeforeLast(nifiApiUrl, "/nifi-api"); |
| // Note: if the URL does not end with a / then Jetty will end up doing a redirect which can cause |
| // a problem when being behind a proxy b/c Jetty's redirect doesn't consider proxy headers |
| return baseUrl + "/nifi/"; |
| } |
| } |