Feature: adapt HTTP protocol (#8)
diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index a3f1fb4..69d871c 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -40,10 +40,10 @@
with:
python-version: ${{ matrix.python-version }}
- name: Set up dependencies
- run: make setup
+ run: make setup install
- name: Lint codes
run: make lint
- name: Check license header
run: make license
- name: Run unit tests
- run: make test
\ No newline at end of file
+ run: make test
diff --git a/README.md b/README.md
index c45e0c1..b23b809 100644
--- a/README.md
+++ b/README.md
@@ -33,7 +33,7 @@
| `SW_AGENT_NAME` | The name of the Python service | `Python Service Name` |
| `SW_AGENT_INSTANCE` | The name of the Python service instance | Randomly generated |
| `SW_AGENT_COLLECTOR_BACKEND_SERVICES` | The backend OAP server address | `127.0.0.1:11800` |
-| `SW_AGENT_PROTOCOL` | The protocol to communicate with the backend OAP, currently only `grpc` is supported | `grpc` |
+| `SW_AGENT_PROTOCOL` | The protocol to communicate with the backend OAP, `http` or `grpc`, **we highly suggest using `grpc` in production as it's well optimized than `http`** | `grpc` |
| `SW_AGENT_LOGGING_LEVEL` | The logging level, could be one of `CRITICAL`, `FATAL`, `ERROR`, `WARN`(`WARNING`), `INFO`, `DEBUG` | `INFO` |
| `SW_AGENT_DISABLE_PLUGINS` | The name patterns in CSV pattern, plugins whose name matches one of the pattern won't be installed | `''` |
diff --git a/setup.py b/setup.py
index 958886d..b5102f1 100644
--- a/setup.py
+++ b/setup.py
@@ -37,5 +37,6 @@
include_package_data=True,
install_requires=[
"grpcio",
+ "requests",
],
)
diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py
index 2271665..adf5297 100644
--- a/skywalking/agent/__init__.py
+++ b/skywalking/agent/__init__.py
@@ -55,12 +55,13 @@
def __init():
+ global __protocol
if config.protocol == 'grpc':
from skywalking.agent.protocol.grpc import GrpcProtocol
- global __protocol
__protocol = GrpcProtocol()
elif config.protocol == 'http':
- raise NotImplementedError() # TODO
+ from skywalking.agent.protocol.http import HttpProtocol
+ __protocol = HttpProtocol()
plugins.install()
diff --git a/skywalking/agent/protocol/grpc/__init__.py b/skywalking/agent/protocol/grpc/__init__.py
index 5570aa0..8a55132 100644
--- a/skywalking/agent/protocol/grpc/__init__.py
+++ b/skywalking/agent/protocol/grpc/__init__.py
@@ -24,7 +24,6 @@
from language_agent.Tracing_pb2 import SegmentObject, SpanObject, Log, SegmentReference
from skywalking import config
from skywalking.agent import Protocol
-from skywalking.agent.protocol.grpc import interceptors
from skywalking.agent.protocol.grpc.interceptors import header_adder_interceptor
from skywalking.client.grpc import GrpcServiceManagementClient, GrpcTraceSegmentReportService
from skywalking.trace.segment import Segment
diff --git a/skywalking/agent/protocol/http/__init__.py b/skywalking/agent/protocol/http/__init__.py
index 0254960..e430f13 100644
--- a/skywalking/agent/protocol/http/__init__.py
+++ b/skywalking/agent/protocol/http/__init__.py
@@ -14,17 +14,41 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
+import logging
from queue import Queue
from skywalking.agent import Protocol
+from skywalking.client.http import HttpServiceManagementClient, HttpTraceSegmentReportService
+from skywalking.trace.segment import Segment
+
+logger = logging.getLogger(__name__)
-class HttpAgent(Protocol):
+class HttpProtocol(Protocol):
+ def __init__(self):
+ self.properties_sent = False
+ self.service_management = HttpServiceManagementClient()
+ self.traces_reporter = HttpTraceSegmentReportService()
+
+ def heartbeat(self):
+ if not self.properties_sent:
+ self.service_management.send_instance_props()
+ self.properties_sent = True
+ self.service_management.send_heart_beat()
+
def connected(self):
return True
- def heartbeat(self):
- pass
-
def report(self, queue: Queue):
- pass
+ def generator():
+ while True:
+ segment = queue.get() # type: Segment
+
+ logger.debug('reporting segment %s', segment)
+
+ yield segment
+
+ queue.task_done()
+
+ self.traces_reporter.report(generator=generator())
diff --git a/skywalking/client/grpc/__init__.py b/skywalking/client/grpc/__init__.py
index cb69974..7849b14 100644
--- a/skywalking/client/grpc/__init__.py
+++ b/skywalking/client/grpc/__init__.py
@@ -18,6 +18,7 @@
import logging
import grpc
+from common.Common_pb2 import KeyStringValuePair
from language_agent.Tracing_pb2_grpc import TraceSegmentReportServiceStub
from management.Management_pb2 import InstancePingPkg, InstanceProperties
from management.Management_pb2_grpc import ManagementServiceStub
@@ -36,7 +37,7 @@
self.service_stub.reportInstanceProperties(InstanceProperties(
service=config.service_name,
serviceInstance=config.service_instance,
- properties=[],
+ properties=[KeyStringValuePair(key='language', value='Python')],
))
def send_heart_beat(self):
diff --git a/skywalking/client/http/__init__.py b/skywalking/client/http/__init__.py
index b1312a0..3252c7a 100644
--- a/skywalking/client/http/__init__.py
+++ b/skywalking/client/http/__init__.py
@@ -14,3 +14,90 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
+import logging
+
+import requests
+
+from skywalking import config
+from skywalking.client import ServiceManagementClient, TraceSegmentReportService
+
+logger = logging.getLogger(__name__)
+
+
+class HttpServiceManagementClient(ServiceManagementClient):
+ def __init__(self):
+ self.session = requests.session()
+
+ def send_instance_props(self):
+ url = config.collector_address.rstrip('/') + '/v3/management/reportProperties'
+ res = self.session.post(url, json={
+ 'service': config.service_name,
+ 'serviceInstance': config.service_instance,
+ 'properties': [{
+ 'language': 'Python',
+ }]
+ })
+ logger.debug('heartbeat response: %s', res)
+
+ def send_heart_beat(self):
+ logger.debug(
+ 'service heart beats, [%s], [%s]',
+ config.service_name,
+ config.service_instance,
+ )
+ url = config.collector_address.rstrip('/') + '/v3/management/keepAlive'
+ res = self.session.post(url, json={
+ 'service': config.service_name,
+ 'serviceInstance': config.service_instance,
+ })
+ logger.debug('heartbeat response: %s', res)
+
+
+class HttpTraceSegmentReportService(TraceSegmentReportService):
+ def __init__(self):
+ self.session = requests.session()
+
+ def report(self, generator):
+ url = config.collector_address.rstrip('/') + '/v3/segment'
+ for segment in generator:
+ res = self.session.post(url, json={
+ 'traceId': str(segment.related_traces[0]),
+ 'traceSegmentId': str(segment.segment_id),
+ 'service': config.service_name,
+ 'serviceInstance': config.service_instance,
+ 'spans': [{
+ 'spanId': span.sid,
+ 'parentSpanId': span.pid,
+ 'startTime': span.start_time,
+ 'endTime': span.end_time,
+ 'operationName': span.op,
+ 'peer': span.peer,
+ 'spanType': span.kind.name,
+ 'spanLayer': span.layer.name,
+ 'componentId': span.component.value,
+ 'isError': span.error_occurred,
+ 'logs': [{
+ 'time': log.timestamp * 1000,
+ 'data': [{
+ 'key': item.key,
+ 'value': item.val
+ } for item in log.items],
+ } for log in span.logs],
+ 'tags': [{
+ 'key': tag.key,
+ 'value': tag.val,
+ } for tag in span.tags],
+ 'refs': [{
+ 'refType': 0,
+ 'traceId': ref.trace_id,
+ 'parentTraceSegmentId': ref.segment_id,
+ 'parentSpanId': ref.span_id,
+ 'parentService': ref.service,
+ 'parentServiceInstance': ref.service_instance,
+ 'parentEndpoint': ref.endpoint,
+ 'networkAddressUsedAtPeer': ref.client_address,
+ } for ref in span.refs if ref.trace_id]
+ } for span in segment.spans]
+ })
+ logger.debug('report traces response: %s', res)
diff --git a/skywalking/config/__init__.py b/skywalking/config/__init__.py
index ffd3fd0..341486e 100644
--- a/skywalking/config/__init__.py
+++ b/skywalking/config/__init__.py
@@ -22,7 +22,7 @@
service_name = os.getenv('SW_AGENT_NAME') or 'Python Service Name' # type: str
service_instance = os.getenv('SW_AGENT_INSTANCE') or str(uuid.uuid1()).replace('-', '') # type: str
collector_address = os.getenv('SW_AGENT_COLLECTOR_BACKEND_SERVICES') or '127.0.0.1:11800' # type: str
-protocol = os.getenv('SW_AGENT_PROTOCOL') or 'grpc' # type: str
+protocol = (os.getenv('SW_AGENT_PROTOCOL') or 'grpc').lower() # type: str
authentication = os.getenv('SW_AGENT_AUTHENTICATION') # type: str
logging_level = os.getenv('SW_AGENT_LOGGING_LEVEL') or 'INFO' # type: str
disable_plugins = (os.getenv('SW_AGENT_DISABLE_PLUGINS') or '').split(',') # type: List[str]