// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <memory>
#include <ostream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <google/protobuf/any.pb.h>
#include <gtest/gtest.h>

#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/subprocess/echo_subprocess.h"
#include "kudu/subprocess/server.h"
#include "kudu/subprocess/subprocess.pb.h"
#include "kudu/util/env.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"

DECLARE_int32(subprocess_request_queue_size_bytes);
DECLARE_int32(subprocess_response_queue_size_bytes);
DECLARE_int32(subprocess_num_responder_threads);
DECLARE_int32(subprocess_timeout_secs);

using google::protobuf::Any;
using std::make_shared;
using std::shared_ptr;
using std::string;
using std::thread;
using std::unique_ptr;
using std::vector;
using strings::Substitute;

namespace kudu {
namespace subprocess {

namespace {

// Builds a SubprocessRequestPB whose 'request' field is an Any packing an
// EchoRequestPB. That EchoRequestPB carries 'payload' as its data and
// 'sleep_ms' as its sleep_ms field. The timeout tests in this file use
// 'sleep_ms' to slow the echo subprocess down.
SubprocessRequestPB CreateEchoSubprocessRequestPB(const string& payload,
                                                  int sleep_ms = 0) {
  EchoRequestPB echo;
  echo.set_data(payload);
  echo.set_sleep_ms(sleep_ms);

  // Pack directly into the request's own Any field rather than allocating a
  // standalone Any and handing over ownership.
  SubprocessRequestPB wrapped;
  wrapped.mutable_request()->PackFrom(echo);
  return wrapped;
}

// Unpacks the EchoResponsePB held in 'resp' and compares its data against
// 'expected_msg'.
//
// Returns Status::Corruption if the response can't be unpacked as an
// EchoResponsePB, or if the echoed data differs from 'expected_msg';
// returns Status::OK otherwise.
Status CheckMessage(const SubprocessResponsePB& resp, const string& expected_msg) {
  EchoResponsePB echoed;
  const bool unpacked = resp.response().UnpackTo(&echoed);
  if (!unpacked) {
    return Status::Corruption(Substitute("Failed to unpack echo response: $0",
                                         pb_util::SecureDebugString(resp)));
  }
  const string& actual = echoed.data();
  return actual == expected_msg
      ? Status::OK()
      : Status::Corruption(Substitute("Expected: '$0', got: '$1'",
                                      expected_msg, actual));
}

// Payload used by the single-request tests. Both the pointer and the
// pointee are const so the constant can't be reseated at runtime.
const char* const kHello = "hello world";

} // anonymous namespace

class SubprocessServerTest : public KuduTest {
 public:
  SubprocessServerTest()
      : test_dir_(GetTestDataDirectory()),
        metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_,
                                                        "subprocess_server-test")) {}

  // Starts each test case with a server using the default Java-side settings.
  void SetUp() override {
    KuduTest::SetUp();
    ASSERT_OK(ResetSubprocessServer());
  }

  // Creates a SubprocessServer that runs EchoSubprocessMain from the
  // kudu-subprocess.jar located in the same directory as the test binary,
  // stores it in '*out', and returns the result of its Init().
  //
  // 'java_queue_size' and 'java_parser_threads' are forwarded to the Java
  // process as "-q" and "-p" respectively, but only when positive.
  Status InitSubprocessServer(int java_queue_size,
                              int java_parser_threads,
                              shared_ptr<SubprocessServer>* out) {
    string exe_path;
    RETURN_NOT_OK(env_->GetExecutablePath(&exe_path));
    const string bin_dir = DirName(exe_path);
    string java_home;
    RETURN_NOT_OK(FindHomeDir("java", bin_dir, &java_home));
    const string fifo = SubprocessServer::FifoPath(JoinPathSegments(test_dir_, "echo_pipe"));

    vector<string> args;
    args.emplace_back(Substitute("$0/bin/java", java_home));
    args.emplace_back("-cp");
    args.emplace_back(Substitute("$0/kudu-subprocess.jar", bin_dir));
    args.emplace_back("org.apache.kudu.subprocess.echo.EchoSubprocessMain");
    args.emplace_back("-o");
    args.emplace_back(fifo);

    // Optional Java-side tuning knobs; omitted entirely unless positive.
    const auto add_positive_flag = [&args](const char* flag, int value) {
      if (value > 0) {
        args.emplace_back(flag);
        args.emplace_back(std::to_string(value));
      }
    };
    add_positive_flag("-q", java_queue_size);
    add_positive_flag("-p", java_parser_threads);

    *out = make_shared<SubprocessServer>(env_, fifo, std::move(args),
                                         EchoSubprocessMetrics(metric_entity_));
    return (*out)->Init();
  }

  // Replaces 'server_' with a freshly initialized server so that flag or
  // Java-side configuration changes take effect.
  Status ResetSubprocessServer(int java_queue_size = 0,
                               int java_parser_threads = 0) {
    return InitSubprocessServer(java_queue_size, java_parser_threads, &server_);
  }

