Lambda function to scale ASG based on Github webhooks (#2)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 02a11bd..fa32fe1 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -31,7 +31,7 @@
       - id: forbid-tabs
       - id: insert-license
         name: Add license
-        exclude: ^\.github/.*$|^license-templates/
+        exclude: ^\.github/.*$|^license-templates/|\.json$
         args:
           - --comment-style
           - "|#|"
diff --git a/lambdas/scale_out_runner/.chalice/config.json b/lambdas/scale_out_runner/.chalice/config.json
new file mode 100644
index 0000000..bd29d8c
--- /dev/null
+++ b/lambdas/scale_out_runner/.chalice/config.json
@@ -0,0 +1,19 @@
+{
+  "version": "2.0",
+  "app_name": "scale_out_runner",
+  "stages": {
+    "dev": {
+      "api_gateway_stage": "api"
+    },
+    "prod": {
+      "api_gateway_stage": "api",
+      "autogen_policy": false,
+      "iam_policy_file": "prod_iam.json",
+      "environment_variables": {
+        "GH_WEBHOOK_TOKEN_ENCRYPTED": "AQICAHg1MGVq8MAqYXSkkgy6iL19KSI14nJw8DelmHFuRpAfvQEh/mniWicOD0N1aVuHhp+VAAAAfDB6BgkqhkiG9w0BBwagbTBrAgEAMGYGCSqGSIb3DQEHATAeBglghkgBZQMEAS4wEQQMVqkAu70BaR2rDQ4qAgEQgDlIAxDrBlblL6XNKkJIs+zrNEMcGpS68JnoVh2s1oKEzbBXzYS16/9ZrW9CwUV4ULAd2EsGb90iz74=",
+        "ACTIONS_SQS_URL": "https://sqs.eu-central-1.amazonaws.com/827901512104/actions-runner-requests"
+      }
+    }
+  },
+  "automatic_layer": true
+}
diff --git a/lambdas/scale_out_runner/.chalice/deployed/prod.json b/lambdas/scale_out_runner/.chalice/deployed/prod.json
new file mode 100644
index 0000000..7f1a411
--- /dev/null
+++ b/lambdas/scale_out_runner/.chalice/deployed/prod.json
@@ -0,0 +1,28 @@
+{
+  "resources": [
+    {
+      "name": "managed-layer",
+      "resource_type": "lambda_layer",
+      "layer_version_arn": "arn:aws:lambda:eu-central-1:827901512104:layer:scale_out_runner-prod-managed-layer:35"
+    },
+    {
+      "name": "api_handler_role",
+      "resource_type": "iam_role",
+      "role_arn": "arn:aws:iam::827901512104:role/scale_out_runner-prod-api_handler",
+      "role_name": "scale_out_runner-prod-api_handler"
+    },
+    {
+      "name": "api_handler",
+      "resource_type": "lambda_function",
+      "lambda_arn": "arn:aws:lambda:eu-central-1:827901512104:function:scale_out_runner-prod"
+    },
+    {
+      "name": "rest_api",
+      "resource_type": "rest_api",
+      "rest_api_id": "2onm92olq7",
+      "rest_api_url": "https://2onm92olq7.execute-api.eu-central-1.amazonaws.com/api/"
+    }
+  ],
+  "schema_version": "2.0",
+  "backend": "api"
+}
diff --git a/lambdas/scale_out_runner/.chalice/prod_iam.json b/lambdas/scale_out_runner/.chalice/prod_iam.json
new file mode 100644
index 0000000..539bac3
--- /dev/null
+++ b/lambdas/scale_out_runner/.chalice/prod_iam.json
@@ -0,0 +1,33 @@
+{
+    "Version": "2012-10-17",
+    "Statement": [
+        {
+            "Sid": "VisualEditor0",
+            "Effect": "Allow",
+            "Action": [
+                "logs:CreateLogStream",
+                "kms:Decrypt",
+                "autoscaling:SetDesiredCapacity",
+                "ssm:GetParameter",
+                "logs:CreateLogGroup",
+                "logs:PutLogEvents",
+                "dynamodb:UpdateItem"
+            ],
+            "Resource": [
+                "arn:aws:ssm:*:827901512104:parameter/runners/*/configOverlay",
+                "arn:aws:autoscaling:*:827901512104:autoScalingGroup:*:autoScalingGroupName/AshbRunnerASG",
+                "arn:aws:kms:*:827901512104:key/48a58710-7ac6-4f88-995f-758a6a450faa",
+                "arn:aws:dynamodb:*:827901512104:table/GithubRunnerQueue",
+                "arn:*:logs:*:*:*"
+            ]
+        },
+        {
+            "Sid": "VisualEditor1",
+            "Effect": "Allow",
+            "Action": [
+                "autoscaling:DescribeAutoScalingGroups"
+            ],
+            "Resource": "*"
+        }
+    ]
+}
diff --git a/lambdas/scale_out_runner/.gitignore b/lambdas/scale_out_runner/.gitignore
new file mode 100644
index 0000000..9e4df26
--- /dev/null
+++ b/lambdas/scale_out_runner/.gitignore
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+.chalice/deployments/
+.chalice/venv/
+__pycache__/
diff --git a/lambdas/scale_out_runner/app.py b/lambdas/scale_out_runner/app.py
new file mode 100644
index 0000000..2f40fb1
--- /dev/null
+++ b/lambdas/scale_out_runner/app.py
@@ -0,0 +1,209 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import codecs
+import hmac
+import json
+import logging
+import os
+from typing import cast
+
+import boto3
+from chalice import BadRequestError, Chalice, ForbiddenError
+from chalice.app import Request
+
+app = Chalice(app_name='scale_out_runner')
+app.log.setLevel(logging.INFO)
+
+ASG_GROUP_NAME = os.getenv('ASG_NAME', 'AshbRunnerASG')
+TABLE_NAME = os.getenv('COUNTER_TABLE', 'GithubRunnerQueue')
+_commiters = set()
+GH_WEBHOOK_TOKEN = None
+
+REPOS = os.getenv('REPOS')
+if REPOS:
+    REPO_CONFIGURATION = json.loads(REPOS)
+else:
+    REPO_CONFIGURATION = {
+        # <repo>: [list-of-branches-to-use-self-hosted-on]
+        'apache/airflow': {'main', 'master'},
+    }
+del REPOS
+
+
+@app.route('/', methods=['POST'])
+def index():
+    validate_gh_sig(app.current_request)
+
+    if app.current_request.headers.get('X-GitHub-Event', None) != "check_run":
+        # Ignore things about installs/permissions etc
+        return {'ignored': 'not about check_runs'}
+
+    body = app.current_request.json_body
+
+    repo = body['repository']['full_name']
+
+    sender = body['sender']['login']
+
+    # Other repos configured with this app, but we don't do anything with them
+    # yet.
+    if repo not in REPO_CONFIGURATION:
+        app.log.info("Ignoring event for %r", repo)
+        return {'ignored': 'Other repo'}
+
+    interested_branches = REPO_CONFIGURATION[repo]
+
+    branch = body['check_run']['check_suite']['head_branch']
+
+    use_self_hosted = sender in commiters() or branch in interested_branches
+    payload = {'sender': sender, 'use_self_hosted': use_self_hosted}
+
+    if body['action'] == 'completed' and body['check_run']['conclusion'] == 'cancelled':
+        if use_self_hosted:
+            # The only time we get a "cancelled" job is when it wasn't yet running.
+            queue_length = increment_dynamodb_counter(-1)
+            # Don't scale in the ASG -- let the CloudWatch alarm do that.
+            payload['new_queue'] = queue_length
+        else:
+            payload = {'ignored': 'unknown sender'}
+
+    elif body['action'] != 'created':
+        payload = {'ignored': "action is not 'created'"}
+
+    elif body['check_run']['status'] != 'queued':
+        # Skipped runs are "created", but are instantly completed. Ignore anything that is not queued
+        payload = {'ignored': "check_run.status is not 'queued'"}
+    else:
+        if use_self_hosted:
+            # Increment counter in DynamoDB
+            queue_length = increment_dynamodb_counter()
+            payload.update(**scale_asg_if_needed(queue_length))
+    app.log.info(
+        "delivery=%s branch=%s: %r",
+        app.current_request.headers.get('X-GitHub-Delivery', None),
+        branch,
+        payload,
+    )
+    return payload
+
+
+def commiters(ssm_repo_name: str = os.getenv('SSM_REPO_NAME', 'apache/airflow')):
+    global _commiters
+
+    if not _commiters:
+        client = boto3.client('ssm')
+        param_path = os.path.join('/runners/', ssm_repo_name, 'configOverlay')
+        app.log.info("Loading config overlay from %s", param_path)
+
+        try:
+
+            resp = client.get_parameter(Name=param_path, WithDecryption=True)
+        except client.exceptions.ParameterNotFound:
+            app.log.debug("Failed to load config overlay", exc_info=True)
+            return set()
+
+        try:
+            overlay = json.loads(resp['Parameter']['Value'])
+        except ValueError:
+            app.log.debug("Failed to parse config overlay", exc_info=True)
+            return set()
+
+        _commiters = set(overlay['pullRequestSecurity']['allowedAuthors'])
+
+    return _commiters
+
+
+def validate_gh_sig(request: Request):
+    sig = request.headers.get('X-Hub-Signature-256', None)
+    if not sig or not sig.startswith('sha256='):
+        raise BadRequestError('X-Hub-Signature-256 not of expected format')
+
+    sig = sig[len('sha256=') :]
+    calculated_sig = sign_request_body(request)
+
+    app.log.debug('Checksum verification - expected %s got %s', calculated_sig, sig)
+
+    if not hmac.compare_digest(sig, calculated_sig):
+        raise ForbiddenError('Spoofed request')
+
+
+def sign_request_body(request: Request) -> str:
+    global GH_WEBHOOK_TOKEN
+    if GH_WEBHOOK_TOKEN is None:
+        if 'GH_WEBHOOK_TOKEN' in os.environ:
+            # Local dev support:
+            GH_WEBHOOK_TOKEN = os.environ['GH_WEBHOOK_TOKEN'].encode('utf-8')
+        else:
+            encrypted = os.environb[b'GH_WEBHOOK_TOKEN_ENCRYPTED']
+
+            kms = boto3.client('kms')
+            response = kms.decrypt(CiphertextBlob=codecs.decode(encrypted, 'base64'))
+            GH_WEBHOOK_TOKEN = response['Plaintext']
+    body = cast(bytes, request.raw_body)
+    return hmac.new(GH_WEBHOOK_TOKEN, body, digestmod='SHA256').hexdigest()  # type: ignore
+
+
+def increment_dynamodb_counter(delta: int = 1) -> int:
+    dynamodb = boto3.client('dynamodb')
+    args = dict(
+        TableName=TABLE_NAME,
+        Key={'id': {'S': 'queued_jobs'}},
+        ExpressionAttributeValues={':delta': {'N': str(delta)}},
+        UpdateExpression='ADD queued :delta',
+        ReturnValues='UPDATED_NEW',
+    )
+
+    if delta < 0:
+        # Make sure it never goes below zero!
+        args['ExpressionAttributeValues'][':limit'] = {'N': str(-delta)}
+        args['ConditionExpression'] = 'queued >= :limit'
+
+    resp = dynamodb.update_item(**args)
+    return int(resp['Attributes']['queued']['N'])
+
+
+def scale_asg_if_needed(num_queued_jobs: int) -> dict:
+    asg = boto3.client('autoscaling')
+
+    resp = asg.describe_auto_scaling_groups(
+        AutoScalingGroupNames=[ASG_GROUP_NAME],
+    )
+
+    asg_info = resp['AutoScalingGroups'][0]
+
+    current = asg_info['DesiredCapacity']
+    max_size = asg_info['MaxSize']
+
+    busy = 0
+    for instance in asg_info['Instances']:
+        if instance['LifecycleState'] == 'InService' and instance['ProtectedFromScaleIn']:
+            busy += 1
+    app.log.info("Busy instances: %d, num_queued_jobs: %d, current_size: %d", busy, num_queued_jobs, current)
+
+    new_size = num_queued_jobs + busy
+    if new_size > current:
+        if new_size <= max_size or current < max_size:
+            try:
+                new_size = min(new_size, max_size)
+                asg.set_desired_capacity(AutoScalingGroupName=ASG_GROUP_NAME, DesiredCapacity=new_size)
+                return {'new_capcity': new_size}
+            except asg.exceptions.ScalingActivityInProgressFault as e:
+                return {'error': str(e)}
+        else:
+            return {'capacity_at_max': True}
+    else:
+        return {'idle_instances': True}
diff --git a/lambdas/scale_out_runner/requirements.txt b/lambdas/scale_out_runner/requirements.txt
new file mode 100644
index 0000000..35f005b
--- /dev/null
+++ b/lambdas/scale_out_runner/requirements.txt
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+boto3
diff --git a/requirements-test.txt b/requirements-test.txt
new file mode 100644
index 0000000..4e2748b
--- /dev/null
+++ b/requirements-test.txt
@@ -0,0 +1,2 @@
+pytest~=6.0
+moto
diff --git a/requirements.txt b/requirements.txt
index 432240d..95528c3 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -18,3 +18,4 @@
 boto3
 click~=7.1
 requests
+pytest~=6.0
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/lambdas/scale_out_runner/conftest.py b/tests/lambdas/scale_out_runner/conftest.py
new file mode 100644
index 0000000..378e130
--- /dev/null
+++ b/tests/lambdas/scale_out_runner/conftest.py
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+
+import pytest
+from chalice.test import Client
+
+path = os.path.dirname(__file__)
+idx = path.rfind('/tests/')
+path = path[:idx] + path[idx + 6 :]
+sys.path.append(path)
+
+
+@pytest.fixture
+def client(request):
+    app = getattr(request.module, "app")
+
+    with Client(app) as client:
+        yield client
diff --git a/tests/lambdas/scale_out_runner/test_app.py b/tests/lambdas/scale_out_runner/test_app.py
new file mode 100644
index 0000000..52f8e99
--- /dev/null
+++ b/tests/lambdas/scale_out_runner/test_app.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+
+import pytest
+from app import app  # noqa
+
+
+@pytest.fixture(autouse=True)
+def no_requests(monkeypatch):
+    monkeypatch.setenv("GH_WEBHOOK_TOKEN", "abc")
+
+
+def test_no_auth(client):
+    response = client.http.post('/', body=json.dumps({'hello': 'world'}))
+    assert response.status_code == 400
+
+
+@pytest.mark.parametrize(
+    "sig",
+    [
+        "md5=",
+        # Valid, but not prefixed
+        "160156e060356c9444613b224fc5613a0a25315b7898fd5d8c7656bd8a6654af",
+    ],
+)
+def test_bad_auth(sig, client):
+    response = client.http.post(
+        '/',
+        headers={
+            'X-Hub-Signature-256': sig,
+        },
+        body=json.dumps({'hello': 'world'}),
+    )
+    assert response.status_code == 400
+
+
+def test_auth(client):
+    response = client.http.post(
+        '/',
+        headers={
+            'X-Hub-Signature-256': 'sha256=160156e060356c9444613b224fc5613a0a25315b7898fd5d8c7656bd8a6654af'
+        },
+        body=json.dumps({'hello': 'world'}),
+    )
+    assert response.status_code == 200