Add --pid-file flag to `aurora task ssh` to write the PID of the underlying SSH command to a specified file.

My team has some scripts to start devel shards which create tunnels:

```
aurora task ssh -L 8002:http --ssh-options "-f -N" "$DC/$USER/devel/proxyapp/0"
aurora task ssh -L 9002:health --ssh-options "-f -N" "$DC/$USER/devel/proxyapp/0"
```

We use fixed local port numbers because that way we can run dependent services locally that look for locally-running copies of the
same service on a fixed port, but then those requests get tunnelled through to the devel shard.

When the devel shard is restarted, however, the tunnel is still running so the subsequent call to create a new tunnel fails because
it can't bind to the fixed port.

If we save the SSH process PID to a file, we can then kill existing tunnel to the old instance before starting up the new tunnel to the
new instance.

Testing Done:
```
$ ./pants test src/test/python/apache/aurora/client::
```

And when applying the same patch to our local repo at Twitter:

```
$ ./pants run twitter/src/main/python/twitter/aurora/client/cli_internal:aurora_internal -- task ssh -L 8005:http --ssh-options "-n -N" --pid-file /tmp/p "smf1/sbrenn/devel/proxyapp/0" &
$ ps -p `cat /tmp/p`
  PID TTY           TIME CMD
34729 ttys000    0:00.05 ssh -t -n -N -L 8005:smf1-aki-27-sr1.prod.twitter.com:31794 sbrenn@smf1-aki-27-sr1.prod.twitter.com cd /var/lib/mesos/slaves/*/frameworks/*/exec
```

Reviewed at https://reviews.apache.org/r/66697/
diff --git a/src/main/python/apache/aurora/client/cli/task.py b/src/main/python/apache/aurora/client/cli/task.py
index 652a545..44dc908 100644
--- a/src/main/python/apache/aurora/client/cli/task.py
+++ b/src/main/python/apache/aurora/client/cli/task.py
@@ -109,6 +109,9 @@
         CommandOption('--command', '-c', dest='command', type=str, default=None,
             metavar="unix_command_line",
             help="Command to execute through the ssh connection."),
+        CommandOption('--pid-file', '-p', dest='pid_file', type=str, default=None,
+            metavar="pid_file",
+            help="File in which to store the PID of the resulting ssh call")
     ]
 
   def execute(self, context):
@@ -154,7 +157,13 @@
           '-L', '%d:%s:%d' % (port, slave_host, assigned.assignedPorts[name])]
 
     ssh_command += ['%s@%s' % (context.options.ssh_user or role, slave_host), command]
-    return subprocess.call(ssh_command)
+    process = subprocess.Popen(ssh_command)
+
+    if context.options.pid_file:
+      with open(context.options.pid_file, "w") as f:
+        f.write(str(process.pid))
+
+    return process.wait()
 
 
 class ScpCommand(Verb):
diff --git a/src/test/python/apache/aurora/client/cli/test_task.py b/src/test/python/apache/aurora/client/cli/test_task.py
index a543d4a..effcaf6 100644
--- a/src/test/python/apache/aurora/client/cli/test_task.py
+++ b/src/test/python/apache/aurora/client/cli/test_task.py
@@ -13,6 +13,7 @@
 #
 
 import contextlib
+import tempfile
 
 import pytest
 from mock import Mock, patch
@@ -110,6 +111,14 @@
 
 
 class TestSshCommand(AuroraClientCommandTest):
+  MOCKED_PID = 12312
+
+  @classmethod
+  def create_mock_process(cls):
+    process = Mock()
+    process.pid = cls.MOCKED_PID
+    process.wait.return_value = 0
+    return process
 
   @classmethod
   def create_status_response(cls):
@@ -128,6 +137,45 @@
   def create_failed_status_response(cls):
     return cls.create_blank_response(ResponseCode.INVALID_REQUEST, 'No tasks found for query')
 
+  def test_successful_ssh_with_pidfile(self):
+    """Test the ssh command with a saved PID file."""
+    with tempfile.NamedTemporaryFile() as pid_file:
+      (mock_api, mock_scheduler_proxy) = self.create_mock_api()
+      mock_scheduler_proxy.getTasksStatus.return_value = self.create_status_response()
+      sandbox_args = {'slave_root': '/slaveroot', 'slave_run_directory': 'slaverun'}
+      with contextlib.nested(
+          patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
+          patch('apache.aurora.client.api.command_runner.DistributedCommandRunner.sandbox_args',
+              return_value=sandbox_args),
+          patch('subprocess.Popen', return_value=self.create_mock_process())) as (
+              mock_scheduler_proxy_class,
+              mock_runner_args_patch,
+              mock_subprocess):
+        cmd = AuroraCommandLine()
+        cmd.execute([
+          'task',
+          'ssh',
+          '--pid-file={}'.format(pid_file.name),
+          '--ssh-options=-v',
+          'west/bozo/test/hello/1',
+          '--command=ls'
+        ])
+
+        # The status command sends a getTasksStatus query to the scheduler,
+        # and then prints the result.
+        mock_scheduler_proxy.getTasksStatus.assert_called_with(TaskQuery(
+            jobKeys=[JobKey(role='bozo', environment='test', name='hello')],
+            instanceIds=set([1]),
+            statuses=set([ScheduleStatus.RUNNING, ScheduleStatus.KILLING, ScheduleStatus.RESTARTING,
+                ScheduleStatus.PREEMPTING, ScheduleStatus.PARTITIONED, ScheduleStatus.DRAINING])),
+            retry=True)
+        mock_subprocess.assert_called_with(['ssh', '-t', '-v', 'bozo@slavehost',
+            'cd /slaveroot/slaves/*/frameworks/*/executors/thermos-1287391823/runs/'
+            'slaverun/sandbox;ls'])
+
+        pid = pid_file.read()
+        assert(pid == str(self.MOCKED_PID))
+
   def test_successful_ssh(self):
     """Test the ssh command."""
     (mock_api, mock_scheduler_proxy) = self.create_mock_api()
@@ -137,7 +185,7 @@
         patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
         patch('apache.aurora.client.api.command_runner.DistributedCommandRunner.sandbox_args',
             return_value=sandbox_args),
-        patch('subprocess.call', return_value=0)) as (
+        patch('subprocess.Popen', return_value=self.create_mock_process())) as (
             mock_scheduler_proxy_class,
             mock_runner_args_patch,
             mock_subprocess):
@@ -165,7 +213,7 @@
         patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
         patch('apache.aurora.client.api.command_runner.DistributedCommandRunner.sandbox_args',
             return_value=sandbox_args),
-        patch('subprocess.call', return_value=0)) as (
+        patch('subprocess.Popen', return_value=self.create_mock_process())) as (
             mock_scheduler_proxy_class,
             mock_runner_args_patch,
             mock_subprocess):
@@ -190,7 +238,7 @@
     mock_scheduler_proxy.getTasksStatus.return_value = self.create_nojob_status_response()
     with contextlib.nested(
         patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
-        patch('subprocess.call', return_value=0)) as (
+        patch('subprocess.Popen', return_value=self.create_mock_process())) as (
             mock_scheduler_proxy_class,
             mock_subprocess):
       cmd = AuroraCommandLine()
@@ -204,7 +252,7 @@
     mock_scheduler_proxy.getTasksStatus.return_value = self.create_nojob_status_response()
     with contextlib.nested(
         patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
-        patch('subprocess.call', return_value=0)) as (
+        patch('subprocess.Popen', return_value=self.create_mock_process())) as (
             mock_scheduler_proxy_class,
             mock_subprocess):
       cmd = AuroraCommandLine()