 protected:
  // NOTE: declaration order matters; 'metric_entity_' is initialized from
  // 'metric_registry_' in the constructor's initializer list.
  const string test_dir_;
  MetricRegistry metric_registry_;
  scoped_refptr<MetricEntity> metric_entity_;
  shared_ptr<SubprocessServer> server_;
};

TEST_F(SubprocessServerTest, TestBasicCall) {
  // A single round trip through the echo subprocess should hand back the
  // payload that was sent.
  auto req = CreateEchoSubprocessRequestPB(kHello);
  SubprocessResponsePB resp;
  ASSERT_OK(server_->Execute(&req, &resp));

  EchoResponsePB echoed;
  ASSERT_TRUE(resp.response().UnpackTo(&echoed));
  ASSERT_EQ(echoed.data(), kHello);
}

// Test sending many requests concurrently.
TEST_F(SubprocessServerTest, TestManyConcurrentCalls) {
  constexpr int kNumThreads = 20;
  constexpr int kNumPerThread = 200;
  const string kEchoDataPrefix = "Do the hokey pokey, turn yourself around!";
  // Build every request up front so that only Execute() is timed below.
  vector<vector<SubprocessRequestPB>> requests(kNumThreads,
      vector<SubprocessRequestPB>(kNumPerThread));
  vector<vector<SubprocessResponsePB>> responses(kNumThreads,
      vector<SubprocessResponsePB>(kNumPerThread));
  for (int t = 0; t < kNumThreads; t++) {
    for (int i = 0; i < kNumPerThread; i++) {
      requests[t][i] = CreateEchoSubprocessRequestPB(
          Substitute("$0: thread $1 idx $2", kEchoDataPrefix, t, i));
    }
  }
  Stopwatch sw(Stopwatch::ALL_THREADS);
  sw.start();
  {
    vector<thread> threads;
    threads.reserve(kNumThreads);
    // Join every spawned thread when leaving this scope.
    SCOPED_CLEANUP({
      for (auto& t : threads) {
        t.join();
      }
    });
    for (int t = 0; t < kNumThreads; t++) {
      threads.emplace_back([&, t] {
        for (int i = 0; i < kNumPerThread; i++) {
          // A failing ASSERT here only returns from this thread's lambda.
          ASSERT_OK(server_->Execute(&requests[t][i], &responses[t][i]));
        }
      });
    }
  }
  sw.stop();
  double reqs_sent = kNumThreads * kNumPerThread;
  double elapsed_seconds = sw.elapsed().wall_seconds();
  LOG(INFO) << Substitute("Sent $0 requests in $1 seconds: $2 req/s",
                          reqs_sent, elapsed_seconds, reqs_sent / elapsed_seconds);
  for (int t = 0; t < kNumThreads; t++) {
    for (int i = 0; i < kNumPerThread; i++) {
      EchoRequestPB echo_req;
      // Fail loudly instead of silently comparing against an empty payload
      // if the request can't be unpacked.
      ASSERT_TRUE(requests[t][i].request().UnpackTo(&echo_req));
      ASSERT_OK(CheckMessage(responses[t][i], echo_req.data()));
    }
  }
}

// Test when our timeout occurs before adding the call to the outbound queue.
TEST_F(SubprocessServerTest, TestTimeoutBeforeQueueing) {
  // With a zero-second timeout, the call is expected to expire before it can
  // be placed on the outbound queue.
  FLAGS_subprocess_timeout_secs = 0;
  ASSERT_OK(ResetSubprocessServer());

  auto req = CreateEchoSubprocessRequestPB(kHello);
  SubprocessResponsePB resp;
  const Status status = server_->Execute(&req, &resp);
  ASSERT_TRUE(status.IsTimedOut()) << status.ToString();
  ASSERT_STR_CONTAINS(status.ToString(), "couldn't enqueue call");
}

