add prestop hook to prevent logging issue

There is an issue in EKS where pods are deleted too quickly.
This preStopHook gives fluentd time to flush all logs before the
pod dies
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index 1f2d347..c1ab35c 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -160,6 +160,7 @@
         self.worker_service_account_name = conf.get(
             self.kubernetes_section, 'worker_service_account_name')
         self.image_pull_secrets = conf.get(self.kubernetes_section, 'image_pull_secrets')
+        self.prestop_wait_time = conf.getint(self.kubernetes_section, 'prestop_wait_time', fallback=0)
 
         # NOTE: user can build the dags into the docker image directly,
         # this will set to True if so
@@ -490,7 +491,8 @@
             task_id=self._make_safe_label_value(task_id),
             try_number=try_number,
             execution_date=self._datetime_to_label_safe_datestring(execution_date),
-            airflow_command=command, kube_executor_config=kube_executor_config
+            airflow_command=command, kube_executor_config=kube_executor_config,
+            prestop_wait_time=self.kube_config.prestop_wait_time,
         )
         # the watcher will monitor pods, so we do not block.
         self.launcher.run_pod_async(pod, **self.kube_config.kube_client_request_args)
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 7736dc5..db26b23 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -215,6 +215,21 @@
             req['spec']['dnsPolicy'] = pod.dnspolicy
 
     @staticmethod
