// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements.  See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License.  You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// boot is the boot code for the Python SDK harness container. It is responsible
// for retrieving and install staged files and invoking python correctly.
package main

import (
	"context"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"io"
	"log"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"regexp"
	"slices"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/apache/beam/sdks/v2/go/container/tools"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"
)

var (
	acceptableWhlSpecs []string

	// SetupOnly option is used to invoke the boot sequence to only process the provided artifacts and builds new dependency pre-cached images.
	setupOnly = flag.Bool("setup_only", false, "Execute boot program in setup only mode (optional).")
	artifacts = flag.String("artifacts", "", "Path to artifacts metadata file used in setup only mode (optional).")

	// Contract: https://s.apache.org/beam-fn-api-container-contract.

	workerPool        = flag.Bool("worker_pool", false, "Run as worker pool (optional).")
	id                = flag.String("id", "", "Local identifier (required).")
	loggingEndpoint   = flag.String("logging_endpoint", "", "Logging endpoint (required).")
	artifactEndpoint  = flag.String("artifact_endpoint", "", "Artifact endpoint (required).")
	provisionEndpoint = flag.String("provision_endpoint", "", "Provision endpoint (required).")
	controlEndpoint   = flag.String("control_endpoint", "", "Control endpoint (required).")
	semiPersistDir    = flag.String("semi_persist_dir", "/tmp", "Local semi-persistent directory (optional).")
)

const (
	sdkHarnessEntrypoint = "apache_beam.runners.worker.sdk_worker_main"
	// Please keep these names in sync with stager.py
	workflowFile      = "workflow.tar.gz"
	requirementsFile  = "requirements.txt"
	sdkSrcFile        = "dataflow_python_sdk.tar"
	extraPackagesFile = "extra_packages.txt"
	workerPoolIdEnv   = "BEAM_PYTHON_WORKER_POOL_ID"

	standardArtifactFileTypeUrn = "beam:artifact:type:file:v1"
)

func main() {
	flag.Parse()

	if *setupOnly {
		processArtifactsInSetupOnlyMode()
		os.Exit(0)
	}

	if *workerPool {
		workerPoolId := fmt.Sprintf("%d", os.Getpid())
		os.Setenv(workerPoolIdEnv, workerPoolId)
		args := []string{
			"-m",
			"apache_beam.runners.worker.worker_pool_main",
			"--service_port=50000",
			"--container_executable=/opt/apache/beam/boot",
		}
		log.Printf("Starting worker pool %v: python %v", workerPoolId, strings.Join(args, " "))
		pythonVersion, err := expansionx.GetPythonVersion()
		if err != nil {
			log.Fatalf("Python SDK worker pool exited with error: %v", err)
		}
		if err := execx.Execute(pythonVersion, args...); err != nil {
			log.Fatalf("Python SDK worker pool exited with error: %v", err)
		}
		log.Print("Python SDK worker pool exited.")
		os.Exit(0)
	}

	if *id == "" {
		log.Fatalf("No id provided.")
	}
	if *provisionEndpoint == "" {
		log.Fatalf("No provision endpoint provided.")
	}

	if err := launchSDKProcess(); err != nil {
		log.Fatal(err)
	}
}

// The json string of pipeline options is in the following format.
// We only focus on experiments here.
//
//	{
//		 "display_data": [
//		  	{...},
//		 ],
//		 "options": {
//		  	...
//			  "experiments": [
//				...
//			 ],
//		 }
//	}
type PipelineOptionsData struct {
	Options OptionsData `json:"options"`
}

type OptionsData struct {
	Experiments []string `json:"experiments"`
}

func getExperiments(options string) []string {
	var opts PipelineOptionsData
	err := json.Unmarshal([]byte(options), &opts)
	if err != nil {
		return nil
	}
	return opts.Options.Experiments
}