// Test when we try sending too many requests at once.
TEST_F(SubprocessServerTest, TestTimeoutWhileQueueingCalls) {
  // Set a relatively low timeout so our calls time out more easily.
  FLAGS_subprocess_timeout_secs = 1;

  // Set a really low queue size so we can overflow the outbound queue easily.
  FLAGS_subprocess_request_queue_size_bytes = 1;

  // Make the Java subprocess single-threaded so we can overwhelm it more
  // easily.
  ASSERT_OK(ResetSubprocessServer(/*java_queue_size*/1,
                                  /*java_parser_threads*/1));

  // Send a bunch of requests with a sleep that's lower than our timeout.
  // Since we've made the subprocess single-threaded, the sheer number of
  // requests should fill the pipe, and our outbound queue.
  constexpr int kNumRequests = 500;
  // Declared before the threads so these outlive every thread that uses them.
  vector<Status> results(kNumRequests);
  const string kLargeRequest(10000, 'x');
  {
    vector<thread> threads;
    threads.reserve(kNumRequests);
    // Join every spawned thread when leaving this scope, including when
    // spawning a thread throws partway through the loop. Otherwise a joinable
    // std::thread would be destroyed, which calls std::terminate().
    SCOPED_CLEANUP({
      for (auto& t : threads) {
        t.join();
      }
    });
    for (int i = 0; i < kNumRequests; i++) {
      threads.emplace_back([&, i] {
        SubprocessRequestPB request =
            CreateEchoSubprocessRequestPB(kLargeRequest, /*sleep_ms*/500);
        SubprocessResponsePB response;
        results[i] = server_->Execute(&request, &response);
      });
    }
  }
  bool has_timeout_when_queueing = false;
  for (const auto& s : results) {
    if (s.IsTimedOut() &&
        s.ToString().find("couldn't enqueue call") != string::npos) {
      has_timeout_when_queueing = true;
      break;
    }
  }
  // We sent a ton of requests and should've overwhelmed the pipe and our
  // outbound queue.
  ASSERT_TRUE(has_timeout_when_queueing) << "expected at least one timeout";
}

// Test when the subprocess takes too long.
TEST_F(SubprocessServerTest, TestSlowSubprocessTimesOut) {
  // The request asks the subprocess to sleep 1.5s, longer than the 1s timeout.
  FLAGS_subprocess_timeout_secs = 1;
  ASSERT_OK(ResetSubprocessServer());

  auto slow_req = CreateEchoSubprocessRequestPB(kHello, /*sleep_ms*/1500);
  SubprocessResponsePB resp;
  const Status status = server_->Execute(&slow_req, &resp);
  ASSERT_TRUE(status.IsTimedOut()) << status.ToString();
  ASSERT_STR_CONTAINS(status.ToString(), "timed out while in flight");
}

// Test calls while shutting down.
TEST_F(SubprocessServerTest, TestCallsReturnWhenShuttingDown) {
  Status call_status;
  thread caller([&] {
    auto req = CreateEchoSubprocessRequestPB(kHello, /*sleep_ms*/1000);
    SubprocessResponsePB resp;
    call_status = server_->Execute(&req, &resp);
  });
  // Shut the server down while the call above may still be outstanding; the
  // call must return rather than hang.
  server_->Shutdown();
  caller.join();
  // The failure could surface from several places, so neither the message
  // nor the error type is checked.
  ASSERT_FALSE(call_status.ok());
}

// Some usage of a subprocess warrants calling Init() from a short-lived
// thread. Let's ensure there's no funny business when that happens (e.g.
// ensure the OS doesn't reap the underlying process when the parent thread
// exits).
TEST_F(SubprocessServerTest, TestInitFromThread) {
  Status s;
  thread t([&] {
    s = ResetSubprocessServer();
  });
  t.join();
  ASSERT_OK(s);
  // Wait a bit to give time for the OS to wreak havoc (though it shouldn't).
  SleepFor(MonoDelta::FromSeconds(3));
  SubprocessRequestPB request = CreateEchoSubprocessRequestPB(kHello);
  SubprocessResponsePB response;
  ASSERT_OK(server_->Execute(&request, &response));
  // An OK status alone doesn't prove the subprocess actually served the call;
  // verify the payload made the round trip intact.
  ASSERT_OK(CheckMessage(response, kHello));
}

// Test that we've configured our subprocess server such that we can run it
// from multiple threads without having them collide with each other.
TEST_F(SubprocessServerTest, TestRunFromMultipleThreads) {
  constexpr int kNumThreads = 3;
  // Declared before the threads so it outlives every thread writing to it.
  vector<Status> results(kNumThreads);
  {
    vector<thread> threads;
    threads.reserve(kNumThreads);
    // Join every spawned thread when leaving this scope.
    SCOPED_CLEANUP({
      for (auto& t : threads) {
        t.join();
      }
    });
    for (int i = 0; i < kNumThreads; i++) {
      threads.emplace_back([&, i] {
        // Each thread runs its own server and records the first error it
        // hits (or OK) in its own slot of 'results'. An immediately-invoked
        // Status-returning lambda lets RETURN_NOT_OK do the early exit,
        // without a file-level macro.
        results[i] = [&]() -> Status {
          shared_ptr<SubprocessServer> server;
          RETURN_NOT_OK(InitSubprocessServer(0, 0, &server));
          const string msg = Substitute("$0 bottles of tea on the wall", i);
          SubprocessRequestPB req = CreateEchoSubprocessRequestPB(msg);
          SubprocessResponsePB resp;
          RETURN_NOT_OK(server->Execute(&req, &resp));
          return CheckMessage(resp, msg);
        }();
      });
    }
  }
  for (const auto& r : results) {
    ASSERT_OK(r);
  }
}

} // namespace subprocess
} // namespace kudu

