import groovy.json.JsonOutput

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

plugins { id 'org.apache.beam.module' }
applyJavaNature( automaticModuleName: 'org.apache.beam.runners.portability')
applyPythonNature()

description = "Apache Beam :: Runners :: Portability :: Java"
ext.summary = """A Java implementation of the Beam Model which utilizes the portability
framework to execute user-definied functions."""

configurations {
  validatesRunner
}

dependencies {
  implementation project(path: ":model:job-management", configuration: "shadow")
  implementation project(path: ":model:pipeline", configuration: "shadow")
  implementation project(":runners:java-fn-execution")
  implementation project(":runners:java-job-service")
  implementation project(path: ":sdks:java:core", configuration: "shadow")
  implementation project(path: ":sdks:java:harness", configuration: "shadow")
  implementation library.java.hamcrest
  permitUnusedDeclared library.java.hamcrest
  implementation library.java.joda_time
  implementation library.java.slf4j_api
  implementation library.java.vendored_grpc_1_69_0
  implementation library.java.vendored_guava_32_1_2_jre

  testImplementation library.java.hamcrest
  testImplementation library.java.junit
  testImplementation library.java.mockito_core
  testImplementation library.java.slf4j_jdk14

  validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
  validatesRunner project(path: ":runners:core-java", configuration: "testRuntimeMigration")
  validatesRunner project(path: project.path, configuration: "testRuntimeMigration")
}

// Set using -PjobEndpoint=, for example: -PjobEndpoint=localhost:8073
def jobEndpointPropertyName = "jobEndpoint"

project.evaluationDependsOn(":sdks:java:core")
project.evaluationDependsOn(":sdks:python")
project.evaluationDependsOn(":runners:core-java")

ext.pythonSdkDir = project.findProject(":sdks:python").getProjectDir().getAbsoluteFile().toString()

// If this is set via -P then we assume it is already running and will not start it
def needsLocalJobService = !project.hasProperty("localJobServicePortFile") && !project.hasProperty(jobEndpointPropertyName)

ext.localJobServicePortFile = project.hasProperty("localJobServicePortFile") ? project.property("localJobServicePortFile") : "${project.buildDir}/local_job_service_port"

ext.localJobServiceStdoutFile = "${project.buildDir}/local_job_service_stdout"
ext.localJobServicePidFile = "${project.buildDir}/local_job_service_pid"

