[BEAM-3713] Move PostCommit_Python from nosetest to pytest (#14859)

diff --git a/.test-infra/jenkins/job_PostCommit_Python.groovy b/.test-infra/jenkins/job_PostCommit_Python.groovy
index f0e4e58..2f3b1d8 100644
--- a/.test-infra/jenkins/job_PostCommit_Python.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Python.groovy
@@ -33,7 +33,7 @@
         commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 120)
 
         publishers {
-          archiveJunit('**/nosetests*.xml')
+          archiveJunit('**/pytest*.xml')
         }
 
         // Execute shell command to test Python SDK.
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 4d1d892..2048742 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -21,7 +21,7 @@
 
 import unittest
 
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam.examples.complete import autocomplete
@@ -55,7 +55,7 @@
               ('that', ((1, 'that'), )),
           ]))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_autocomplete_it(self):
     with TestPipeline(is_integration_test=True) as p:
       words = p | beam.io.ReadFromText(self.KINGLEAR_INPUT)
diff --git a/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py b/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
index 91388a4..ea7ec28 100644
--- a/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
@@ -20,7 +20,7 @@
 Code: beam/sdks/python/apache_beam/examples/complete/game/game_stats.py
 Usage:
 
-  python setup.py nosetests --test-pipeline-options=" \
+  pytest --test-pipeline-options=" \
       --runner=TestDataflowRunner \
       --project=... \
       --region=... \
@@ -38,8 +38,8 @@
 import unittest
 import uuid
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.examples.complete.game import game_stats
 from apache_beam.io.gcp.tests import utils
@@ -103,7 +103,7 @@
     test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub])
     test_utils.cleanup_topics(self.pub_client, [self.input_topic])
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_game_stats_it(self):
     state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
 
diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py
index f21abac..a9729bc 100644
--- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py
@@ -20,7 +20,7 @@
 Code: beam/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
 Usage:
 
-  python setup.py nosetests --test-pipeline-options=" \
+  pytest --test-pipeline-options=" \
       --runner=TestDataflowRunner \
       --project=... \
       --region=... \
@@ -36,8 +36,8 @@
 import logging
 import unittest
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.examples.complete.game import hourly_team_score
 from apache_beam.io.gcp.tests import utils
@@ -63,7 +63,7 @@
     self.dataset_ref = utils.create_bq_dataset(
         self.project, self.OUTPUT_DATASET)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_hourly_team_score_it(self):
     state_verifier = PipelineStateMatcher(PipelineState.DONE)
     query = (
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py b/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
index 8f5f91c..8b82c64 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
@@ -20,7 +20,7 @@
 Code: beam/sdks/python/apache_beam/examples/complete/game/leader_board.py
 Usage:
 
-  python setup.py nosetests --test-pipeline-options=" \
+    pytest --test-pipeline-options=" \
       --runner=TestDataflowRunner \
       --project=... \
       --region=... \
@@ -38,8 +38,8 @@
 import unittest
 import uuid
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.examples.complete.game import leader_board
 from apache_beam.io.gcp.tests import utils
@@ -104,7 +104,7 @@
     test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub])
     test_utils.cleanup_topics(self.pub_client, [self.input_topic])
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_leader_board_it(self):
     state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
 
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score_it_test.py b/sdks/python/apache_beam/examples/complete/game/user_score_it_test.py
index a2b3a17..d26565e 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score_it_test.py
@@ -20,7 +20,7 @@
 Code: beam/sdks/python/apache_beam/examples/complete/game/user_score.py
 Usage:
 
-  python setup.py nosetests --test-pipeline-options=" \
+    pytest --test-pipeline-options=" \
       --runner=TestDataflowRunner \
       --project=... \
       --region=... \
@@ -37,8 +37,8 @@
 import unittest
 import uuid
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.examples.complete.game import user_score
 from apache_beam.runners.runner import PipelineState
@@ -60,7 +60,7 @@
     self.output = '/'.join(
         [self.test_pipeline.get_option('output'), self.uuid, 'results'])
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_user_score_it(self):
 
     state_verifier = PipelineStateMatcher(PipelineState.DONE)
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test_it.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test_it.py
index 9b6097b..a2a3262 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test_it.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test_it.py
@@ -24,8 +24,8 @@
 import unittest
 import uuid
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.examples.complete.juliaset.juliaset import juliaset
 from apache_beam.io.filesystems import FileSystems
@@ -34,7 +34,7 @@
 from apache_beam.testing.test_pipeline import TestPipeline
 
 
-@attr('IT')
+@pytest.mark.it_postcommit
 class JuliaSetTestIT(unittest.TestCase):
   GRID_SIZE = 1000
 
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
index cfec86b..fa5f12c 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
@@ -23,8 +23,8 @@
 import time
 import unittest
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.examples.cookbook import bigquery_tornadoes
 from apache_beam.io.gcp.tests import utils
@@ -42,7 +42,7 @@
   # from expected Bigquery table.
   DEFAULT_CHECKSUM = 'd860e636050c559a16a791aff40d6ad809d4daf0'
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_bigquery_tornadoes_it(self):
     test_pipeline = TestPipeline(is_integration_test=True)
 
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount_it_test.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount_it_test.py
index 36f81bd..388caf6 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount_it_test.py
@@ -23,8 +23,8 @@
 import time
 import unittest
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.testing.pipeline_verifiers import FileChecksumMatcher
 from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
@@ -42,7 +42,7 @@
   DATASTORE_WORDCOUNT_KIND = "DatastoreWordCount"
   EXPECTED_CHECKSUM = '826f69ed0275858c2e098f1e8407d4e3ba5a4b3f'
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_datastore_wordcount_it(self):
     test_pipeline = TestPipeline(is_integration_test=True)
     kind = self.DATASTORE_WORDCOUNT_KIND