+    def extract_prestop_wait_time(pod, req):
+        if pod.prestop_wait_time > 0:
+            req['spec']['containers'][0]['lifecycle'] = \
+                {'preStop':
+                    {'exec':
+                        {'command':
+                            ["/bin/sh",
+                             "-c",
+                             "sleep {}".format(pod.prestop_wait_time)
+                             ]
+                         }
+                     }
+                 }
+
+    @staticmethod
     def extract_image_pull_secrets(pod, req):
         if pod.image_pull_secrets:
             req['spec']['imagePullSecrets'] = [{
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index d286da9..14057f5 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -65,6 +65,7 @@
         self.extract_tolerations(pod, req)
         self.extract_security_context(pod, req)
         self.extract_dnspolicy(pod, req)
+        self.extract_prestop_wait_time(pod, req)
         return req
 
 
@@ -135,4 +136,5 @@
         self.extract_tolerations(pod, req)
         self.extract_security_context(pod, req)
         self.extract_dnspolicy(pod, req)
+        self.extract_prestop_wait_time(pod, req)
         return req
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index b799c50..96a4b6e 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -90,6 +90,8 @@
     :type pod_runtime_info_envs: list[PodRuntimeEnv]
     :param dnspolicy: Specify a dnspolicy for the pod
     :type dnspolicy: str
+    :param prestop_wait_time: how long a worker pod waits after completion to terminate
+    :type prestop_wait_time: int
     """
     def __init__(
             self,
@@ -118,7 +120,8 @@
             security_context=None,
             configmaps=None,
             pod_runtime_info_envs=None,
-            dnspolicy=None
+            dnspolicy=None,
+            prestop_wait_time=0,
     ):
         self.image = image
         self.envs = envs or {}
@@ -146,3 +149,4 @@
         self.configmaps = configmaps or []
         self.pod_runtime_info_envs = pod_runtime_info_envs or []
         self.dnspolicy = dnspolicy
+        self.prestop_wait_time = prestop_wait_time
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py
index 337d928..1f81e70 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -366,7 +366,7 @@
         return dag_volume_mount_path
 
     def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_date,
-                 try_number, airflow_command, kube_executor_config):
+                 try_number, airflow_command, prestop_wait_time, kube_executor_config):
         volumes_dict, volume_mounts_dict = self._get_volumes_and_mounts()
         worker_init_container_spec = self._get_init_containers()
         resources = Resources(
@@ -415,5 +415,6 @@
             affinity=affinity,
             tolerations=tolerations,
             security_context=self._get_security_context(),
-            configmaps=self._get_configmaps()
+            configmaps=self._get_configmaps(),
+            prestop_wait_time=prestop_wait_time,
         )
diff --git a/scripts/ci/kubernetes/docker/Dockerfile b/scripts/ci/kubernetes/docker/Dockerfile
index eab7fbe..5581839 100644
--- a/scripts/ci/kubernetes/docker/Dockerfile
+++ b/scripts/ci/kubernetes/docker/Dockerfile
@@ -38,6 +38,7 @@
 RUN pip install -U setuptools && \
     pip install kubernetes && \
     pip install cryptography && \
+    pip install "Werkzeug<1.0.0" && \
     pip install psycopg2-binary==2.7.4  # I had issues with older versions of psycopg2, just a warning
 
 # install airflow
diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py
index f560026..3e1e17c 100644
--- a/tests/contrib/executors/test_kubernetes_executor.py
+++ b/tests/contrib/executors/test_kubernetes_executor.py
@@ -400,8 +400,8 @@
 
         pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
                                      "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
-
+                                     prestop_wait_time=0,
+                                     kube_executor_config=kube_executor_config)
         username_env = {
             'name': 'GIT_SYNC_USERNAME',
             'valueFrom': {
@@ -459,7 +459,8 @@
 
         pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
                                      "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
+                                     prestop_wait_time=0,
+                                     kube_executor_config=kube_executor_config)
 
         self.assertEqual(0, pod.security_context['runAsUser'])
 
@@ -479,8 +480,8 @@
 
         pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
                                      "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
-
+                                     prestop_wait_time=0,
+                                     kube_executor_config=kube_executor_config)
         init_containers = worker_config._get_init_containers()
         git_ssh_key_file = next((x['value'] for x in init_containers[0]['env']
                                 if x['name'] == 'GIT_SSH_KEY_FILE'), None)
@@ -532,7 +533,8 @@
 
         pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
                                      "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
+                                     prestop_wait_time=0,
+                                     kube_executor_config=kube_executor_config)
 
         self.assertTrue(pod.affinity['podAntiAffinity'] is not None)
         self.assertEqual('app',
@@ -556,7 +558,8 @@
 
         pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
                                      "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
+                                     prestop_wait_time=0,
+                                     kube_executor_config=kube_executor_config)
 
         self.assertTrue(pod.affinity['podAntiAffinity'] is not None)
         self.assertEqual('app',
@@ -655,7 +658,8 @@
 
         pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
                                      "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
+                                     prestop_wait_time=0,
+                                     kube_executor_config=kube_executor_config)
 
         airflow_config_volume = [
             volume for volume in pod.volumes if volume["name"] == 'airflow-config'
@@ -704,7 +708,8 @@
 
         pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
                                      "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
+                                     prestop_wait_time=0,
+                                     kube_executor_config=kube_executor_config)
 
         airflow_config_volume = [
             volume for volume in pod.volumes if volume["name"] == 'airflow-config'
@@ -727,7 +732,8 @@
         ]
         self.assertEqual(2, len(volume_mounts))
 
-        self.assertCountEqual(
+        six.assertCountEqual(
+            self,
             [
                 {
                     'mountPath': '/usr/local/airflow/airflow.cfg',
@@ -762,7 +768,8 @@
 
         pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
                                      "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
+                                     prestop_wait_time=0,
+                                     kube_executor_config=kube_executor_config)
 
         airflow_local_settings_volume = [
             volume for volume in pod.volumes if volume["name"] == 'airflow-local-settings'
@@ -887,7 +894,7 @@
         self.assertEqual({
             'my_label': 'label_id',
             'dag_id': 'override_dag_id',
-            'my_kube_executor_label': 'kubernetes'
+            'my_kube_executor_label': 'kubernetes',
         }, labels)
 
 
diff --git a/tests/contrib/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py b/tests/contrib/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py
index b6bc0b3..a963af8 100644
--- a/tests/contrib/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py
+++ b/tests/contrib/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from airflow.contrib.kubernetes.kubernetes_request_factory.\
+from airflow.contrib.kubernetes.kubernetes_request_factory. \
     kubernetes_request_factory import KubernetesRequestFactory
 from airflow.contrib.kubernetes.pod import Pod, Resources
 from airflow.contrib.kubernetes.secret import Secret
@@ -28,7 +28,6 @@
 class TestKubernetesRequestFactory(unittest.TestCase):
 
     def setUp(self):
-
         self.expected = {
             'apiVersion': 'v1',
             'kind': 'Pod',
@@ -65,6 +64,27 @@
         self.expected['spec']['containers'][0]['imagePullPolicy'] = pull_policy
         self.assertEqual(self.input_req, self.expected)
 
+    def test_extract_prestop_no_wait(self):
+        pod = Pod('v3.14', {}, [], prestop_wait_time=0)
+        KubernetesRequestFactory.extract_prestop_wait_time(pod, self.input_req)
+        self.assertEqual(self.input_req, self.expected)
+
+    def test_extract_prestop_with_wait(self):
+        pod = Pod('v3.14', {}, [], prestop_wait_time=6)
+        KubernetesRequestFactory.extract_prestop_wait_time(pod, self.input_req)
+        self.expected['spec']['containers'][0]['lifecycle'] = \
+            {'preStop':
+                {'exec':
+                    {'command':
+                        ["/bin/sh",
+                         "-c",
+                         "sleep {}".format(pod.prestop_wait_time)
+                         ]
+                     }
+                 }
+             }
+        self.assertEqual(self.input_req, self.expected)
+
     def test_add_secret_to_env(self):
         secret = Secret('env', 'target', 'my-secret', 'KEY')
         secret_list = []