void execInVirtualenv(String... args) {
  String shellCommand = ". ${project.ext.envdir}/bin/activate && " + args.collect { arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
  exec {
    workingDir project.ext.pythonSdkDir
    commandLine "sh", "-c", shellCommand
  }
}

// Does not background the process, but allows the process to daemonize itself
void execBackgroundInVirtualenv(String... args) {
  String shellCommand = ". ${project.ext.envdir}/bin/activate && " + args.collect { arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
  ProcessBuilder pb = new ProcessBuilder().redirectErrorStream(true).directory(new File(project.ext.pythonSdkDir)).command(["sh", "-c", shellCommand])
  Process proc = pb.start();

  // redirectIO does not work for connecting to groovy/gradle stdout
  BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
  String line
  while ((line = reader.readLine()) != null) {
    println line
  }
  int exitCode = proc.waitFor();
  if (exitCode != 0) {
    throw new RuntimeException("Local job service startup failed with exit code ${exitCode}")
  }
}

def installBeamPythonInVirtualenv = tasks.register("installBeamPythonInVirtualenv") {
  dependsOn setupVirtualenv
  doLast {
    execInVirtualenv "pip", "install", "-e", "."
  }
}

def startLocalJobService = tasks.register("startLocalJobService") {
  dependsOn installBeamPythonInVirtualenv

  doLast {
    execBackgroundInVirtualenv "python",
        "-m", "apache_beam.runners.portability.local_job_service_main",
        "--background",
        "--stdout_file=${localJobServiceStdoutFile}",
        "--pid_file=${localJobServicePidFile}",
        "--port_file=${localJobServicePortFile}"
  }
}

task stopLocalJobService {
  doLast {
    execInVirtualenv "python",
        "-m", "apache_beam.runners.portability.local_job_service_main",
        "--stop",
        "--pid_file=${localJobServicePidFile}"
  }
}

startLocalJobService.configure{finalizedBy stopLocalJobService}

def sickbayTests = [

        //TODO: https://github.com/apache/beam/issues/20631
        'org.apache.beam.sdk.transforms.windowing.WindowingTest.testWindowPreservation',
]

/**
 * Runs Java ValidatesRunner tests against the Universal Local Runner (ULR) aka local_job_service_main
 * with the specified environment type.
 */
def createUlrValidatesRunnerTask = { name, environmentType, dockerImageTask = "" ->
  Task vrTask = tasks.create(name: name, type: Test, group: "Verification") {
    description "PortableRunner Java docker ValidatesRunner suite"
    classpath = configurations.validatesRunner
    var jobEndpointOption = "--localJobServicePortFile=$localJobServicePortFile"
    if (project.hasProperty(jobEndpointPropertyName)) {
      jobEndpointOption = "--$jobEndpointPropertyName=${project.findProperty(jobEndpointPropertyName)}"
    }
    systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
        "--runner=TestUniversalRunner",
        "--experiments=beam_fn_api",
        "--defaultEnvironmentType=${environmentType}",
        jobEndpointOption
    ])
    testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
    useJUnit {
      includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
      // Should be run only in a properly configured SDK harness environment
      excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics'
      excludeCategories 'org.apache.beam.sdk.testing.UsesExternalService'
      excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
      excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
      excludeCategories 'org.apache.beam.sdk.testing.UsesStringSetMetrics'
      excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
      excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
      excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
      excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
      excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
      excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
      excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
      excludeCategories 'org.apache.beam.sdk.testing.UsesTriggeredSideInputs'
    }
    filter {
      // There is not currently a category for excluding these _only_ in committed mode
      // https://github.com/apache/beam/issues/20370
      excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedCounterMetrics'
      // https://github.com/apache/beam/issues/20371
      excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedDistributionMetrics'

      // Teardown not called in exceptions
      // https://github.com/apache/beam/issues/20372
      excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful'

      // Only known window fns supported, not general window merging
      // https://github.com/apache/beam/issues/20437
      excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindows'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsKeyedCollection'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsWithoutCustomWindowTypes'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testMergingWindowing'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testNonPartitioningWindowing'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testGroupByKeyMergingWindows'

      // Misc failures
      // https://github.com/apache/beam/issues/20375
      excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testGlobalCombineWithDefaultsAndTriggers'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombine'
      excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombineWithContext'

      // https://github.com/apache/beam/issues/20373
      excludeTestsMatching 'org.apache.beam.sdk.testing.PAssertTest.testWindowedIsEqualTo'

      // https://github.com/apache/beam/issues/20374
      excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$BasicTests.testHotKeyCombiningWithAccumulationMode'

      // TODO(https://github.com/apache/beam/issues/29973)
      excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata'
      // TODO(https://github.com/apache/beam/issues/31231)
      excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata'

      for (String test : sickbayTests) {
        excludeTestsMatching test
      }

    }
  }

  if (environmentType == "DOCKER") {
    if(dockerImageTask.isEmpty()) {
      throw new StopExecutionException("dockerImageTask is required for a Docker environment test task");
    }
    vrTask.dependsOn dockerImageTask
  }

  if (needsLocalJobService) {
    vrTask.dependsOn startLocalJobService
    stopLocalJobService.mustRunAfter vrTask
  }

  return vrTask
}

tasks.register("validatesRunnerSickbay", Test) {
  group = "Verification"
  description "Validates Universal local runner (Sickbay Tests)"
  systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
          "--runner=TestUniversalRunner",
  ])

  classpath = configurations.validatesRunner
  testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)

  filter {
    for (String test : sickbayTests) {
      includeTestsMatching test
    }
  }
}

task ulrDockerValidatesRunner {
  dependsOn createUlrValidatesRunnerTask("ulrDockerValidatesRunnerTests", "DOCKER", ":sdks:java:container:${project.ext.currentJavaVersion}:docker")
}

task ulrLoopbackValidatesRunner {
  dependsOn createUlrValidatesRunnerTask("ulrLoopbackValidatesRunnerTests", "LOOPBACK")
}