diff --git a/sdks/python/apache_beam/examples/dataframe/flight_delays_it_test.py b/sdks/python/apache_beam/examples/dataframe/flight_delays_it_test.py
index e1a0ac4..6ed376e 100644
--- a/sdks/python/apache_beam/examples/dataframe/flight_delays_it_test.py
+++ b/sdks/python/apache_beam/examples/dataframe/flight_delays_it_test.py
@@ -28,7 +28,7 @@
 import uuid
 
 import pandas as pd
-from nose.plugins.attrib import attr
+import pytest
 
 from apache_beam.examples.dataframe import flight_delays
 from apache_beam.io.filesystems import FileSystems
@@ -100,7 +100,7 @@
   def tearDown(self):
     FileSystems.delete([self.outdir + '/'])
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_flight_delays(self):
     flight_delays.run_flight_delay_pipeline(
         self.test_pipeline,
diff --git a/sdks/python/apache_beam/examples/dataframe/taxiride_it_test.py b/sdks/python/apache_beam/examples/dataframe/taxiride_it_test.py
index 4fedfa8..f81b7d8 100644
--- a/sdks/python/apache_beam/examples/dataframe/taxiride_it_test.py
+++ b/sdks/python/apache_beam/examples/dataframe/taxiride_it_test.py
@@ -25,7 +25,7 @@
 import uuid
 
 import pandas as pd
-from nose.plugins.attrib import attr
+import pytest
 
 from apache_beam.examples.dataframe import taxiride
 from apache_beam.io.filesystems import FileSystems
@@ -44,7 +44,7 @@
   def tearDown(self):
     FileSystems.delete([self.outdir + '/'])
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_aggregation(self):
     taxiride.run_aggregation_pipeline(
         self.test_pipeline,
@@ -71,7 +71,7 @@
 
     pd.testing.assert_frame_equal(expected, result)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_enrich(self):
     # Standard workers OOM with the enrich pipeline
     self.test_pipeline.get_pipeline_options().view_as(
diff --git a/sdks/python/apache_beam/examples/fastavro_it_test.py b/sdks/python/apache_beam/examples/fastavro_it_test.py
index 9ac8051..c9bb988 100644
--- a/sdks/python/apache_beam/examples/fastavro_it_test.py
+++ b/sdks/python/apache_beam/examples/fastavro_it_test.py
@@ -24,7 +24,7 @@
 Usage:
 
   DataFlowRunner:
-    python setup.py nosetests --tests apache_beam.examples.fastavro_it_test \
+    pytest apache_beam/examples/fastavro_it_test.py \
         --test-pipeline-options="
           --runner=TestDataflowRunner
           --project=...
@@ -36,7 +36,7 @@
         "
 
   DirectRunner:
-    python setup.py nosetests --tests apache_beam.examples.fastavro_it_test \
+    pytest apache_beam/examples/fastavro_it_test.py \
       --test-pipeline-options="
         --output=/tmp
         --records=5000
@@ -50,9 +50,9 @@
 import unittest
 import uuid
 
+import pytest
 from avro.schema import Parse
 from fastavro import parse_schema
-from nose.plugins.attrib import attr
 
 from apache_beam.io.avroio import ReadAllFromAvro
 from apache_beam.io.avroio import WriteToAvro
@@ -98,7 +98,7 @@
     self.uuid = str(uuid.uuid4())
     self.output = '/'.join([self.test_pipeline.get_option('output'), self.uuid])
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_avro_it(self):
     num_records = self.test_pipeline.get_option('records')
     num_records = int(num_records) if num_records else 1000000
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_debugging_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_debugging_it_test.py
index 367fc82..9101ff8 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount_debugging_it_test.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_debugging_it_test.py
@@ -23,8 +23,8 @@
 import unittest
 import uuid
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.examples import streaming_wordcount_debugging
 from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
@@ -94,7 +94,7 @@
     test_utils.cleanup_topics(
         self.pub_client, [self.input_topic, self.output_topic])
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   @unittest.skip(
       "Skipped due to [BEAM-3377]: assert_that not working for streaming")
   def test_streaming_wordcount_debugging_it(self):
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
index 7812283..8beae5e 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -23,8 +23,8 @@
 import unittest
 import uuid
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.examples import streaming_wordcount
 from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
@@ -77,7 +77,7 @@
     test_utils.cleanup_topics(
         self.pub_client, [self.input_topic, self.output_topic])
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_streaming_wordcount_it(self):
     # Build expected dataset.
     expected_msg = [('%d: 1' % num).encode('utf-8')
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index 242dcff..8ee49c7 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -26,7 +26,6 @@
 
 import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.examples import wordcount
 from apache_beam.testing.load_tests.load_test_metrics_utils import InfluxDBMetricsPublisherOptions
@@ -39,19 +38,16 @@
 
 class WordCountIT(unittest.TestCase):
 
-  # Enable nose tests running in parallel
-  _multiprocess_can_split_ = True
-
   # The default checksum is a SHA-1 hash generated from a sorted list of
   # lines read from expected output. This value corresponds to the default
   # input of WordCount example.
   DEFAULT_CHECKSUM = '33535a832b7db6d78389759577d4ff495980b9c0'
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_wordcount_it(self):
     self._run_wordcount_it(wordcount.run)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   @pytest.mark.it_validatescontainer
   def test_wordcount_fnapi_it(self):
     self._run_wordcount_it(wordcount.run, experiment='beam_fn_api')
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index a39584a..90086c9 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -28,8 +28,8 @@
 import uuid
 import warnings
 
+import pytest
 from hamcrest.library.text import stringmatches
-from nose.plugins.attrib import attr
 
 import apache_beam as beam
 from apache_beam.io import fileio
@@ -291,7 +291,7 @@
   def setUp(self):
     self.test_pipeline = TestPipeline(is_integration_test=True)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_transform_on_gcs(self):
     args = self.test_pipeline.get_full_options_as_args()
 
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
index 0a30462..699dfa4 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -28,8 +28,8 @@
 import time
 import unittest
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.io.gcp import big_query_query_to_table_pipeline
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
@@ -154,7 +154,7 @@
         self.project, self.dataset_id, NEW_TYPES_INPUT_TABLE, table_data)
     self.assertTrue(passed, 'Error in BQ setup: %s' % errors)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_big_query_legacy_sql(self):
     verify_query = DIALECT_OUTPUT_VERIFY_QUERY % self.output_table
     expected_checksum = test_utils.compute_hash(DIALECT_OUTPUT_EXPECTED)
@@ -177,7 +177,7 @@
     options = self.test_pipeline.get_full_options_as_args(**extra_opts)
     big_query_query_to_table_pipeline.run_bq_pipeline(options)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_big_query_standard_sql(self):
     verify_query = DIALECT_OUTPUT_VERIFY_QUERY % self.output_table
     expected_checksum = test_utils.compute_hash(DIALECT_OUTPUT_EXPECTED)
@@ -200,7 +200,7 @@
     options = self.test_pipeline.get_full_options_as_args(**extra_opts)
     big_query_query_to_table_pipeline.run_bq_pipeline(options)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_big_query_standard_sql_kms_key_native(self):
     if isinstance(self.test_pipeline.runner, TestDirectRunner):
       self.skipTest("This test doesn't work on DirectRunner.")
@@ -236,7 +236,7 @@
         'No encryption configuration found: %s' % table)
     self.assertEqual(kms_key, table.encryptionConfiguration.kmsKeyName)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_big_query_new_types(self):
     expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED)
     verify_query = NEW_TYPES_OUTPUT_VERIFY_QUERY % self.output_table
@@ -260,7 +260,7 @@
     options = self.test_pipeline.get_full_options_as_args(**extra_opts)
     big_query_query_to_table_pipeline.run_bq_pipeline(options)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_big_query_new_types_avro(self):
     expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED)
     verify_query = NEW_TYPES_OUTPUT_VERIFY_QUERY % self.output_table