func launchSDKProcess() error {
	ctx := grpcx.WriteWorkerID(context.Background(), *id)

	info, err := tools.ProvisionInfo(ctx, *provisionEndpoint)
	if err != nil {
		log.Fatalf("Failed to obtain provisioning information: %v", err)
	}
	log.Printf("Provision info:\n%v", info)

	// TODO(BEAM-8201): Simplify once flags are no longer used.
	if info.GetLoggingEndpoint().GetUrl() != "" {
		*loggingEndpoint = info.GetLoggingEndpoint().GetUrl()
	}
	if info.GetArtifactEndpoint().GetUrl() != "" {
		*artifactEndpoint = info.GetArtifactEndpoint().GetUrl()
	}
	if info.GetControlEndpoint().GetUrl() != "" {
		*controlEndpoint = info.GetControlEndpoint().GetUrl()
	}

	if *loggingEndpoint == "" {
		log.Fatalf("No logging endpoint provided.")
	}
	if *artifactEndpoint == "" {
		log.Fatalf("No artifact endpoint provided.")
	}
	if *controlEndpoint == "" {
		log.Fatalf("No control endpoint provided.")
	}
	logger := &tools.Logger{Endpoint: *loggingEndpoint}
	logger.Printf(ctx, "Initializing python harness: %v", strings.Join(os.Args, " "))

	// (1) Obtain the pipeline options

	options, err := tools.ProtoToJSON(info.GetPipelineOptions())
	if err != nil {
		logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err)
	}

	experiments := getExperiments(options)
	pipNoBuildIsolation = false
	if slices.Contains(experiments, "pip_no_build_isolation") {
		pipNoBuildIsolation = true
		logger.Printf(ctx, "Disabled build isolation when installing packages with pip")
	}

	// (2) Retrieve and install the staged packages.
	//
	// No log.Fatalf() from here on, otherwise deferred cleanups will not be called!

	// Trap signals, so we can clean up properly.
	signalChannel := make(chan os.Signal, 1)
	signal.Notify(signalChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)

	// Create a separate virtual environment (with access to globally installed packages), unless disabled by the user.
	// This improves usability on runners that persist the execution environment for the boot entrypoint between multiple pipeline executions.
	if os.Getenv("RUN_PYTHON_SDK_IN_DEFAULT_ENVIRONMENT") == "" {
		venvDir, err := setupVenv(ctx, logger, "/opt/apache/beam-venv", *id)
		if err != nil {
			return errors.New(
				"failed to create a virtual environment. If running on Ubuntu systems, " +
					"you might need to install `python3-venv` package. " +
					"To run the SDK process in default environment instead, " +
					"set the environment variable `RUN_PYTHON_SDK_IN_DEFAULT_ENVIRONMENT=1`. " +
					"In custom Docker images, you can do that with an `ENV` statement. " +
					fmt.Sprintf("Encountered error: %v", err))
		}
		cleanupFunc := func() {
			os.RemoveAll(venvDir)
			logger.Printf(ctx, "Cleaned up temporary venv for worker %v.", *id)
		}
		defer cleanupFunc()
	}

	dir := filepath.Join(*semiPersistDir, "staged")
	files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
	if err != nil {
		fmtErr := fmt.Errorf("failed to retrieve staged files: %v", err)
		// Send error message to logging service before returning up the call stack
		logger.Errorf(ctx, "%s", fmtErr.Error())
		// No need to fail the job if submission_environment_dependencies.txt cannot be loaded
		if strings.Contains(fmtErr.Error(), "submission_environment_dependencies.txt") {
			logger.Printf(ctx, "Ignore the error when loading submission_environment_dependencies.txt.")
		} else {
			return fmtErr
		}
	}

	// TODO(herohde): the packages to install should be specified explicitly. It
	// would also be possible to install the SDK in the Dockerfile.
	fileNames := make([]string, len(files))
	requirementsFiles := []string{requirementsFile}
	for i, v := range files {
		name, _ := artifact.MustExtractFilePayload(v)
		logger.Printf(ctx, "Found artifact: %s", name)
		fileNames[i] = name

		if v.RoleUrn == artifact.URNPipRequirementsFile {
			requirementsFiles = append(requirementsFiles, name)
		}
	}

	if setupErr := installSetupPackages(ctx, logger, fileNames, dir, requirementsFiles); setupErr != nil {
		fmtErr := fmt.Errorf("failed to install required packages: %v", setupErr)
		// Send error message to logging service before returning up the call stack
		logger.Errorf(ctx, "%s", fmtErr.Error())
		return fmtErr
	}

	// (3) Invoke python

	os.Setenv("PIPELINE_OPTIONS", options)
	os.Setenv("SEMI_PERSISTENT_DIRECTORY", *semiPersistDir)
	os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}).String())
	os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}).String())
	os.Setenv("RUNNER_CAPABILITIES", strings.Join(info.GetRunnerCapabilities(), " "))

	if info.GetStatusEndpoint() != nil {
		os.Setenv("STATUS_API_SERVICE_DESCRIPTOR", info.GetStatusEndpoint().String())
	}

	if metadata := info.GetMetadata(); metadata != nil {
		if jobName, nameExists := metadata["job_name"]; nameExists {
			os.Setenv("JOB_NAME", jobName)
		}
		if jobID, idExists := metadata["job_id"]; idExists {
			os.Setenv("JOB_ID", jobID)
		}
	}

	workerIds := append([]string{*id}, info.GetSiblingWorkerIds()...)

	// Keep track of child PIDs for clean shutdown without zombies
	childPids := struct {
		v        []int
		canceled bool
		mu       sync.Mutex
	}{v: make([]int, 0, len(workerIds))}

	// Forward trapped signals to child process groups in order to terminate them gracefully and avoid zombies
	go func() {
		logger.Printf(ctx, "Received signal: %v", <-signalChannel)
		childPids.mu.Lock()
		childPids.canceled = true
		for _, pid := range childPids.v {
			go func(pid int) {
				// This goroutine will be canceled if the main process exits before the 5 seconds
				// have elapsed, i.e., as soon as all subprocesses have returned from Wait().
				time.Sleep(5 * time.Second)
				if err := syscall.Kill(-pid, syscall.SIGKILL); err == nil {
					logger.Warnf(ctx, "Worker process %v did not respond, killed it.", pid)
				}
			}(pid)
			syscall.Kill(-pid, syscall.SIGTERM)
		}
		childPids.mu.Unlock()
	}()

	args := []string{
		"-m",
		sdkHarnessEntrypoint,
	}

	var wg sync.WaitGroup
	wg.Add(len(workerIds))
	for _, workerId := range workerIds {
		go func(workerId string) {
			defer wg.Done()

			bufLogger := tools.NewBufferedLogger(logger)
			errorCount := 0
			for {
				childPids.mu.Lock()
				if childPids.canceled {
					childPids.mu.Unlock()
					return
				}
				logger.Printf(ctx, "Executing Python (worker %v): python %v", workerId, strings.Join(args, " "))
				cmd := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, os.Stdin, bufLogger, bufLogger, "python", args...)
				childPids.v = append(childPids.v, cmd.Process.Pid)
				childPids.mu.Unlock()

				if err := cmd.Wait(); err != nil {
					// Retry on fatal errors, like OOMs and segfaults, not just
					// DoFns throwing exceptions.
					errorCount += 1
					bufLogger.FlushAtError(ctx)
					if errorCount < 4 {
						logger.Warnf(ctx, "Python (worker %v) exited %v times: %v\nrestarting SDK process",
							workerId, errorCount, err)
					} else {
						logger.Fatalf(ctx, "Python (worker %v) exited %v times: %v\nout of retries, failing container",
							workerId, errorCount, err)
					}
				} else {
					bufLogger.FlushAtDebug(ctx)
					logger.Printf(ctx, "Python (worker %v) exited.", workerId)
					break
				}
			}
		}(workerId)
	}
	wg.Wait()
	return nil
}

