blob: bc35fd64fe20ae2bca1f02d9c011f79cd1eff6ab [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import requests
from google.protobuf import json_format
from skywalking import config
from skywalking.client import ServiceManagementClient, TraceSegmentReportService, LogDataReportService
from skywalking.loggings import logger, logger_debug_enabled
class HttpServiceManagementClient(ServiceManagementClient):
def __init__(self):
super().__init__()
self.instance_properties = self.get_instance_properties()
proto = 'https://' if config.agent_force_tls else 'http://'
self.url_instance_props = f"{proto}{config.agent_collector_backend_services.rstrip('/')}/v3/management/reportProperties"
self.url_heart_beat = f"{proto}{config.agent_collector_backend_services.rstrip('/')}/v3/management/keepAlive"
self.session = requests.Session()
def send_instance_props(self):
res = self.session.post(self.url_instance_props, json={
'service': config.agent_name,
'serviceInstance': config.agent_instance_name,
'properties': self.instance_properties,
})
if logger_debug_enabled:
logger.debug('heartbeat response: %s', res)
def send_heart_beat(self):
self.refresh_instance_props()
if logger_debug_enabled:
logger.debug(
'service heart beats, [%s], [%s]',
config.agent_name,
config.agent_instance_name,
)
res = self.session.post(self.url_heart_beat, json={
'service': config.agent_name,
'serviceInstance': config.agent_instance_name,
})
if logger_debug_enabled:
logger.debug('heartbeat response: %s', res)
class HttpTraceSegmentReportService(TraceSegmentReportService):
def __init__(self):
proto = 'https://' if config.agent_force_tls else 'http://'
self.url_report = f"{proto}{config.agent_collector_backend_services.rstrip('/')}/v3/segment"
self.session = requests.Session()
def report(self, generator):
for segment in generator:
res = self.session.post(self.url_report, json={
'traceId': str(segment.related_traces[0]),
'traceSegmentId': str(segment.segment_id),
'service': config.agent_name,
'serviceInstance': config.agent_instance_name,
'spans': [{
'spanId': span.sid,
'parentSpanId': span.pid,
'startTime': span.start_time,
'endTime': span.end_time,
'operationName': span.op,
'peer': span.peer,
'spanType': span.kind.name,
'spanLayer': span.layer.name,
'componentId': span.component.value,
'isError': span.error_occurred,
'logs': [{
'time': int(log.timestamp * 1000),
'data': [{
'key': item.key,
'value': item.val,
} for item in log.items],
} for log in span.logs],
'tags': [{
'key': tag.key,
'value': tag.val,
} for tag in span.iter_tags()],
'refs': [{
'refType': 0,
'traceId': ref.trace_id,
'parentTraceSegmentId': ref.segment_id,
'parentSpanId': ref.span_id,
'parentService': ref.service,
'parentServiceInstance': ref.service_instance,
'parentEndpoint': ref.endpoint,
'networkAddressUsedAtPeer': ref.client_address,
} for ref in span.refs if ref.trace_id]
} for span in segment.spans]
})
if logger_debug_enabled:
logger.debug('report traces response: %s', res)
class HttpLogDataReportService(LogDataReportService):
def __init__(self):
proto = 'https://' if config.agent_force_tls else 'http://'
self.url_report = f"{proto}{config.agent_collector_backend_services.rstrip('/')}/v3/logs"
self.session = requests.Session()
def report(self, generator):
log_batch = [json.loads(json_format.MessageToJson(log_data)) for log_data in generator]
if log_batch: # prevent empty batches
res = self.session.post(self.url_report, json=log_batch)
if logger_debug_enabled:
logger.debug('report batch log response: %s', res)