@@ -283,7 +283,7 @@
     options = self.test_pipeline.get_full_options_as_args(**extra_opts)
     big_query_query_to_table_pipeline.run_bq_pipeline(options)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_big_query_new_types_native(self):
     expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED)
     verify_query = NEW_TYPES_OUTPUT_VERIFY_QUERY % self.output_table
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 9672431..1e227f8 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -26,10 +26,10 @@
 import unittest
 
 import mock
+import pytest
 from hamcrest.core import assert_that as hamcrest_assert
 from hamcrest.core.core.allof import all_of
 from hamcrest.core.core.is_ import is_
-from nose.plugins.attrib import attr
 from parameterized import param
 from parameterized import parameterized
 
@@ -744,7 +744,7 @@
     _LOGGER.info(
         "Created dataset %s in project %s", self.dataset_id, self.project)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_multiple_destinations_transform(self):
     output_table_1 = '%s%s' % (self.output_table, 1)
     output_table_2 = '%s%s' % (self.output_table, 2)
@@ -824,7 +824,7 @@
               max_file_size=20,
               max_files_per_bundle=-1))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_bqfl_streaming(self):
     if isinstance(self.test_pipeline.runner, TestDataflowRunner):
       self.skipTest("TestStream is not supported on TestDataflowRunner")
@@ -862,7 +862,7 @@
                                         .Method.FILE_LOADS,
                                       triggering_frequency=100))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_one_job_fails_all_jobs_fail(self):
 
     # If one of the import jobs fails, then other jobs must not be performed.
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_io_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_io_read_it_test.py
index 6652b4e..63cc445 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_io_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_io_read_it_test.py
@@ -25,8 +25,8 @@
 import logging
 import unittest
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.io.gcp import bigquery_io_read_pipeline
 from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
@@ -59,11 +59,11 @@
     bigquery_io_read_pipeline.run(
         test_pipeline.get_full_options_as_args(**extra_opts))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_bigquery_read_custom_1M_python(self):
     self.run_bigquery_io_read_pipeline('1M', True)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_bigquery_read_1M_python(self):
     self.run_bigquery_io_read_pipeline('1M')
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
index 4040a60..472b521 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -29,7 +29,7 @@
 from decimal import Decimal
 from functools import wraps
 
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
@@ -157,7 +157,7 @@
         cls.project, cls.dataset_id, table_name, cls.TABLE_DATA)
 
   @skip(['PortableRunner', 'FlinkRunner'])
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_native_source(self):
     with beam.Pipeline(argv=self.args) as p:
       result = (
@@ -165,7 +165,7 @@
               beam.io.BigQuerySource(query=self.query, use_standard_sql=True)))
       assert_that(result, equal_to(self.TABLE_DATA))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_iobase_source(self):
     query = StaticValueProvider(str, self.query)
     with beam.Pipeline(argv=self.args) as p:
@@ -272,7 +272,7 @@
     return expected_data
 
   @skip(['PortableRunner', 'FlinkRunner'])
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_native_source(self):
     with beam.Pipeline(argv=self.args) as p:
       result = (
@@ -281,7 +281,7 @@
               beam.io.BigQuerySource(query=self.query, use_standard_sql=True)))
       assert_that(result, equal_to(self.get_expected_data()))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_iobase_source(self):
     with beam.Pipeline(argv=self.args) as p:
       result = (
@@ -378,7 +378,7 @@
     return table_schema
 
   @skip(['PortableRunner', 'FlinkRunner'])
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_read_queries(self):
     # TODO(BEAM-11311): Remove experiment when tests run on r_v2.
     args = self.args + ["--experiments=use_runner_v2"]
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 19550b5..ce5874c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -32,8 +32,8 @@
 
 import hamcrest as hc
 import mock
+import pytest
 import pytz
-from nose.plugins.attrib import attr
 from parameterized import param
 from parameterized import parameterized
 
@@ -1047,13 +1047,6 @@
 class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
   BIG_QUERY_DATASET_ID = 'python_bq_streaming_inserts_'
 
-  # Prevent nose from finding and running tests that were not
-  # specified in the Gradle file.
-  # See "More tests may be found" in:
-  # https://nose.readthedocs.io/en/latest/doc_tests/test_multiprocess
-  # /multiprocess.html#other-differences-in-test-running
-  _multiprocess_can_split_ = True
-
   def setUp(self):
     self.test_pipeline = TestPipeline(is_integration_test=True)
     self.runner_name = type(self.test_pipeline.runner).__name__
@@ -1069,7 +1062,7 @@
     _LOGGER.info(
         "Created dataset %s in project %s", self.dataset_id, self.project)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_value_provider_transform(self):
     output_table_1 = '%s%s' % (self.output_table, 1)
     output_table_2 = '%s%s' % (self.output_table, 2)
@@ -1139,7 +1132,7 @@
               additional_bq_parameters=lambda _: additional_bq_parameters,
               method='FILE_LOADS'))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_multiple_destinations_transform(self):
     streaming = self.test_pipeline.options.view_as(StandardOptions).streaming
     if streaming and isinstance(self.test_pipeline.runner, TestDataflowRunner):
@@ -1333,11 +1326,11 @@
           method=method,
           triggering_frequency=triggering_frequency)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_streaming_inserts(self):
     self._run_pubsub_bq_pipeline(WriteToBigQuery.Method.STREAMING_INSERTS)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_file_loads(self):
     self._run_pubsub_bq_pipeline(
         WriteToBigQuery.Method.FILE_LOADS, triggering_frequency=20)
@@ -1362,7 +1355,7 @@
     _LOGGER.info(
         'Created dataset %s in project %s', self.dataset_id, self.project)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_avro_file_load(self):
     # Construct elements such that they can be written via Avro but not via
     # JSON. See BEAM-8841.
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index c8285a1..c7f1d442 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -30,8 +30,8 @@
 
 import hamcrest as hc
 import mock
+import pytest
 import pytz
-from nose.plugins.attrib import attr
 
 import apache_beam as beam
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
@@ -105,7 +105,7 @@
         projectId=self.project, datasetId=self.dataset_id, table=table)
     self.bigquery_client.client.tables.Insert(request)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_big_query_write(self):
     table_name = 'python_write_table'
     table_id = '{}.{}'.format(self.dataset_id, table_name)
@@ -164,7 +164,7 @@
               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
               write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_big_query_write_schema_autodetect(self):
     if self.runner_name == 'TestDataflowRunner':
       self.skipTest('DataflowRunner does not support schema autodetection')
@@ -209,7 +209,7 @@
               write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
               temp_file_format=FileFormat.JSON))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_big_query_write_new_types(self):
     table_name = 'python_new_types_table'
     table_id = '{}.{}'.format(self.dataset_id, table_name)
@@ -290,7 +290,7 @@
               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
               write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_big_query_write_without_schema(self):
     table_name = 'python_no_schema_table'
     self.create_table(table_name)
@@ -352,7 +352,7 @@
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
               temp_file_format=FileFormat.JSON))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   @mock.patch(
       "apache_beam.io.gcp.bigquery_file_loads._MAXIMUM_SOURCE_URIS", new=1)
   def test_big_query_write_temp_table_append_schema_update(self):
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1new/datastore_write_it_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1new/datastore_write_it_test.py
index f5b68a6..abecd5b 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1new/datastore_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1new/datastore_write_it_test.py
@@ -32,8 +32,8 @@
 import unittest
 from datetime import datetime
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -66,7 +66,7 @@
     datastore_write_it_pipeline.run(
         test_pipeline.get_full_options_as_args(**extra_opts))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   @unittest.skipIf(
       datastore_write_it_pipeline is None, 'GCP dependencies are not installed')
   def test_datastore_write_limit(self):
diff --git a/sdks/python/apache_beam/io/gcp/dicomio_integration_test.py b/sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
index 16e976c..7970dd5 100644
--- a/sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
@@ -29,7 +29,7 @@
 import string
 import unittest
 
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam.io import fileio
@@ -133,7 +133,7 @@
     # clean up the temp Dicom store
     delete_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_dicom_search_instances(self):
     # Search and compare the metadata of a persistent DICOM store.
     # Both refine and comprehensive search will be tested.
@@ -183,7 +183,7 @@
           equal_to([expected_dict_refine]),
           label='refine search assert')
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_dicom_store_instance_from_gcs(self):
     # Store DICOM files to a empty DICOM store from a GCS bucket,
     # then check if the store metadata match.
diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py
index d87e7be..0272dac 100644
--- a/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py
@@ -21,7 +21,7 @@
 import unittest
 import uuid
 
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -101,7 +101,7 @@
     cls._add_dummy_entries()
     _LOGGER.info("Spanner Read IT Setup Complete...")
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_read_via_table(self):
     _LOGGER.info("Spanner Read via table")
     with beam.Pipeline(argv=self.args) as p:
@@ -113,7 +113,7 @@
           columns=["UserId", "Key"])
     assert_that(r, equal_to(self._data))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_read_via_sql(self):
     _LOGGER.info("Running Spanner via sql")
     with beam.Pipeline(argv=self.args) as p:
diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py
index 1701cbb..7f2c8e3 100644
--- a/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py
@@ -20,7 +20,7 @@
 import unittest
 import uuid
 
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -103,7 +103,7 @@
     cls._create_database()
     _LOGGER.info('Spanner Write IT Setup Complete...')
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_write_batches(self):
     _prefex = 'test_write_batches'
     mutations = [
@@ -129,7 +129,7 @@
     res.wait_until_finish()
     self.assertEqual(self._count_data(_prefex), len(mutations))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_spanner_update(self):
     _prefex = 'test_update'
 
@@ -165,7 +165,7 @@
     res.wait_until_finish()
     self.assertEqual(self._count_data(_prefex), 2)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_spanner_error(self):
     mutations_update = [
         WriteMutation.update(
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
index 5aedb6c..b06e374 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
@@ -43,7 +43,7 @@
 import unittest
 import uuid
 
-from nose.plugins.attrib import attr
+import pytest
 
 from apache_beam.io.filesystems import FileSystems
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -111,17 +111,17 @@
     self.gcsio.copy(src, dst, kms_key_name, **extra_kwargs)
     self._verify_copy(src, dst, kms_key_name)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_copy(self):
     self._test_copy("test_copy")
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_copy_kms(self):
     if self.kms_key_name is None:
       raise unittest.SkipTest('--kms_key_name not specified')
     self._test_copy("test_copy_kms", self.kms_key_name)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   @unittest.skip('BEAM-12352: enable once maxBytesRewrittenPerCall works again')
   def test_copy_rewrite_token(self):
     # Tests a multi-part copy (rewrite) operation. This is triggered by a
@@ -165,17 +165,17 @@
     for _src, _dst in src_dst_pairs:
       self._verify_copy(_src, _dst, kms_key_name)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_copy_batch(self):
     self._test_copy_batch("test_copy_batch")
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_copy_batch_kms(self):
     if self.kms_key_name is None:
       raise unittest.SkipTest('--kms_key_name not specified')
     self._test_copy_batch("test_copy_batch_kms", self.kms_key_name)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   @unittest.skip('BEAM-12352: enable once maxBytesRewrittenPerCall works again')
   def test_copy_batch_rewrite_token(self):
     # Tests a multi-part copy (rewrite) operation. This is triggered by a
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
index 7e20be3..541bb52 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
@@ -24,8 +24,8 @@
 import unittest
 import uuid
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.io.gcp import pubsub_it_pipeline
 from apache_beam.io.gcp.pubsub import PubsubMessage
@@ -204,11 +204,11 @@
         id_label=self.ID_LABEL,
         timestamp_attribute=self.TIMESTAMP_ATTRIBUTE)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_streaming_data_only(self):
     self._test_streaming(with_attributes=False)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_streaming_with_attributes(self):
     self._test_streaming(with_attributes=True)
 
diff --git a/sdks/python/apache_beam/io/parquetio_it_test.py b/sdks/python/apache_beam/io/parquetio_it_test.py
index ee4e0f0..0d3cd0d 100644
--- a/sdks/python/apache_beam/io/parquetio_it_test.py
+++ b/sdks/python/apache_beam/io/parquetio_it_test.py
@@ -21,7 +21,7 @@
 import unittest
 from collections import Counter
 
-from nose.plugins.attrib import attr
+import pytest
 
 from apache_beam import Create
 from apache_beam import DoFn
@@ -52,7 +52,7 @@
   def tearDown(self):
     pass
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_parquetio_it(self):
     file_prefix = "parquet_it_test"
     init_size = 10
diff --git a/sdks/python/apache_beam/ml/gcp/cloud_dlp_it_test.py b/sdks/python/apache_beam/ml/gcp/cloud_dlp_it_test.py
index 4ada679..a699aaa 100644
--- a/sdks/python/apache_beam/ml/gcp/cloud_dlp_it_test.py
+++ b/sdks/python/apache_beam/ml/gcp/cloud_dlp_it_test.py
@@ -20,7 +20,7 @@
 import logging
 import unittest
 
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -66,7 +66,7 @@
     self.runner_name = type(self.test_pipeline.runner).__name__
     self.project = self.test_pipeline.get_option('project')
 
-  @attr("IT")
+  @pytest.mark.it_postcommit
   def test_deidentification(self):
     with TestPipeline(is_integration_test=True) as p:
       output = (
@@ -77,7 +77,7 @@
               inspection_config=INSPECT_CONFIG))
       assert_that(output, equal_to(['####################']))
 
-  @attr("IT")
+  @pytest.mark.it_postcommit
   def test_inspection(self):
     with TestPipeline(is_integration_test=True) as p:
       output = (
diff --git a/sdks/python/apache_beam/ml/gcp/naturallanguageml_test_it.py b/sdks/python/apache_beam/ml/gcp/naturallanguageml_test_it.py
index 932bc685..9adf56a 100644
--- a/sdks/python/apache_beam/ml/gcp/naturallanguageml_test_it.py
+++ b/sdks/python/apache_beam/ml/gcp/naturallanguageml_test_it.py
@@ -18,7 +18,7 @@
 
 import unittest
 
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -46,7 +46,7 @@
       ])
 
 
-@attr('IT')
+@pytest.mark.it_postcommit
 @unittest.skipIf(AnnotateText is None, 'GCP dependencies are not installed')
 class NaturalLanguageMlTestIT(unittest.TestCase):
   def test_analyzing_syntax(self):
