[BEAM-3713] Move dataflow:validatesContainerTests from nosetest to pytest (#14716)

diff --git a/.test-infra/jenkins/job_PostCommit_Python_ValidatesContainer_Dataflow.groovy b/.test-infra/jenkins/job_PostCommit_Python_ValidatesContainer_Dataflow.groovy
index e722bdc..bc43ecb 100644
--- a/.test-infra/jenkins/job_PostCommit_Python_ValidatesContainer_Dataflow.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Python_ValidatesContainer_Dataflow.groovy
@@ -31,7 +31,7 @@
       commonJobProperties.setTopLevelMainJobProperties(delegate)
 
       publishers {
-        archiveJunit('**/nosetests*.xml')
+        archiveJunit('**/pytest*.xml')
       }
 
       // Execute shell command to test Python SDK.
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index 24c455b..242dcff 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -24,6 +24,7 @@
 import time
 import unittest
 
+import pytest
 from hamcrest.core.core.allof import all_of
 from nose.plugins.attrib import attr
 
@@ -50,18 +51,19 @@
   def test_wordcount_it(self):
     self._run_wordcount_it(wordcount.run)
 
-  @attr('IT', 'ValidatesContainer')
+  @attr('IT')
+  @pytest.mark.it_validatescontainer
   def test_wordcount_fnapi_it(self):
     self._run_wordcount_it(wordcount.run, experiment='beam_fn_api')
 
-  @attr('ValidatesContainer')
+  @pytest.mark.it_validatescontainer
   def test_wordcount_it_with_prebuilt_sdk_container_local_docker(self):
     self._run_wordcount_it(
         wordcount.run,
         experiment='beam_fn_api',
         prebuild_sdk_container_engine='local_docker')
 
-  @attr('ValidatesContainer')
+  @pytest.mark.it_validatescontainer
   def test_wordcount_it_with_prebuilt_sdk_container_cloud_build(self):
     self._run_wordcount_it(
         wordcount.run,
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline_test.py
index b51489a..0ee0e0f 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline_test.py
@@ -22,6 +22,7 @@
 import argparse
 import unittest
 
+import pytest
 from nose.plugins.attrib import attr
 
 import apache_beam as beam
@@ -50,7 +51,8 @@
         dataflow_exercise_metrics_pipeline.metric_matchers())
     self.assertFalse(errors, str(errors))
 
-  @attr('IT', 'ValidatesContainer')
+  @attr('IT')
+  @pytest.mark.it_validatescontainer
   def test_metrics_fnapi_it(self):
     result = self.run_pipeline(experiment='beam_fn_api')
     errors = metric_result_matchers.verify_all(
diff --git a/sdks/python/apache_beam/testing/test_pipeline.py b/sdks/python/apache_beam/testing/test_pipeline.py
index a8ccf6a..2ba273e 100644
--- a/sdks/python/apache_beam/testing/test_pipeline.py
+++ b/sdks/python/apache_beam/testing/test_pipeline.py
@@ -59,6 +59,10 @@
       pcoll = ...
       assert_that(pcoll, equal_to(...))
   """
+  # Command line options read in by pytest.
+  # Used as the fallback value when --test-pipeline-options is not in argv.
+  pytest_test_pipeline_options = None
+
   def __init__(
       self,
       runner=None,
@@ -142,8 +146,9 @@
         default=False,
         help='whether not to use test-runner-api')
     known, unused_argv = parser.parse_known_args(argv)
-
-    if self.is_integration_test and not known.test_pipeline_options:
+    test_pipeline_options = known.test_pipeline_options or \
+                            TestPipeline.pytest_test_pipeline_options
+    if self.is_integration_test and not test_pipeline_options:
       # Skip integration test when argument '--test-pipeline-options' is not
       # specified since nose calls integration tests when runs unit test by
       # 'setup.py test'.
@@ -152,8 +157,8 @@
           'is not specified')
 
     self.not_use_test_runner_api = known.not_use_test_runner_api
-    return shlex.split(known.test_pipeline_options) \
-      if known.test_pipeline_options else []
+    return shlex.split(test_pipeline_options) \
+      if test_pipeline_options else []
 
   def get_full_options_as_args(self, **extra_opts):
     """Get full pipeline options as an argument list.
diff --git a/sdks/python/conftest.py b/sdks/python/conftest.py
index b0a35cf..3e2d5ca 100644
--- a/sdks/python/conftest.py
+++ b/sdks/python/conftest.py
@@ -21,6 +21,7 @@
 import sys
 
 from apache_beam.options import pipeline_options
+from apache_beam.testing.test_pipeline import TestPipeline
 
 MAX_SUPPORTED_PYTHON_VERSION = (3, 8)
 
@@ -40,5 +41,11 @@
 
 
 def pytest_configure(config):
+  """Saves options added in pytest_addoption for later use.
+  This is necessary since pytest-xdist workers do not have the same sys.argv as
+  the main pytest invocation. xdist does seem to pickle TestPipeline.
+  """
+  TestPipeline.pytest_test_pipeline_options = config.getoption(
+        'test_pipeline_options', default='')
   # Enable optional type checks on all tests.
   pipeline_options.enable_all_additional_type_checks()
diff --git a/sdks/python/container/run_validatescontainer.sh b/sdks/python/container/run_validatescontainer.sh
index ad5ecb7..4eab5d9 100755
--- a/sdks/python/container/run_validatescontainer.sh
+++ b/sdks/python/container/run_validatescontainer.sh
@@ -71,7 +71,7 @@
   echo "Must set Python version with one of 'python36', 'python37' and 'python38' from commandline."
   exit 1
 fi
-XUNIT_FILE="nosetests-$IMAGE_NAME.xml"
+XUNIT_FILE="pytest-$IMAGE_NAME.xml"
 
 # Verify in the root of the repository
 test -d sdks/python/container
@@ -118,14 +118,14 @@
 
 # Run ValidatesRunner tests on Google Cloud Dataflow service
 echo ">>> RUNNING DATAFLOW RUNNER VALIDATESCONTAINER TEST"
-python setup.py nosetests \
-  --attr ValidatesContainer \
-  --nologcapture \
-  --processes=1 \
-  --process-timeout=900 \
-  --with-xunitmp \
-  --xunitmp-file=$XUNIT_FILE \
-  --ignore-files '.*py3\d?\.py$' \
+pytest -o junit_suite_name=$IMAGE_NAME \
+  -m="it_validatescontainer" \
+  --show-capture=no \
+  --numprocesses=1 \
+  --timeout=900 \
+  --junitxml=$XUNIT_FILE \
+  --ignore-glob '.*py3\d?\.py$' \
+  --log-cli-level=INFO \
   --test-pipeline-options=" \
     --runner=TestDataflowRunner \
     --project=$PROJECT \
diff --git a/sdks/python/pytest.ini b/sdks/python/pytest.ini
index 00d8032..4837bca 100644
--- a/sdks/python/pytest.ini
+++ b/sdks/python/pytest.ini
@@ -24,9 +24,10 @@
 python_functions =
 # Discover tests using filenames.
 # See conftest.py for extra collection rules.
-python_files = test_*.py *_test.py *_test_py3*.py
+python_files = test_*.py *_test.py *_test_py3*.py *_test_it.py
 
 markers =
+    it_validatescontainer: collect for ValidatesContainer integration test runs
     # Tests using this marker conflict with the xdist plugin in some way, such
     # as enabling save_main_session.
     no_xdist: run without pytest-xdist plugin
diff --git a/sdks/python/scripts/run_integration_test.sh b/sdks/python/scripts/run_integration_test.sh
index 519ee3d..c5baa89 100755
--- a/sdks/python/scripts/run_integration_test.sh
+++ b/sdks/python/scripts/run_integration_test.sh
@@ -78,6 +78,7 @@
 WORKER_JAR=""
 KMS_KEY_NAME="projects/apache-beam-testing/locations/global/keyRings/beam-it/cryptoKeys/test"
 SUITE=""
+COLLECT_MARKERS=
 
 # Default test (nose) options.
 # Run WordCountIT.test_wordcount_it by default if no test options are
@@ -163,6 +164,16 @@
         shift # past argument
         shift # past value
         ;;
+    --pytest)
+      PYTEST="$2"
+      shift # past argument
+      shift # past value
+      ;;
+    --collect)
+      COLLECT_MARKERS="-m=$2"
+      shift # past argument
+      shift # past value
+      ;;
     *)    # unknown option
         echo "Unknown option: $1"
         exit 1
