Fix SFTPSensor.newer_than not working with jinja logical ds/ts expression (#39056)
* Fixes https://github.com/apache/airflow/issues/36629
* Fixes PR failed test
* Remove an parametrize duplicate tests
* Fix formatting
* Fix formatting
* Fixes https://github.com/apache/airflow/issues/36629
* Fixes PR failed test
* Remove an parametrize duplicate tests
* update simple-salesforce type hints to support 1.12.6 (#39047)
* Fix formatting
* Add changelog for airflow python client 2.9.0 (#39060)
* Upgrade to latest hatchling as build dependency (#39044)
* Prepare docs 1st wave (RC3) + ad hoc April 2024 (#38995) (#39054)
* Prepare docs 1st wave (RC3) + ad hoc April 2024 (#38995)
* update databricks
* [docs] update `DagBag` class docstring to include all params (#38814)
* update docstring for DagBag class
* break long line
* fix space
Signed-off-by: kalyanr <kalyan.ben10@live.com>
---------
Signed-off-by: kalyanr <kalyan.ben10@live.com>
* Data aware scheduling docs edits (#38687)
* Moves airflow import in deprecated pod_generator to local (#39062)
The import might be invoked when K8S executor starts with sentry on
and it might lead to circular imports
Related: #31442
* KPO xcom sidecar PodDefault usage (#38951)
We should use the same, non deprecated, version of PodDefaults for the
xcom sidecar when creating and reading xcom.
* Fix formatting
* Change date/time parsing method for newer_than parameter un SFTPSensor
* Add examples in AWS auth manager documentation (#39040)
* update document (#39068)
* Update hatchling to version 1.24.0 (#39072)
* Check that the dataset<>task exists before trying to render graph (#39069)
* Change date/time parsing method for newer_than parameter un SFTPSensor
* Fix utc timezone in unit tests
* Fix utc timezone in unit tests
---------
Signed-off-by: kalyanr <kalyan.ben10@live.com>
Co-authored-by: Grégoire Rolland <gregoire.rolland@mymoneybank.com>
Co-authored-by: Hussein Awala <hussein@awala.fr>
Co-authored-by: Ephraim Anierobi <splendidzigy24@gmail.com>
Co-authored-by: Jarek Potiuk <jarek@potiuk.com>
Co-authored-by: Elad Kalif <45845474+eladkal@users.noreply.github.com>
Co-authored-by: Kalyan <kalyan.ben10@live.com>
Co-authored-by: Laura Zdanski <25642903+lzdanski@users.noreply.github.com>
Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
Co-authored-by: Vincent <97131062+vincbeck@users.noreply.github.com>
Co-authored-by: humit <jhjang1005@naver.com>
Co-authored-by: Brent Bovenzi <brent@astronomer.io>
diff --git a/airflow/providers/sftp/sensors/sftp.py b/airflow/providers/sftp/sensors/sftp.py
index 02055f3..de38709 100644
--- a/airflow/providers/sftp/sensors/sftp.py
+++ b/airflow/providers/sftp/sensors/sftp.py
@@ -30,7 +30,7 @@
from airflow.providers.sftp.hooks.sftp import SFTPHook
from airflow.providers.sftp.triggers.sftp import SFTPTrigger
from airflow.sensors.base import BaseSensorOperator, PokeReturnValue
-from airflow.utils.timezone import convert_to_utc
+from airflow.utils.timezone import convert_to_utc, parse
if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -57,7 +57,7 @@
*,
path: str,
file_pattern: str = "",
- newer_than: datetime | None = None,
+ newer_than: datetime | str | None = None,
sftp_conn_id: str = "sftp_default",
python_callable: Callable | None = None,
op_args: list | None = None,
@@ -70,7 +70,7 @@
self.file_pattern = file_pattern
self.hook: SFTPHook | None = None
self.sftp_conn_id = sftp_conn_id
- self.newer_than: datetime | None = newer_than
+ self.newer_than: datetime | str | None = newer_than
self.python_callable: Callable | None = python_callable
self.op_args = op_args or []
self.op_kwargs = op_kwargs or {}
@@ -105,6 +105,8 @@
continue
if self.newer_than:
+ if isinstance(self.newer_than, str):
+ self.newer_than = parse(self.newer_than)
_mod_time = convert_to_utc(datetime.strptime(mod_time, "%Y%m%d%H%M%S"))
_newer_than = convert_to_utc(self.newer_than)
if _newer_than <= _mod_time:
diff --git a/tests/providers/sftp/sensors/test_sftp.py b/tests/providers/sftp/sensors/test_sftp.py
index 6a08b37..25add45 100644
--- a/tests/providers/sftp/sensors/test_sftp.py
+++ b/tests/providers/sftp/sensors/test_sftp.py
@@ -17,7 +17,7 @@
# under the License.
from __future__ import annotations
-from datetime import datetime
+from datetime import datetime, timezone as stdlib_timezone
from unittest import mock
from unittest.mock import Mock, call, patch
@@ -97,11 +97,25 @@
sftp_hook_mock.return_value.get_mod_time.assert_called_once_with("/path/to/file/1970-01-01.txt")
assert not output
+ @pytest.mark.parametrize(
+ "newer_than",
+ (
+ datetime(2020, 1, 2),
+ datetime(2020, 1, 2, tzinfo=stdlib_timezone.utc),
+ "2020-01-02",
+ "2020-01-02 00:00:00+00:00",
+ "2020-01-02 00:00:00.001+00:00",
+ "2020-01-02T00:00:00+00:00",
+ "2020-01-02T00:00:00Z",
+ "2020-01-02T00:00:00+04:00",
+ "2020-01-02T00:00:00.000001+04:00",
+ ),
+ )
@patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
- def test_naive_datetime(self, sftp_hook_mock):
+ def test_multiple_datetime_format_in_newer_than(self, sftp_hook_mock, newer_than):
sftp_hook_mock.return_value.get_mod_time.return_value = "19700101000000"
sftp_sensor = SFTPSensor(
- task_id="unit_test", path="/path/to/file/1970-01-01.txt", newer_than=datetime(2020, 1, 2)
+ task_id="unit_test", path="/path/to/file/1970-01-01.txt", newer_than=newer_than
)
context = {"ds": "1970-01-00"}
output = sftp_sensor.poke(context)