diff --git a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py b/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py
index 8c94a29..535a29f 100644
--- a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py
+++ b/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py
@@ -22,7 +22,7 @@
 import random
 import unittest
 
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -53,7 +53,7 @@
   yield response[0]["results"]
 
 
-@attr('IT')
+@pytest.mark.it_postcommit
 @unittest.skipIf(
     recommendationengine is None,
     "Recommendations AI dependencies not installed.")
diff --git a/sdks/python/apache_beam/ml/gcp/videointelligenceml_test_it.py b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test_it.py
index d934520..03f79d1 100644
--- a/sdks/python/apache_beam/ml/gcp/videointelligenceml_test_it.py
+++ b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test_it.py
@@ -22,7 +22,7 @@
 import unittest
 
 import hamcrest as hc
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -45,7 +45,7 @@
       yield segment.entity.description
 
 
-@attr('IT')
+@pytest.mark.it_postcommit
 @unittest.skipIf(
     AnnotateVideoWithContext is None, 'GCP dependencies are not installed')
 class VideoIntelligenceMlTestIT(unittest.TestCase):
diff --git a/sdks/python/apache_beam/ml/gcp/visionml_test_it.py b/sdks/python/apache_beam/ml/gcp/visionml_test_it.py
index 14af3cb8..4413266 100644
--- a/sdks/python/apache_beam/ml/gcp/visionml_test_it.py
+++ b/sdks/python/apache_beam/ml/gcp/visionml_test_it.py
@@ -18,7 +18,7 @@
 
 import unittest
 
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -40,7 +40,7 @@
       yield text_annotation.description
 
 
-@attr('IT')
+@pytest.mark.it_postcommit
 @unittest.skipIf(vision is None, 'GCP dependencies are not installed')
 class VisionMlTestIT(unittest.TestCase):
   def test_text_detection_with_language_hint(self):
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline_test.py
index 0ee0e0f..934dfe5 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline_test.py
@@ -23,7 +23,6 @@
 import unittest
 
 import pytest
-from nose.plugins.attrib import attr
 
 import apache_beam as beam
 from apache_beam.options.pipeline_options import PipelineOptions
@@ -43,7 +42,7 @@
     p = beam.Pipeline(options=pipeline_options)
     return dataflow_exercise_metrics_pipeline.apply_and_run(p)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_metrics_it(self):
     result = self.run_pipeline()
     errors = metric_result_matchers.verify_all(
@@ -51,7 +50,7 @@
         dataflow_exercise_metrics_pipeline.metric_matchers())
     self.assertFalse(errors, str(errors))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   @pytest.mark.it_validatescontainer
   def test_metrics_fnapi_it(self):
     result = self.run_pipeline(experiment='beam_fn_api')
