| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.web.api; |
| |
| import org.apache.nifi.authorization.AuthorizableLookup; |
| import org.apache.nifi.authorization.resource.ResourceType; |
| import org.apache.nifi.remote.HttpRemoteSiteListener; |
| import org.apache.nifi.remote.Peer; |
| import org.apache.nifi.remote.PublicPort; |
| import org.apache.nifi.remote.VersionNegotiator; |
| import org.apache.nifi.remote.exception.HandshakeException; |
| import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession; |
| import org.apache.nifi.remote.protocol.ResponseCode; |
| import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol; |
| import org.apache.nifi.remote.protocol.http.HttpHeaders; |
| import org.apache.nifi.util.NiFiProperties; |
| import org.apache.nifi.web.NiFiServiceFacade; |
| import org.apache.nifi.web.api.entity.TransactionResultEntity; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| import javax.servlet.ServletContext; |
| import javax.servlet.http.HttpServletRequest; |
| import javax.servlet.http.HttpServletResponse; |
| import javax.ws.rs.core.Response; |
| import javax.ws.rs.core.StreamingOutput; |
| import javax.ws.rs.core.UriBuilder; |
| import javax.ws.rs.core.UriInfo; |
| import java.io.InputStream; |
| import java.lang.reflect.Field; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.net.URL; |
| |
| import static org.apache.nifi.web.util.WebUtils.PROXY_HOST_HTTP_HEADER; |
| import static org.apache.nifi.web.util.WebUtils.PROXY_PORT_HTTP_HEADER; |
| import static org.apache.nifi.web.util.WebUtils.PROXY_SCHEME_HTTP_HEADER; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.ArgumentMatchers.eq; |
| import static org.mockito.Mockito.doAnswer; |
| import static org.mockito.Mockito.doReturn; |
| import static org.mockito.Mockito.doThrow; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| |
| public class TestDataTransferResource { |
| |
| @BeforeClass |
| public static void setup() throws Exception { |
| final URL resource = TestDataTransferResource.class.getResource("/site-to-site/nifi.properties"); |
| final String propertiesFile = resource.toURI().getPath(); |
| System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, propertiesFile); |
| } |
| |
| private HttpServletRequest createCommonHttpServletRequest() { |
| final HttpServletRequest req = mock(HttpServletRequest.class); |
| doReturn("1").when(req).getHeader(eq(HttpHeaders.PROTOCOL_VERSION)); |
| doReturn(new StringBuffer("http://nifi.example.com:8080") |
| .append("/nifi-api/data-transfer/output-ports/port-id/transactions/tx-id/flow-files")) |
| .when(req).getRequestURL(); |
| return req; |
| } |
| |
| @Test |
| public void testCreateTransactionPortNotFound() throws Exception { |
| final HttpServletRequest req = createCommonHttpServletRequest(); |
| |
| final DataTransferResource resource = getDataTransferResource(); |
| final HttpFlowFileServerProtocol serverProtocol = resource.getHttpFlowFileServerProtocol(null); |
| |
| doThrow(new HandshakeException(ResponseCode.UNKNOWN_PORT, "Not found.")).when(serverProtocol).handshake(any()); |
| |
| final ServletContext context = null; |
| final UriInfo uriInfo = null; |
| final InputStream inputStream = null; |
| |
| final Response response = resource.createPortTransaction("input-ports", "port-id", req, context, uriInfo, inputStream); |
| |
| TransactionResultEntity resultEntity = (TransactionResultEntity) response.getEntity(); |
| |
| assertEquals(404, response.getStatus()); |
| assertEquals(ResponseCode.UNKNOWN_PORT.getCode(), resultEntity.getResponseCode()); |
| } |
| |
| @Test |
| public void testCreateTransactionPortNotInValidState() throws Exception { |
| final HttpServletRequest req = createCommonHttpServletRequest(); |
| |
| final DataTransferResource resource = getDataTransferResource(); |
| final HttpFlowFileServerProtocol serverProtocol = resource.getHttpFlowFileServerProtocol(null); |
| |
| doThrow(new HandshakeException(ResponseCode.PORT_NOT_IN_VALID_STATE, "Not in valid state.")).when(serverProtocol).handshake(any()); |
| |
| final ServletContext context = null; |
| final UriInfo uriInfo = null; |
| final InputStream inputStream = null; |
| |
| final Response response = resource.createPortTransaction("input-ports", "port-id", req, context, uriInfo, inputStream); |
| |
| TransactionResultEntity resultEntity = (TransactionResultEntity) response.getEntity(); |
| |
| assertEquals(503, response.getStatus()); |
| assertEquals(ResponseCode.PORT_NOT_IN_VALID_STATE.getCode(), resultEntity.getResponseCode()); |
| } |
| |
| @Test |
| public void testCreateTransactionUnauthorized() throws Exception { |
| final HttpServletRequest req = createCommonHttpServletRequest(); |
| |
| final DataTransferResource resource = getDataTransferResource(); |
| final HttpFlowFileServerProtocol serverProtocol = resource.getHttpFlowFileServerProtocol(null); |
| |
| doThrow(new HandshakeException(ResponseCode.UNAUTHORIZED, "Unauthorized.")).when(serverProtocol).handshake(any()); |
| |
| final ServletContext context = null; |
| final UriInfo uriInfo = null; |
| final InputStream inputStream = null; |
| |
| final Response response = resource.createPortTransaction("input-ports", "port-id", req, context, uriInfo, inputStream); |
| |
| TransactionResultEntity resultEntity = (TransactionResultEntity) response.getEntity(); |
| |
| assertEquals(401, response.getStatus()); |
| assertEquals(ResponseCode.UNAUTHORIZED.getCode(), resultEntity.getResponseCode()); |
| } |
| |
| private UriInfo mockUriInfo(final String locationUriStr) throws URISyntaxException { |
| final UriInfo uriInfo = mock(UriInfo.class); |
| final UriBuilder uriBuilder = mock(UriBuilder.class); |
| |
| final URI locationUri = new URI(locationUriStr); |
| doReturn(uriBuilder).when(uriInfo).getBaseUriBuilder(); |
| doReturn(uriBuilder).when(uriBuilder).path(any(String.class)); |
| doReturn(uriBuilder).when(uriBuilder).segment(any(String.class)); |
| doReturn(locationUri).when(uriBuilder).build(); |
| return uriInfo; |
| } |
| |
| @Test |
| public void testCreateTransaction() throws Exception { |
| final HttpServletRequest req = createCommonHttpServletRequest(); |
| |
| final DataTransferResource resource = getDataTransferResource(); |
| |
| final String locationUriStr = "http://localhost:8080/nifi-api/data-transfer/input-ports/port-id/transactions/transaction-id"; |
| |
| final ServletContext context = null; |
| final UriInfo uriInfo = mockUriInfo(locationUriStr); |
| final Field uriInfoField = resource.getClass().getSuperclass().getSuperclass() |
| .getDeclaredField("uriInfo"); |
| uriInfoField.setAccessible(true); |
| uriInfoField.set(resource, uriInfo); |
| |
| final HttpServletRequest request = mock(HttpServletRequest.class); |
| when(request.getServerPort()).thenReturn(8080); |
| when(request.getScheme()).thenReturn("http"); |
| when(request.getHeader(eq("Host"))).thenReturn("localhost"); |
| |
| final Field httpServletRequestField = resource.getClass().getSuperclass().getSuperclass() |
| .getDeclaredField("httpServletRequest"); |
| httpServletRequestField.setAccessible(true); |
| httpServletRequestField.set(resource, request); |
| |
| final InputStream inputStream = null; |
| |
| final Response response = resource.createPortTransaction("input-ports", "port-id", req, context, uriInfo, inputStream); |
| |
| TransactionResultEntity resultEntity = (TransactionResultEntity) response.getEntity(); |
| |
| assertEquals(201, response.getStatus()); |
| assertEquals(ResponseCode.PROPERTIES_OK.getCode(), resultEntity.getResponseCode()); |
| assertEquals(locationUriStr, response.getMetadata().getFirst(HttpHeaders.LOCATION_HEADER_NAME).toString()); |
| } |
| |
| @Test |
| public void testCreateTransactionThroughReverseProxy() throws Exception { |
| final HttpServletRequest req = createCommonHttpServletRequest(); |
| |
| final DataTransferResource resource = getDataTransferResource(); |
| |
| final String locationUriStr = "https://nifi2.example.com:443/nifi-api/data-transfer/input-ports/port-id/transactions/transaction-id"; |
| |
| final ServletContext context = null; |
| final UriInfo uriInfo = mockUriInfo(locationUriStr); |
| final Field uriInfoField = resource.getClass().getSuperclass().getSuperclass() |
| .getDeclaredField("uriInfo"); |
| uriInfoField.setAccessible(true); |
| uriInfoField.set(resource, uriInfo); |
| |
| final HttpServletRequest request = mock(HttpServletRequest.class); |
| when(request.getHeader(PROXY_SCHEME_HTTP_HEADER)).thenReturn("https"); |
| when(request.getHeader(PROXY_HOST_HTTP_HEADER)).thenReturn("nifi2.example.com"); |
| when(request.getHeader(PROXY_PORT_HTTP_HEADER)).thenReturn("443"); |
| final Field httpServletRequestField = resource.getClass().getSuperclass().getSuperclass() |
| .getDeclaredField("httpServletRequest"); |
| httpServletRequestField.setAccessible(true); |
| httpServletRequestField.set(resource, request); |
| |
| final InputStream inputStream = null; |
| |
| final Response response = resource.createPortTransaction("input-ports", "port-id", req, context, uriInfo, inputStream); |
| |
| TransactionResultEntity resultEntity = (TransactionResultEntity) response.getEntity(); |
| |
| assertEquals(201, response.getStatus()); |
| assertEquals(ResponseCode.PROPERTIES_OK.getCode(), resultEntity.getResponseCode()); |
| assertEquals(locationUriStr, response.getMetadata().getFirst(HttpHeaders.LOCATION_HEADER_NAME).toString()); |
| } |
| |
| @Test |
| public void testExtendTransaction() throws Exception { |
| final HttpServletRequest req = createCommonHttpServletRequest(); |
| |
| final DataTransferResource resource = getDataTransferResource(); |
| |
| final String locationUriStr = "http://localhost:8080/nifi-api/data-transfer/input-ports/port-id/transactions/transaction-id"; |
| |
| final ServletContext context = null; |
| final HttpServletResponse res = null; |
| final UriInfo uriInfo = mockUriInfo(locationUriStr); |
| final InputStream inputStream = null; |
| |
| final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance(NiFiProperties.createBasicNiFiProperties(null)); |
| final String transactionId = transactionManager.createTransaction(); |
| |
| final Response response = resource.extendPortTransactionTTL("input-ports", "port-id", transactionId, req, res, context, uriInfo, inputStream); |
| |
| transactionManager.cancelTransaction(transactionId); |
| |
| TransactionResultEntity resultEntity = (TransactionResultEntity) response.getEntity(); |
| |
| assertEquals(200, response.getStatus()); |
| assertEquals(ResponseCode.CONTINUE_TRANSACTION.getCode(), resultEntity.getResponseCode()); |
| } |
| |
| @Test |
| public void testReceiveFlowFiles() throws Exception { |
| final HttpServletRequest req = createCommonHttpServletRequest(); |
| |
| final DataTransferResource resource = getDataTransferResource(); |
| final HttpFlowFileServerProtocol serverProtocol = resource.getHttpFlowFileServerProtocol(null); |
| |
| final PublicPort port = mock(PublicPort.class); |
| doReturn(port).when(serverProtocol).getPort(); |
| doAnswer(invocation -> { |
| Peer peer = (Peer) invocation.getArguments()[0]; |
| ((HttpServerCommunicationsSession)peer.getCommunicationsSession()).setChecksum("server-checksum"); |
| return 7; |
| }).when(port).receiveFlowFiles(any(Peer.class), any()); |
| |
| final ServletContext context = null; |
| final InputStream inputStream = null; |
| |
| final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance(NiFiProperties.createBasicNiFiProperties(null)); |
| final String transactionId = transactionManager.createTransaction(); |
| |
| final Response response = resource.receiveFlowFiles("port-id", transactionId, req, context, inputStream); |
| |
| transactionManager.cancelTransaction(transactionId); |
| |
| final Object entity = response.getEntity(); |
| |
| assertEquals(202, response.getStatus()); |
| assertEquals("server-checksum", entity); |
| } |
| |
| @Test |
| public void testReceiveZeroFlowFiles() throws Exception { |
| final HttpServletRequest req = createCommonHttpServletRequest(); |
| |
| final DataTransferResource resource = getDataTransferResource(); |
| final HttpFlowFileServerProtocol serverProtocol = resource.getHttpFlowFileServerProtocol(null); |
| |
| final PublicPort port = mock(PublicPort.class); |
| doReturn(port).when(serverProtocol).getPort(); |
| doAnswer(invocation -> 0).when(port).receiveFlowFiles(any(Peer.class), any()); |
| |
| final ServletContext context = null; |
| final InputStream inputStream = null; |
| |
| final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance(NiFiProperties.createBasicNiFiProperties(null)); |
| final String transactionId = transactionManager.createTransaction(); |
| |
| final Response response = resource.receiveFlowFiles("port-id", transactionId, req, context, inputStream); |
| |
| transactionManager.cancelTransaction(transactionId); |
| |
| assertEquals(400, response.getStatus()); |
| } |
| |
| @Test |
| public void testCommitInputPortTransaction() { |
| final HttpServletRequest req = createCommonHttpServletRequest(); |
| |
| final DataTransferResource resource = getDataTransferResource(); |
| |
| final ServletContext context = null; |
| final InputStream inputStream = null; |
| |
| final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance(NiFiProperties.createBasicNiFiProperties(null)); |
| final String transactionId = transactionManager.createTransaction(); |
| |
| final Response response = resource.commitInputPortTransaction(ResponseCode.CONFIRM_TRANSACTION.getCode(), "port-id", transactionId, req, context, inputStream); |
| |
| transactionManager.cancelTransaction(transactionId); |
| |
| TransactionResultEntity resultEntity = (TransactionResultEntity) response.getEntity(); |
| |
| assertEquals(200, response.getStatus()); |
| assertEquals(ResponseCode.CONFIRM_TRANSACTION.getCode(), resultEntity.getResponseCode()); |
| } |
| |
| @Test |
| public void testTransferFlowFiles() { |
| final HttpServletRequest req = createCommonHttpServletRequest(); |
| |
| final DataTransferResource resource = getDataTransferResource(); |
| |
| final ServletContext context = null; |
| final HttpServletResponse res = null; |
| final InputStream inputStream = null; |
| |
| final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance(NiFiProperties.createBasicNiFiProperties(null)); |
| final String transactionId = transactionManager.createTransaction(); |
| |
| final Response response = resource.transferFlowFiles("port-id", transactionId, req, res, context, inputStream); |
| |
| transactionManager.cancelTransaction(transactionId); |
| |
| final Object entity = response.getEntity(); |
| |
| assertEquals(202, response.getStatus()); |
| assertTrue(entity instanceof StreamingOutput); |
| } |
| |
| @Test |
| public void testCommitOutputPortTransaction() { |
| final HttpServletRequest req = createCommonHttpServletRequest(); |
| |
| final DataTransferResource resource = getDataTransferResource(); |
| |
| final ServletContext context = null; |
| final InputStream inputStream = null; |
| |
| final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance(NiFiProperties.createBasicNiFiProperties(null)); |
| final String transactionId = transactionManager.createTransaction(); |
| |
| final Response response = resource.commitOutputPortTransaction(ResponseCode.CONFIRM_TRANSACTION.getCode(), |
| "client-checksum", "port-id", transactionId, req, context, inputStream); |
| |
| transactionManager.cancelTransaction(transactionId); |
| |
| TransactionResultEntity resultEntity = (TransactionResultEntity) response.getEntity(); |
| |
| assertEquals(200, response.getStatus()); |
| assertEquals(ResponseCode.CONFIRM_TRANSACTION.getCode(), resultEntity.getResponseCode()); |
| } |
| |
| @Test |
| public void testCommitOutputPortTransactionBadChecksum() throws Exception { |
| final HttpServletRequest req = createCommonHttpServletRequest(); |
| |
| final DataTransferResource resource = getDataTransferResource(); |
| final HttpFlowFileServerProtocol serverProtocol = resource.getHttpFlowFileServerProtocol(null); |
| |
| doThrow(new HandshakeException(ResponseCode.BAD_CHECKSUM, "Bad checksum.")).when(serverProtocol).commitTransferTransaction(any(), any()); |
| |
| final ServletContext context = null; |
| final InputStream inputStream = null; |
| |
| final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance(NiFiProperties.createBasicNiFiProperties(null)); |
| final String transactionId = transactionManager.createTransaction(); |
| |
| final Response response = resource.commitOutputPortTransaction(ResponseCode.CONFIRM_TRANSACTION.getCode(), |
| "client-checksum", "port-id", transactionId, req, context, inputStream); |
| |
| transactionManager.cancelTransaction(transactionId); |
| |
| TransactionResultEntity resultEntity = (TransactionResultEntity) response.getEntity(); |
| |
| assertEquals(400, response.getStatus()); |
| assertEquals(ResponseCode.BAD_CHECKSUM.getCode(), resultEntity.getResponseCode()); |
| } |
| |
| private DataTransferResource getDataTransferResource() { |
| final NiFiServiceFacade serviceFacade = mock(NiFiServiceFacade.class); |
| |
| final HttpFlowFileServerProtocol serverProtocol = mock(HttpFlowFileServerProtocol.class); |
| final DataTransferResource resource = new DataTransferResource(NiFiProperties.createBasicNiFiProperties(null)) { |
| @Override |
| protected void authorizeDataTransfer(AuthorizableLookup lookup, ResourceType resourceType, String identifier) { |
| } |
| |
| @Override |
| HttpFlowFileServerProtocol getHttpFlowFileServerProtocol(VersionNegotiator versionNegotiator) { |
| return serverProtocol; |
| } |
| }; |
| resource.setProperties(NiFiProperties.createBasicNiFiProperties(null)); |
| resource.setServiceFacade(serviceFacade); |
| return resource; |
| } |
| } |