// Start a command object in a new process group with the given arguments with
// additional environment variables. It attaches stdio to the child process.
// Returns the process handle.
func StartCommandEnv(env map[string]string, stdin io.Reader, stdout, stderr io.Writer, prog string, args ...string) *exec.Cmd {
	cmd := exec.Command(prog, args...)
	cmd.Stdin = stdin
	cmd.Stdout = stdout
	cmd.Stderr = stderr
	if env != nil {
		cmd.Env = os.Environ()
		for k, v := range env {
			cmd.Env = append(cmd.Env, k+"="+v)
		}
	}

	// Create process group so we can clean up the whole subtree later without creating zombies
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 0}
	cmd.Start()
	return cmd
}

// setupVenv initializes a local Python venv and sets the corresponding env variables
func setupVenv(ctx context.Context, logger *tools.Logger, baseDir, workerId string) (string, error) {
	dir := filepath.Join(baseDir, "beam-venv-worker-"+workerId)
	logger.Printf(ctx, "Initializing temporary Python venv in %v", dir)
	if _, err := os.Stat(dir); !os.IsNotExist(err) {
		// Probably leftovers from a previous run
		logger.Printf(ctx, "Cleaning up previous venv ...")
		if err := os.RemoveAll(dir); err != nil {
			return "", err
		}
	}
	if err := os.MkdirAll(dir, 0750); err != nil {
		return "", fmt.Errorf("failed to create Python venv directory: %s", err)
	}
	pythonVersion, err := expansionx.GetPythonVersion()
	if err != nil {
		return "", err
	}
	if err := execx.Execute(pythonVersion, "-m", "venv", "--system-site-packages", dir); err != nil {
		return "", fmt.Errorf("python venv initialization failed: %s", err)
	}

	os.Setenv("VIRTUAL_ENV", dir)
	os.Setenv("PATH", strings.Join([]string{filepath.Join(dir, "bin"), os.Getenv("PATH")}, ":"))
	return dir, nil
}