diff --git a/sdks/python/apache_beam/testing/test_stream_it_test.py b/sdks/python/apache_beam/testing/test_stream_it_test.py
index 65a6aeb..0e293ed 100644
--- a/sdks/python/apache_beam/testing/test_stream_it_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_it_test.py
@@ -22,7 +22,7 @@
 import unittest
 from functools import wraps
 
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam.options.pipeline_options import StandardOptions
@@ -67,7 +67,7 @@
     cls.project = cls.test_pipeline.get_option('project')
 
   @supported(['DirectRunner', 'SwitchingDirectRunner'])
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_basic_execution(self):
     test_stream = (
         TestStream().advance_watermark_to(10).add_elements([
@@ -103,7 +103,7 @@
           ]))
 
   @supported(['DirectRunner', 'SwitchingDirectRunner'])
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_multiple_outputs(self):
     """Tests that the TestStream supports emitting to multiple PCollections."""
     letters_elements = [
@@ -151,7 +151,7 @@
     p.run()
 
   @supported(['DirectRunner', 'SwitchingDirectRunner'])
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_multiple_outputs_with_watermark_advancement(self):
     """Tests that the TestStream can independently control output watermarks."""
 
diff --git a/sdks/python/apache_beam/transforms/external_it_test.py b/sdks/python/apache_beam/transforms/external_it_test.py
index b1eda0a..e24b70f 100644
--- a/sdks/python/apache_beam/transforms/external_it_test.py
+++ b/sdks/python/apache_beam/transforms/external_it_test.py
@@ -21,7 +21,7 @@
 
 import unittest
 
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam import Pipeline
@@ -33,7 +33,7 @@
 
 
 class ExternalTransformIT(unittest.TestCase):
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_job_python_from_python_it(self):
     @ptransform.PTransform.register_urn('simple', None)
     class SimpleTransform(ptransform.PTransform):
diff --git a/sdks/python/pytest.ini b/sdks/python/pytest.ini
index 2906300..a162b69 100644
--- a/sdks/python/pytest.ini
+++ b/sdks/python/pytest.ini
@@ -29,6 +29,7 @@
 markers =
     xlang_transforms: collect Cross Language transforms test runs
     xlang_sql_expansion_service: collect for Cross Language with SQL expansion service test runs
+    it_postcommit: collect for post-commit integration test runs
     it_validatesrunner: collect for ValidatesRunner integration test runs
     no_sickbay_streaming: run without sickbay-streaming
     no_sickbay_batch: run without sickbay-batch
diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle
index 3daf379..7d1cf2b 100644
--- a/sdks/python/test-suites/dataflow/common.gradle
+++ b/sdks/python/test-suites/dataflow/common.gradle
@@ -28,14 +28,7 @@
 
 def runScriptsDir = "${rootDir}/sdks/python/scripts"
 
-// TODO(BEAM-3713) Remove once nose is removed
 // Basic test options for ITs running on Jenkins.
-def basicTestOpts = [
-    "--nocapture",  // print stdout instantly
-    "--processes=8",  // run tests in parallel
-    "--process-timeout=4500", // timeout of whole command execution
-]
-
 def basicPytestOpts = [
     "--capture=no",  // print stdout instantly
     "--timeout=4500", // timeout of whole command execution
@@ -56,22 +49,23 @@
     doLast {
       // Basic integration tests to run in PreCommit
       def precommitTests = streaming ? [
-              "apache_beam.examples.streaming_wordcount_it_test:StreamingWordCountIT.test_streaming_wordcount_it",
+              "apache_beam/examples/streaming_wordcount_it_test.py::StreamingWordCountIT::test_streaming_wordcount_it",
       ] : [
-              "apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it",
+              "apache_beam/examples/wordcount_it_test.py::WordCountIT::test_wordcount_it",
       ]
       def testOpts = [
-              "--tests=${precommitTests.join(',')}",
-              "--nocapture",    // Print stdout instantly
-              "--processes=2",    // Number of tests running in parallel
-              "--process-timeout=1800",   // Timeout of whole command execution
+              "${precommitTests.join(' ')}",
+              "--capture=no",    // Print stdout instantly
+              "--numprocesses=2",    // Number of tests running in parallel
+              "--timeout=1800",   // Timeout of whole command execution
       ]
 
       def argMap = [
               "test_opts"   : testOpts,
               "sdk_location": files(configurations.distTarBall.files).singleFile,
               "worker_jar"  : dataflowWorkerJar,
-              "suite"       : "preCommitIT-df${pythonSuffix}"
+              "suite"       : "preCommitIT-df${pythonSuffix}",
+              "pytest"      : true, // TODO(BEAM-3713): Remove this once nose is removed.
       ]
 
       if (runnerV2) {
@@ -115,14 +109,16 @@
   def dataflowWorkerJar = project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath
 
   doLast {
-    def testOpts = basicTestOpts + ["--attr=IT"]
-
-    def cmdArgs = mapToArgString([
+    def testOpts = basicPytestOpts + ["--numprocesses=8", "--dist=loadfile"]
+    def argMap = [
         "test_opts": testOpts,
         "sdk_location": files(configurations.distTarBall.files).singleFile,
         "worker_jar": dataflowWorkerJar,
-        "suite": "postCommitIT-df${pythonVersionSuffix}"
-    ])
+        "suite": "postCommitIT-df${pythonVersionSuffix}",
+        "pytest": true, // TODO(BEAM-3713): Remove this once nose is removed.
+        "collect": "it_postcommit"
+    ]
+    def cmdArgs = mapToArgString(argMap)
     exec {
       executable 'sh'
       args '-c', ". ${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs"
diff --git a/sdks/python/test-suites/direct/common.gradle b/sdks/python/test-suites/direct/common.gradle
index 42bba4c..50fe59a 100644
--- a/sdks/python/test-suites/direct/common.gradle
+++ b/sdks/python/test-suites/direct/common.gradle
@@ -21,9 +21,11 @@
 def runScriptsDir = "${rootDir}/sdks/python/scripts"
 // Basic test options for ITs running on Jenkins.
 def basicTestOpts = [
-    "--nocapture",  // print stdout instantly
-    "--processes=8",  // run tests in parallel
-    "--process-timeout=4500", // timeout of whole command execution
+    "--capture=no",  // print stdout instantly
+    "--numprocesses=8",  // run tests in parallel
+    "--timeout=4500", // timeout of whole command execution
+    "--color=yes", // console color
+    "--log-cli-level=INFO" //log level info
 ]
 
 task postCommitIT {
@@ -32,25 +34,22 @@
   // Run IT tests with TestDirectRunner in batch in Python 3.
   doLast {
     def batchTests = [
-        "apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it",
-        "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest",
-        "apache_beam.io.gcp.big_query_query_to_table_it_test:BigQueryQueryToTableIT",
-        "apache_beam.io.gcp.bigquery_io_read_it_test",
-        "apache_beam.io.gcp.bigquery_read_it_test",
-        "apache_beam.io.gcp.bigquery_write_it_test",
-        "apache_beam.io.gcp.datastore.v1new.datastore_write_it_test",
-        "apache_beam.io.gcp.experimental.spannerio_read_it_test",
-        "apache_beam.io.gcp.experimental.spannerio_write_it_test",
+        "apache_beam/examples/wordcount_it_test.py::WordCountIT::test_wordcount_it",
+        "apache_beam/io/gcp/pubsub_integration_test.py::PubSubIntegrationTest",
+        "apache_beam/io/gcp/big_query_query_to_table_it_test.py::BigQueryQueryToTableIT",
+        "apache_beam/io/gcp/bigquery_io_read_it_test.py",
+        "apache_beam/io/gcp/bigquery_read_it_test.py",
+        "apache_beam/io/gcp/bigquery_write_it_test.py",
+        "apache_beam/io/gcp/datastore/v1new/datastore_write_it_test.py",
+        "apache_beam/io/gcp/experimental/spannerio_read_it_test.py",
+        "apache_beam/io/gcp/experimental/spannerio_write_it_test.py",
     ]
-    def testOpts = [
-        "--tests=${batchTests.join(',')}",
-        "--nocapture",    // Print stdout instantly
-        "--processes=8",  // run tests in parallel
-        "--process-timeout=4500", // timeout of whole command execution
-    ]
+    def testOpts = basicTestOpts + ["${batchTests.join(' ')}"]
     def argMap = ["runner": "TestDirectRunner",
                   "test_opts": testOpts,
-                  "suite": "postCommitIT-direct-py${pythonVersionSuffix}"]
+                  "suite": "postCommitIT-direct-py${pythonVersionSuffix}",
+                  "pytest": true, // TODO(BEAM-3713): Remove this once nose is removed.
+                  ]
     def batchCmdArgs = mapToArgString(argMap)
     exec {
       executable 'sh'
@@ -97,18 +96,20 @@
   // Run IT tests with TestDirectRunner in batch.
   doLast {
     def tests = [
-        "apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it",
-        "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest",
-        "apache_beam.io.gcp.big_query_query_to_table_it_test:BigQueryQueryToTableIT",
-        "apache_beam.io.gcp.bigquery_io_read_it_test",
-        "apache_beam.io.gcp.bigquery_read_it_test",
-        "apache_beam.io.gcp.bigquery_write_it_test",
-        "apache_beam.io.gcp.datastore.v1new.datastore_write_it_test",
+        "apache_beam/examples/wordcount_it_test.py::WordCountIT::test_wordcount_it",
+        "apache_beam/io/gcp/pubsub_integration_test.py::PubSubIntegrationTest",
+        "apache_beam/io/gcp/big_query_query_to_table_it_test.py::BigQueryQueryToTableIT",
+        "apache_beam/io/gcp/bigquery_io_read_it_test.py",
+        "apache_beam/io/gcp/bigquery_read_it_test.py",
+        "apache_beam/io/gcp/bigquery_write_it_test.py",
+        "apache_beam/io/gcp/datastore/v1new/datastore_write_it_test.py",
     ]
-    def batchTestOpts = basicTestOpts + ["--tests=${tests.join(',')}"]
+    def batchTestOpts = basicTestOpts + ["${tests.join(' ')}"]
     def argMap = ["runner": "TestDirectRunner",
                   "test_opts": batchTestOpts,
-                  "suite": "directRunnerIT-batch"]
+                  "suite": "directRunnerIT-batch",
+                  "pytest": true, // TODO(BEAM-3713): Remove this once nose is removed.
+                  ]
     def batchCmdArgs = mapToArgString(argMap)
     exec {
       executable 'sh'
@@ -119,18 +120,19 @@
   // Run IT tests with TestDirectRunner in streaming.
   doLast {
     def tests = [
-        "apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it",
-        "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest",
-        "apache_beam.io.gcp.bigquery_test:BigQueryStreamingInsertTransformIntegrationTests\
-.test_multiple_destinations_transform",
-        "apache_beam.io.gcp.bigquery_test:PubSubBigQueryIT",
-        "apache_beam.io.gcp.bigquery_file_loads_test:BigQueryFileLoadsIT.test_bqfl_streaming",
+        "apache_beam/examples/wordcount_it_test.py::WordCountIT::test_wordcount_it",
+        "apache_beam/io/gcp/pubsub_integration_test.py::PubSubIntegrationTest",
+        "apache_beam/io/gcp/bigquery_test.py::BigQueryStreamingInsertTransformIntegrationTests::test_multiple_destinations_transform",
+        "apache_beam/io/gcp/bigquery_test.py::PubSubBigQueryIT",
+        "apache_beam/io/gcp/bigquery_file_loads_test.py::BigQueryFileLoadsIT::test_bqfl_streaming",
     ]
-    def streamingTestOpts = basicTestOpts + ["--tests=${tests.join(',')}"]
+    def streamingTestOpts = basicTestOpts + ["${tests.join(' ')}"]
     def argMap = ["runner": "TestDirectRunner",
                   "streaming": "true",
                   "test_opts": streamingTestOpts,
-                  "suite": "directRunnerIT-streaming"]
+                  "suite": "directRunnerIT-streaming",
+                  "pytest": true, // TODO(BEAM-3713): Remove this once nose is removed.
+                ]
     def streamingCmdArgs = mapToArgString(argMap)
     exec {
       executable 'sh'
diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle
index ecf0822..20665f0 100644
--- a/sdks/python/test-suites/portable/common.gradle
+++ b/sdks/python/test-suites/portable/common.gradle
@@ -201,14 +201,14 @@
 
   doLast {
     def tests = [
-            "apache_beam.io.gcp.bigquery_read_it_test",
-            "apache_beam.io.external.xlang_jdbcio_it_test",
-            "apache_beam.io.external.xlang_kafkaio_it_test",
-            "apache_beam.io.external.xlang_kinesisio_it_test",
-            "apache_beam.io.gcp.tests.xlang_spannerio_it_test",
-            "apache_beam.io.external.xlang_debeziumio_it_test",
+            "apache_beam/io/gcp/bigquery_read_it_test.py",
+            "apache_beam/io/external/xlang_jdbcio_it_test.py",
+            "apache_beam/io/external/xlang_kafkaio_it_test.py",
+            "apache_beam/io/external/xlang_kinesisio_it_test.py",
+            "apache_beam/io/gcp/tests/xlang_spannerio_it_test.py",
+            "apache_beam/io/external/xlang_debeziumio_it_test.py",
     ]
-    def testOpts = ["--tests=${tests.join(',')}"]
+    def testOpts = ["${tests.join(' ')}"] + ["--log-cli-level=INFO"]
     def pipelineOpts = [
         "--runner=FlinkRunner",
         "--project=apache-beam-testing",
@@ -220,6 +220,7 @@
             "test_opts": testOpts,
             "suite": "postCommitIT-flink-py${pythonVersionSuffix}",
             "pipeline_opts": pipelineOpts.join(" "),
+            "pytest": true, // TODO(BEAM-3713): Remove this once nose is removed.
     ])
     def kafkaJar = project(":sdks:java:testing:kafka-service:").buildTestKafkaServiceJar.archivePath
     exec {