[Feature] Process propagation (#67)
Co-authored-by: kezhenxu94 <kezhenxu94@163.com>
diff --git a/README.md b/README.md
index 649b3c4..1900fb1 100755
--- a/README.md
+++ b/README.md
@@ -88,6 +88,7 @@
from skywalking import Component
from skywalking.decorators import trace, runnable
from skywalking.trace.context import SpanContext, get_context
+from skywalking.trace.ipc.process import SwProcess
@trace() # the operation name is the method name('some_other_method') by default
def some_other_method():
@@ -103,10 +104,16 @@
def some_method():
some_other_method()
-from threading import Thread
+from threading import Thread
t = Thread(target=some_method)
t.start()
+# When another process is started, agents will also be started in other processes,
+# supporting only the process mode of spawn.
+p1 = SwProcess(target=some_method)
+p1.start()
+p1.join()
+
context: SpanContext = get_context()
with context.new_entry_span(op=str('https://github.com/apache/skywalking')) as span:
diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py
index fe181a5..8ed9759 100644
--- a/skywalking/agent/__init__.py
+++ b/skywalking/agent/__init__.py
@@ -82,6 +82,10 @@
__finished.set()
+def started():
+ return __started
+
+
def connected():
return __protocol.connected()
diff --git a/skywalking/config.py b/skywalking/config.py
index b2def35..9ea2478 100644
--- a/skywalking/config.py
+++ b/skywalking/config.py
@@ -14,10 +14,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
+import inspect
import os
import uuid
-from typing import List
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from typing import List
service_name = os.getenv('SW_AGENT_NAME') or 'Python Service Name' # type: str
service_instance = os.getenv('SW_AGENT_INSTANCE') or str(uuid.uuid1()).replace('-', '') # type: str
@@ -35,17 +38,17 @@
ignore_suffix = os.getenv('SW_IGNORE_SUFFIX') or '.jpg,.jpeg,.js,.css,.png,.bmp,.gif,.ico,.mp3,' \
'.mp4,.html,.svg ' # type: str
flask_collect_http_params = True if os.getenv('SW_FLASK_COLLECT_HTTP_PARAMS') and \
- os.getenv('SW_FLASK_COLLECT_HTTP_PARAMS') == 'True' else False # type: bool
+ os.getenv('SW_FLASK_COLLECT_HTTP_PARAMS') == 'True' else False # type: bool
http_params_length_threshold = int(os.getenv('SW_HTTP_PARAMS_LENGTH_THRESHOLD') or '1024') # type: int
django_collect_http_params = True if os.getenv('SW_DJANGO_COLLECT_HTTP_PARAMS') and \
- os.getenv('SW_DJANGO_COLLECT_HTTP_PARAMS') == 'True' else False # type: bool
+ os.getenv('SW_DJANGO_COLLECT_HTTP_PARAMS') == 'True' else False # type: bool
correlation_element_max_number = int(os.getenv('SW_CORRELATION_ELEMENT_MAX_NUMBER') or '3') # type: int
correlation_value_max_length = int(os.getenv('SW_CORRELATION_VALUE_MAX_LENGTH') or '128') # type: int
trace_ignore = True if os.getenv('SW_TRACE_IGNORE') and \
- os.getenv('SW_TRACE_IGNORE') == 'True' else False # type: bool
+ os.getenv('SW_TRACE_IGNORE') == 'True' else False # type: bool
trace_ignore_path = (os.getenv('SW_TRACE_IGNORE_PATH') or '').split(',') # type: List[str]
elasticsearch_trace_dsl = True if os.getenv('SW_ELASTICSEARCH_TRACE_DSL') and \
- os.getenv('SW_ELASTICSEARCH_TRACE_DSL') == 'True' else False # type: bool
+ os.getenv('SW_ELASTICSEARCH_TRACE_DSL') == 'True' else False # type: bool
def init(
@@ -69,3 +72,23 @@
global authentication
authentication = token or authentication
+
+
+def serialize():
+ from skywalking import config
+ return {
+ key: value for key, value in config.__dict__.items() if not (
+ key.startswith('_') or key == 'TYPE_CHECKING'
+ or inspect.isfunction(value)
+ or inspect.ismodule(value)
+ or inspect.isbuiltin(value)
+ or inspect.isclass(value)
+ )
+ }
+
+
+def deserialize(data):
+ from skywalking import config
+ for key, value in data.items():
+ if key in config.__dict__:
+ config.__dict__[key] = value
diff --git a/skywalking/trace/ipc/__init__.py b/skywalking/trace/ipc/__init__.py
new file mode 100644
index 0000000..b1312a0
--- /dev/null
+++ b/skywalking/trace/ipc/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/skywalking/trace/ipc/process.py b/skywalking/trace/ipc/process.py
new file mode 100644
index 0000000..6799e0e
--- /dev/null
+++ b/skywalking/trace/ipc/process.py
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from multiprocessing import Process
+
+from skywalking import config, agent
+
+
+class SwProcess(Process):
+
+ def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *,
+ daemon=None):
+ super(SwProcess, self).__init__(group=group, target=target, name=name, args=args, kwargs=kwargs, daemon=daemon)
+ self._sw_config = config.serialize()
+
+ def run(self):
+ if agent.started() is False:
+ config.deserialize(self._sw_config)
+ agent.start()
+ super(SwProcess, self).run()
diff --git a/tests/plugin/docker/Dockerfile.tool b/tests/plugin/docker/Dockerfile.tool
index f4283f4..e88d0ca 100644
--- a/tests/plugin/docker/Dockerfile.tool
+++ b/tests/plugin/docker/Dockerfile.tool
@@ -19,7 +19,7 @@
WORKDIR /tests
-ARG COMMIT_HASH=3c9d7099f05dc4a4b937c8a47506e56c130b6221
+ARG COMMIT_HASH=8a48c49b4420df5c9576d2aea178b2ebcb7ecd09
ADD https://github.com/apache/skywalking-agent-test-tool/archive/${COMMIT_HASH}.tar.gz .
diff --git a/tests/plugin/sw_process/__init__.py b/tests/plugin/sw_process/__init__.py
new file mode 100644
index 0000000..b1312a0
--- /dev/null
+++ b/tests/plugin/sw_process/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/tests/plugin/sw_process/docker-compose.yml b/tests/plugin/sw_process/docker-compose.yml
new file mode 100644
index 0000000..61f3be7
--- /dev/null
+++ b/tests/plugin/sw_process/docker-compose.yml
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+version: '2.1'
+
+services:
+ collector:
+ extends:
+ service: collector
+ file: ../docker/docker-compose.base.yml
+
+ provider:
+ extends:
+ service: agent
+ file: ../docker/docker-compose.base.yml
+ ports:
+ - 9091:9091
+ volumes:
+ - ./services/provider.py:/app/provider.py
+ command: ['bash', '-c', 'pip install flask && python3 /app/provider.py']
+ depends_on:
+ collector:
+ condition: service_healthy
+ healthcheck:
+ test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9091"]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+
+ consumer:
+ extends:
+ service: agent
+ file: ../docker/docker-compose.base.yml
+ ports:
+ - 9090:9090
+ volumes:
+ - ./services/consumer.py:/app/consumer.py
+ command: ['bash', '-c', 'pip install flask && python3 /app/consumer.py']
+ depends_on:
+ collector:
+ condition: service_healthy
+ provider:
+ condition: service_healthy
+
+networks:
+ beyond:
diff --git a/tests/plugin/sw_process/expected.data.yml b/tests/plugin/sw_process/expected.data.yml
new file mode 100644
index 0000000..c295a84
--- /dev/null
+++ b/tests/plugin/sw_process/expected.data.yml
@@ -0,0 +1,143 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+segmentItems:
+ - serviceName: provider
+ segmentSize: 2
+ segments:
+ - segmentId: not null
+ spans:
+ - operationName: /users
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: Http
+ tags:
+ - key: http.method
+ value: POST
+ - key: url
+ value: http://provider:9091/users
+ - key: status.code
+ value: '200'
+ refs:
+ - parentEndpoint: /users
+ networkAddress: 'provider:9091'
+ refType: CrossProcess
+ parentSpanId: 0
+ parentTraceSegmentId: not null
+ parentServiceInstance: not null
+ parentService: consumer
+ traceId: not null
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 7001
+ spanType: Entry
+ peer: not null
+ skipAnalysis: false
+ - segmentId: not null
+ spans:
+ - operationName: /users
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: Http
+ tags:
+ - key: http.method
+ value: POST
+ - key: url
+ value: http://provider:9091/users
+ - key: status.code
+ value: '200'
+ refs:
+ - parentEndpoint: /users
+ networkAddress: 'provider:9091'
+ refType: CrossProcess
+ parentSpanId: 1
+ parentTraceSegmentId: not null
+ parentServiceInstance: not null
+ parentService: consumer
+ traceId: not null
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 7001
+ spanType: Entry
+ peer: not null
+ skipAnalysis: false
+ - serviceName: consumer
+ segmentSize: 2
+ segments:
+ - segmentId: not null
+ spans:
+ - operationName: /users
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: Http
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 7002
+ isError: false
+ spanType: Exit
+ peer: not null
+ skipAnalysis: false
+ tags:
+ - key: http.method
+ value: POST
+ - key: url
+ value: 'http://provider:9091/users'
+ - key: status.code
+ value: '200'
+ - segmentId: not null
+ spans:
+ - operationName: /users
+ operationId: 0
+ parentSpanId: 0
+ spanId: 1
+ spanLayer: Http
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 7002
+ isError: false
+ spanType: Exit
+ peer: provider:9091
+ skipAnalysis: false
+ tags:
+ - key: http.method
+ value: POST
+ - key: url
+ value: 'http://provider:9091/users'
+ - key: status.code
+ value: '200'
+ - operationName: /users
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: Http
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 7001
+ isError: false
+ spanType: Entry
+ peer: not null
+ skipAnalysis: false
+ tags:
+ - key: http.method
+ value: GET
+ - key: url
+ value: 'http://0.0.0.0:9090/users'
+ - key: status.code
+ value: '200'
\ No newline at end of file
diff --git a/tests/plugin/sw_process/services/__init__.py b/tests/plugin/sw_process/services/__init__.py
new file mode 100644
index 0000000..b1312a0
--- /dev/null
+++ b/tests/plugin/sw_process/services/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/tests/plugin/sw_process/services/consumer.py b/tests/plugin/sw_process/services/consumer.py
new file mode 100644
index 0000000..d7d3fe6
--- /dev/null
+++ b/tests/plugin/sw_process/services/consumer.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+import requests
+from skywalking import agent, config
+from skywalking.trace.ipc.process import SwProcess
+import multiprocessing
+
+
+def post():
+ requests.post("http://provider:9091/users")
+ time.sleep(3)
+
+
+if __name__ == '__main__':
+ multiprocessing.set_start_method('spawn')
+ config.service_name = 'consumer'
+ config.logging_level = 'DEBUG'
+ config.flask_collect_http_params = True
+ agent.start()
+
+ from flask import Flask, jsonify
+
+ app = Flask(__name__)
+
+ @app.route("/users", methods=["POST", "GET"])
+ def application():
+ p1 = SwProcess(target=post)
+ p1.start()
+ p1.join()
+
+ res = requests.post("http://provider:9091/users")
+
+ return jsonify(res.json())
+
+ PORT = 9090
+ app.run(host='0.0.0.0', port=PORT, debug=False)
diff --git a/tests/plugin/sw_process/services/provider.py b/tests/plugin/sw_process/services/provider.py
new file mode 100644
index 0000000..d043141
--- /dev/null
+++ b/tests/plugin/sw_process/services/provider.py
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+
+from skywalking import agent, config
+
+if __name__ == '__main__':
+ config.service_name = 'provider'
+ config.logging_level = 'DEBUG'
+ agent.start()
+
+ from flask import Flask, jsonify
+
+ app = Flask(__name__)
+
+ @app.route("/users", methods=["POST", "GET"])
+ def application():
+ time.sleep(0.5)
+ return jsonify({"song": "Despacito", "artist": "Luis Fonsi"})
+
+ PORT = 9091
+ app.run(host='0.0.0.0', port=PORT, debug=False)
diff --git a/tests/plugin/sw_process/test_process.py b/tests/plugin/sw_process/test_process.py
new file mode 100644
index 0000000..bfafb7a
--- /dev/null
+++ b/tests/plugin/sw_process/test_process.py
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import Callable
+import pytest
+import requests
+from tests.plugin.base import TestPluginBase
+
+
+@pytest.fixture
+def prepare():
+ # type: () -> Callable
+ return lambda *_: requests.get('http://0.0.0.0:9090/users')
+
+
+class TestPlugin(TestPluginBase):
+ def test_plugin(self, docker_compose, version):
+ self.validate()