// setupAcceptableWheelSpecs setup wheel specs according to installed python version
func setupAcceptableWheelSpecs() error {
	cmd := exec.Command("python", "-V")
	stdoutStderr, err := cmd.CombinedOutput()
	if err != nil {
		return err
	}
	re := regexp.MustCompile(`Python (\d)\.(\d+).*`)
	pyVersions := re.FindStringSubmatch(string(stdoutStderr[:]))
	if len(pyVersions) != 3 {
		return fmt.Errorf("cannot get parse Python version from %s", stdoutStderr)
	}
	pyVersion := fmt.Sprintf("%s%s", pyVersions[1], pyVersions[2])
	wheelName := fmt.Sprintf("cp%s-cp%s-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", pyVersion, pyVersion)
	acceptableWhlSpecs = append(acceptableWhlSpecs, wheelName)
	return nil
}

// installSetupPackages installs Beam SDK and user dependencies.
func installSetupPackages(ctx context.Context, logger *tools.Logger, files []string, workDir string, requirementsFiles []string) error {
	bufLogger := tools.NewBufferedLogger(logger)
	bufLogger.Printf(ctx, "Installing setup packages ...")

	if err := setupAcceptableWheelSpecs(); err != nil {
		bufLogger.Printf(ctx, "Failed to setup acceptable wheel specs, leave it as empty: %v", err)
	}

	// Install the Dataflow Python SDK if one was staged. In released
	// container images, SDK is already installed, but can be overriden
	// using the --sdk_location pipeline option.
	if err := installSdk(ctx, logger, files, workDir, sdkSrcFile, acceptableWhlSpecs, false); err != nil {
		return fmt.Errorf("failed to install SDK: %v", err)
	}
	pkgName := "apache-beam"
	isSdkInstalled := isPackageInstalled(pkgName)
	if !isSdkInstalled {
		return fmt.Errorf("Apache Beam is not installed in the runtime environment. If you use a custom container image, you must install apache-beam package in the custom image using same version of Beam as in the pipeline submission environment. For more information, see: the https://beam.apache.org/documentation/runtime/environments/")
	}
	// The staged files will not disappear due to restarts because workDir is a
	// folder that is mapped to the host (and therefore survives restarts).
	for _, f := range requirementsFiles {
		if err := pipInstallRequirements(ctx, logger, files, workDir, f); err != nil {
			return fmt.Errorf("failed to install requirements: %v", err)
		}
	}
	if err := installExtraPackages(ctx, logger, files, extraPackagesFile, workDir); err != nil {
		return fmt.Errorf("failed to install extra packages: %v", err)
	}
	if err := pipInstallPackage(ctx, logger, files, workDir, workflowFile, false, true, nil); err != nil {
		return fmt.Errorf("failed to install workflow: %v", err)
	}
	if err := logRuntimeDependencies(ctx, bufLogger); err != nil {
		bufLogger.Printf(ctx, "couldn't fetch the runtime python dependencies: %v", err)
	}
	if err := logSubmissionEnvDependencies(ctx, bufLogger, workDir); err != nil {
		bufLogger.Printf(ctx, "couldn't fetch the submission environment dependencies: %v", err)
	}

	return nil
}