@@ -270,11 +281,23 @@
 # Run tests and validate that jobs finish successfully.
 
 echo ">>> RUNNING integration tests with pipeline options: $PIPELINE_OPTS"
-echo ">>>   test options: $TEST_OPTS"
-# TODO(BEAM-3713): Pass $SUITE once migrated to pytest. xunitmp doesn't support
-#   suite names.
-python setup.py nosetests \
-  --test-pipeline-options="$PIPELINE_OPTS" \
-  --with-xunitmp --xunitmp-file=$XUNIT_FILE \
-  --ignore-files '.*py3\d?\.py$' \
-  $TEST_OPTS
+if [[ "$PYTEST" = true ]]; then
+  echo ">>>   pytest options: $TEST_OPTS"
+  echo ">>>   collect markers: $COLLECT_MARKERS"
+  ARGS="-o junit_suite_name=$SUITE --junitxml=pytest_$SUITE.xml $TEST_OPTS"
+  # Handle markers as an independent argument from $TEST_OPTS to prevent errors in space-separated flags
+  if [ -z "$COLLECT_MARKERS" ]; then
+    pytest $ARGS --test-pipeline-options="$PIPELINE_OPTS"
+  else
+    pytest $ARGS --test-pipeline-options="$PIPELINE_OPTS" "$COLLECT_MARKERS"
+  fi
+else
+  echo ">>>   test options: $TEST_OPTS"
+  # TODO(BEAM-3713): Pass $SUITE once migrated to pytest. xunitmp doesn't
+  #   support suite names.
+  python setup.py nosetests \
+    --test-pipeline-options="$PIPELINE_OPTS" \
+    --with-xunitmp --xunitmp-file=$XUNIT_FILE \
+    --ignore-files '.*py3\d?\.py$' \
+    $TEST_OPTS
+fi
\ No newline at end of file