blob: 8c1d62722a654d730de9c8572ccb0250dcdf04b5 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Serve logs process."""
from __future__ import annotations
import collections
import logging
import os
import socket
import gunicorn.app.base
from flask import Flask, abort, request, send_from_directory
from jwt.exceptions import (
ExpiredSignatureError,
ImmatureSignatureError,
InvalidAudienceError,
InvalidIssuedAtError,
InvalidSignatureError,
)
from setproctitle import setproctitle
from airflow.configuration import conf
from airflow.utils.docs import get_docs_url
from airflow.utils.jwt_signer import JWTSigner
logger = logging.getLogger(__name__)
def create_app():
flask_app = Flask(__name__, static_folder=None)
expiration_time_in_seconds = conf.getint("webserver", "log_request_clock_grace", fallback=30)
log_directory = os.path.expanduser(conf.get("logging", "BASE_LOG_FOLDER"))
signer = JWTSigner(
secret_key=conf.get("webserver", "secret_key"),
expiration_time_in_seconds=expiration_time_in_seconds,
audience="task-instance-logs",
)
# Prevent direct access to the logs port
@flask_app.before_request
def validate_pre_signed_url():
try:
auth = request.headers.get("Authorization")
if auth is None:
logger.warning("The Authorization header is missing: %s.", request.headers)
abort(403)
payload = signer.verify_token(auth)
token_filename = payload.get("filename")
request_filename = request.view_args["filename"]
if token_filename is None:
logger.warning("The payload does not contain 'filename' key: %s.", payload)
abort(403)
if token_filename != request_filename:
logger.warning(
"The payload log_relative_path key is different than the one in token:"
"Request path: %s. Token path: %s.",
request_filename,
token_filename,
)
abort(403)
except InvalidAudienceError:
logger.warning("Invalid audience for the request", exc_info=True)
abort(403)
except InvalidSignatureError:
logger.warning("The signature of the request was wrong", exc_info=True)
abort(403)
except ImmatureSignatureError:
logger.warning("The signature of the request was sent from the future", exc_info=True)
abort(403)
except ExpiredSignatureError:
logger.warning(
"The signature of the request has expired. Make sure that all components "
"in your system have synchronized clocks. "
"See more at %s",
get_docs_url("configurations-ref.html#secret-key"),
exc_info=True,
)
abort(403)
except InvalidIssuedAtError:
logger.warning(
"The request was issues in the future. Make sure that all components "
"in your system have synchronized clocks. "
"See more at %s",
get_docs_url("configurations-ref.html#secret-key"),
exc_info=True,
)
abort(403)
except Exception:
logger.warning("Unknown error", exc_info=True)
abort(403)
@flask_app.route("/log/<path:filename>")
def serve_logs_view(filename):
return send_from_directory(log_directory, filename, mimetype="application/json", as_attachment=False)
return flask_app
GunicornOption = collections.namedtuple("GunicornOption", ["key", "value"])
class StandaloneGunicornApplication(gunicorn.app.base.BaseApplication):
"""
Standalone Gunicorn application/serve for usage with any WSGI-application.
Code inspired by an example from the Gunicorn documentation.
https://github.com/benoitc/gunicorn/blob/cf55d2cec277f220ebd605989ce78ad1bb553c46/examples/standalone_app.py
For details, about standalone gunicorn application, see:
https://docs.gunicorn.org/en/stable/custom.html
"""
def __init__(self, app, options=None):
self.options = options or []
self.application = app
super().__init__()
def load_config(self):
for option in self.options:
self.cfg.set(option.key.lower(), option.value)
def load(self):
return self.application
def serve_logs(port=None):
"""Serves logs generated by Worker."""
setproctitle("airflow serve-logs")
wsgi_app = create_app()
port = port or conf.getint("logging", "WORKER_LOG_SERVER_PORT")
# If dual stack is available and IPV6_V6ONLY is not enabled on the socket
# then when IPV6 is bound to it will also bind to IPV4 automatically
if getattr(socket, "has_dualstack_ipv6", lambda: False)():
bind_option = GunicornOption("bind", f"[::]:{port}")
else:
bind_option = GunicornOption("bind", f"0.0.0.0:{port}")
options = [bind_option, GunicornOption("workers", 2)]
StandaloneGunicornApplication(wsgi_app, options).run()
if __name__ == "__main__":
serve_logs()