// processArtifactsInSetupOnlyMode installs the dependencies found in artifacts
// when flag --setup_only and --artifacts exist. The setup mode will only
// process the provided artifacts and skip the actual worker program start up.
// The mode is useful for building new images with dependencies pre-installed so
// that the installation can be skipped at the pipeline runtime.
func processArtifactsInSetupOnlyMode() {
	if *artifacts == "" {
		log.Fatal("No --artifacts provided along with --setup_only flag.")
	}
	workDir := filepath.Dir(*artifacts)
	metadata, err := os.ReadFile(*artifacts)
	if err != nil {
		log.Fatalf("Unable to open artifacts metadata file %v with error %v", *artifacts, err)
	}
	var infoJsons []string
	if err := json.Unmarshal(metadata, &infoJsons); err != nil {
		log.Fatalf("Unable to parse metadata, error: %v", err)
	}

	files := make([]string, len(infoJsons))
	for i, info := range infoJsons {
		var artifactInformation pipepb.ArtifactInformation
		if err := protojson.Unmarshal([]byte(info), &artifactInformation); err != nil {
			log.Fatalf("Unable to unmarshal artifact information from json string %v", info)
		}

		// For now we only expect artifacts in file type. The condition should be revisited if the assumption is not valid any more.
		if artifactInformation.GetTypeUrn() != standardArtifactFileTypeUrn {
			log.Fatalf("Expect file artifact type in setup only mode, found %v.", artifactInformation.GetTypeUrn())
		}
		filePayload := &pipepb.ArtifactFilePayload{}
		if err := proto.Unmarshal(artifactInformation.GetTypePayload(), filePayload); err != nil {
			log.Fatal("Unable to unmarshal artifact information type payload.")
		}
		files[i] = filePayload.GetPath()
	}
	if setupErr := installSetupPackages(context.Background(), nil, files, workDir, []string{requirementsFile}); setupErr != nil {
		log.Fatalf("Failed to install required packages: %v", setupErr)
	}
}

// logRuntimeDependencies logs the python dependencies
// installed in the runtime environment.
func logRuntimeDependencies(ctx context.Context, bufLogger *tools.BufferedLogger) error {
	pythonVersion, err := expansionx.GetPythonVersion()
	if err != nil {
		return err
	}
	bufLogger.Printf(ctx, "Using Python version:")
	args := []string{"--version"}
	if err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...); err != nil {
		bufLogger.FlushAtError(ctx)
	} else {
		bufLogger.FlushAtDebug(ctx)
	}
	bufLogger.Printf(ctx, "Logging runtime dependencies:")
	args = []string{"-m", "pip", "freeze"}
	if err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...); err != nil {
		bufLogger.FlushAtError(ctx)
	} else {
		bufLogger.FlushAtDebug(ctx)
	}
	return nil
}

// logSubmissionEnvDependencies logs the python dependencies
// installed in the submission environment.
func logSubmissionEnvDependencies(ctx context.Context, bufLogger *tools.BufferedLogger, dir string) error {
	bufLogger.Printf(ctx, "Logging submission environment dependencies:")
	// path for submission environment dependencies should match with the
	// one defined in apache_beam/runners/portability/stager.py.
	filename := filepath.Join(dir, "submission_environment_dependencies.txt")
	content, err := os.ReadFile(filename)
	if err != nil {
		return err
	}
	bufLogger.Printf(ctx, "%s", string(content))
	return nil
}
