/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.grpc;

import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.grpc.stub.StreamObserver;
import io.netty.handler.ssl.ClientAuth;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

public class TestInvokeGRPC {
    // ids placed on flowfiles and used to dictate response codes in the DummyFlowFileService below
    private static final long ERROR = 500;
    private static final long SUCCESS = 501;
    private static final long RETRY = 502;

    @Test
    public void testSuccess() throws Exception {
        final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class);

        try {
            final int port = server.start(0);
            final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
            runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
            runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port));

            final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS);
            runner.enqueue(mockFlowFile);
            runner.run();
            runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1);
            runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 1);
            runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
            runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
            runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);

            final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE);
            assertThat(responseFiles.size(), equalTo(1));
            final MockFlowFile response = responseFiles.get(0);
            response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
            response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success");
            response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
            response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));

            final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_SUCCESS_REQ);
            assertThat(successFiles.size(), equalTo(1));
            final MockFlowFile successFile = successFiles.get(0);
            successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
            successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success");
            successFile.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
            successFile.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
        } finally {
            server.stop();
        }
    }

    @Test
    public void testSuccessWithFlowFileContent() throws Exception {
        final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class);

        try {
            final int port = server.start(0);

            final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
            runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
            runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port));

            runner.enqueue("content");
            runner.run();
            runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1);
            runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 1);
            runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
            runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
            runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);

            final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE);
            assertThat(responseFiles.size(), equalTo(1));
            final MockFlowFile response = responseFiles.get(0);
            response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
            response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "content");
            response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
            response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));

            final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_SUCCESS_REQ);
            assertThat(successFiles.size(), equalTo(1));
            final MockFlowFile successFile = successFiles.get(0);
            successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
            successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "content");
            successFile.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
            successFile.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
        } finally {
            server.stop();
        }
    }

    @Test
    public void testSuccessAlwaysOutputResponse() throws Exception {
        final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class);

        try {
            final int port = server.start(0);

            final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
            runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
            runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port));
            runner.setProperty(InvokeGRPC.PROP_OUTPUT_RESPONSE_REGARDLESS, "true");

            final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS);
            runner.enqueue(mockFlowFile);
            runner.run();
            runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1);
            runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 1);
            runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
            runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
            runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);

            final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE);
            assertThat(responseFiles.size(), equalTo(1));
            final MockFlowFile response = responseFiles.get(0);
            response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
            response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success");
            response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
            response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));

            final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_SUCCESS_REQ);
            assertThat(successFiles.size(), equalTo(1));
            final MockFlowFile successFile = successFiles.get(0);
            successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
            successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success");
            successFile.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
            successFile.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
        } finally {
            server.stop();
        }
    }

    @Test
    public void testExceedMaxMessageSize() throws Exception {
        final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class);

        try {
            final int port = server.start(0);
            final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
            runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
            runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port));
            // set max message size to 1B to force error
            runner.setProperty(InvokeGRPC.PROP_MAX_MESSAGE_SIZE, "1B");

            final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS);
            runner.enqueue(mockFlowFile);
            runner.run();
            runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 0);
            runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0);
            runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
            runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
            runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 1);

            final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_FAILURE);
            assertThat(responseFiles.size(), equalTo(1));
            final MockFlowFile response = responseFiles.get(0);
            response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
            response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
            // an exception should be thrown indicating that the max message size was exceeded.
            response.assertAttributeEquals(InvokeGRPC.EXCEPTION_CLASS, "io.grpc.StatusRuntimeException");
        } finally {
            server.stop();
        }
    }

    @Test
    public void testRetry() throws Exception {
        final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class);

        try {
            final int port = server.start(0);

            final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
            runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
            runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port));

            final MockFlowFile mockFlowFile = new MockFlowFile(RETRY);
            runner.enqueue(mockFlowFile);
            runner.run();
            runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 0);
            runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0);
            runner.assertTransferCount(InvokeGRPC.REL_RETRY, 1);
            runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
            runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
            runner.assertPenalizeCount(1);

            final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RETRY);
            assertThat(responseFiles.size(), equalTo(1));
            final MockFlowFile response = responseFiles.get(0);
            response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.RETRY));
            response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "retry");
            response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
            response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
        } finally {
            server.stop();
        }
    }

    @Test
    public void testRetryAlwaysOutputResponse() throws Exception {
        final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class);

        try {
            final int port = server.start(0);
            final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
            runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
            runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port));
            runner.setProperty(InvokeGRPC.PROP_OUTPUT_RESPONSE_REGARDLESS, "true");

            final MockFlowFile mockFlowFile = new MockFlowFile(RETRY);
            runner.enqueue(mockFlowFile);
            runner.run();
            runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1);
            runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0);
            runner.assertTransferCount(InvokeGRPC.REL_RETRY, 1);
            runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
            runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
            runner.assertPenalizeCount(1);

            final List<MockFlowFile> retryFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RETRY);
            assertThat(retryFiles.size(), equalTo(1));
            final MockFlowFile retry = retryFiles.get(0);
            retry.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.RETRY));
            retry.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "retry");
            retry.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
            retry.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));

            final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE);
            assertThat(responseFiles.size(), equalTo(1));
            final MockFlowFile response = responseFiles.get(0);
            response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.RETRY));
            response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "retry");
            response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
            response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
        } finally {
            server.stop();
        }
    }

    @Test
    public void testNoRetryOnError() throws Exception {
        final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class);

        try {
            final int port = server.start(0);

            final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
            runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
            runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port));

            final MockFlowFile mockFlowFile = new MockFlowFile(ERROR);
            runner.enqueue(mockFlowFile);
            runner.run();
            runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 0);
            runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0);
            runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
            runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 1);
            runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);

            final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_NO_RETRY);
            assertThat(responseFiles.size(), equalTo(1));
            final MockFlowFile response = responseFiles.get(0);
            response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.ERROR));
            response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "error");
            response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
            response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
        } finally {
            server.stop();
        }
    }

    @Test
    public void testNoRetryOnErrorAlwaysOutputResponseAndPenalize() throws Exception {
        final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class);

        try {
            final int port = server.start(0);

            final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
            runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
            runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port));
            runner.setProperty(InvokeGRPC.PROP_OUTPUT_RESPONSE_REGARDLESS, "true");
            runner.setProperty(InvokeGRPC.PROP_PENALIZE_NO_RETRY, "true");

            final MockFlowFile mockFlowFile = new MockFlowFile(ERROR);
            runner.enqueue(mockFlowFile);
            runner.run();
            runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1);
            runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0);
            runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
            runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 1);
            runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
            runner.assertPenalizeCount(1);

            final List<MockFlowFile> noRetryFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_NO_RETRY);
            assertThat(noRetryFiles.size(), equalTo(1));
            final MockFlowFile noRetry = noRetryFiles.get(0);
            noRetry.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.ERROR));
            noRetry.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "error");
            noRetry.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
            noRetry.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));

            final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE);
            assertThat(responseFiles.size(), equalTo(1));
            final MockFlowFile response = responseFiles.get(0);
            response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.ERROR));
            response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "error");
            response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
            response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
        } finally {
            server.stop();
        }
    }

    @Test
    public void testNoInput() throws Exception {
        final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class);

        try {
            final int port = server.start(0);

            final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
            runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
            runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port));

            runner.run();
            runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 0);
            runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0);
            runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
            runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
            runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
            runner.assertPenalizeCount(0);
        } finally {
            server.stop();
        }
    }

    @Test
    public void testServerConnectionFail() throws Exception {

        final int port = TestGRPCServer.randomPort();

        // should be no gRPC server running @ that port, so processor will fail
        final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
        runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
        runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, Integer.toString(port));

        final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS);
        runner.enqueue(mockFlowFile);
        runner.run();

        runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 0);
        runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0);
        runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
        runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
        runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 1);

        final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_FAILURE);
        assertThat(responseFiles.size(), equalTo(1));
        final MockFlowFile response = responseFiles.get(0);
        response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
        response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, Integer.toString(port));
        response.assertAttributeEquals(InvokeGRPC.EXCEPTION_CLASS, "io.grpc.StatusRuntimeException");

    }

    @Test
    public void testSecureTwoWaySsl() throws Exception {
        final Map<String, String> sslProperties = getKeystoreProperties();
        sslProperties.putAll(getTruststoreProperties());
        final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class, sslProperties);

        try {
            final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
            runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
            useSSLContextService(runner, sslProperties);
            final int port = server.start(0);
            runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port));
            runner.setProperty(InvokeGRPC.PROP_USE_SECURE, "true");

            final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS);
            runner.enqueue(mockFlowFile);
            runner.run();
            runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1);
            runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 1);
            runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
            runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
            runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);

            final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE);
            assertThat(responseFiles.size(), equalTo(1));
            final MockFlowFile response = responseFiles.get(0);
            response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
            response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success");
            response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
            response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));

            final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_SUCCESS_REQ);
            assertThat(successFiles.size(), equalTo(1));
            final MockFlowFile successFile = successFiles.get(0);
            successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
            successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success");
            successFile.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
            successFile.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
        } finally {
            server.stop();
        }
    }

    @Test
    public void testSecureOneWaySsl() throws Exception {
        final Map<String, String> sslProperties = getKeystoreProperties();
        sslProperties.put(TestGRPCServer.NEED_CLIENT_AUTH, ClientAuth.NONE.name());
        final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class, sslProperties);

        try {
            final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
            runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
            useSSLContextService(runner, getTruststoreProperties());
            final int port = server.start(0);
            runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port));
            runner.setProperty(InvokeGRPC.PROP_USE_SECURE, "true");

            final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS);
            runner.enqueue(mockFlowFile);
            runner.run();
            runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1);
            runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 1);
            runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
            runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
            runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);

            final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE);
            assertThat(responseFiles.size(), equalTo(1));
            final MockFlowFile response = responseFiles.get(0);
            response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
            response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success");
            response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
            response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));

            final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_SUCCESS_REQ);
            assertThat(successFiles.size(), equalTo(1));
            final MockFlowFile successFile = successFiles.get(0);
            successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
            successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success");
            successFile.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
            successFile.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
        } finally {
            server.stop();
        }
    }

    private static Map<String, String> getTruststoreProperties() {
        final Map<String, String> props = new HashMap<>();
        props.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/truststore.jks");
        props.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "passwordpassword");
        props.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
        return props;
    }

    private static Map<String, String> getKeystoreProperties() {
        final Map<String, String> properties = new HashMap<>();
        properties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/keystore.jks");
        properties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "passwordpassword");
        properties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS");
        return properties;
    }

    private void useSSLContextService(final TestRunner controller, final Map<String, String> sslProperties) {
        final SSLContextService service = new StandardSSLContextService();
        try {
            controller.addControllerService("ssl-service", service, sslProperties);
            controller.enableControllerService(service);
        } catch (InitializationException ex) {
            ex.printStackTrace();
            Assert.fail("Could not create SSL Context Service");
        }

        controller.setProperty(InvokeGRPC.PROP_SSL_CONTEXT_SERVICE, "ssl-service");
    }

    /**
     * Dummy gRPC service whose responses are dictated by the IDs on the messages it receives
     */
    private static class DummyFlowFileService extends FlowFileServiceGrpc.FlowFileServiceImplBase {

        public DummyFlowFileService() {
        }

        @Override
        public void send(FlowFileRequest request, StreamObserver<FlowFileReply> responseObserver) {
            final FlowFileReply.Builder replyBuilder = FlowFileReply.newBuilder();
            // use the id to dictate response codes
            final long id = request.getId();

            if (id == ERROR) {
                replyBuilder.setResponseCode(FlowFileReply.ResponseCode.ERROR)
                        .setBody("error");
            } else if (id == SUCCESS) {
                replyBuilder.setResponseCode(FlowFileReply.ResponseCode.SUCCESS)
                        .setBody("success");
            } else if (id == RETRY) {
                replyBuilder.setResponseCode(FlowFileReply.ResponseCode.RETRY)
                        .setBody("retry");
                // else, assume the request is to include the flowfile content in the response
            } else {
                replyBuilder.setResponseCode(FlowFileReply.ResponseCode.SUCCESS)
                        .setBody(request.getContent().toStringUtf8());

            }
            responseObserver.onNext(replyBuilder.build());
            responseObserver.onCompleted();
        }
    }
}
