Implement correlation protocol (#35)
* implement correlation
* fix luaunit and add doc and example code
* Adding document and rockspec file
* resolve document issue
* remove empty value
* fix code style
Co-authored-by: Mrproliu <mrproliu@lagou.com>
diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 56c3529..75f6969 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -73,6 +73,7 @@
lua span_test.lua
lua tracing_context_test.lua
lua segment_ref_test.lua
+ lua correlation_context_test.lua
cd ..
- name: 'Run Nginx Lua Tests'
run: |
diff --git a/README.md b/README.md
index 87b9ba6..7648668 100644
--- a/README.md
+++ b/README.md
@@ -49,6 +49,8 @@
-- Currently, we can not have the upstream real network address
------------------------------------------------------
require("tracer"):start("upstream service")
+ -- If you want correlation custom data to the downstream service
+ -- require("tracer"):start("upstream service", {custom = "custom_value"})
}
-- Target upstream service
@@ -118,7 +120,7 @@
## Nginx APIs
- **startTimer**, `require("client"):startBackendTimer("http://127.0.0.1:8080")`. Start the backend timer. This timer register the metadata and report traces to the backend.
-- **start**, `require("tracer"):start("upstream service")`. Begin the tracing before the upstream begin.
+- **start**, `require("tracer"):start("upstream service", correlation)`. Begin the tracing before the upstream begin. The custom data (table type) can be injected as the second parameter, and then they will be propagated to the downstream service.
- **finish**, `require("tracer"):finish()`. Finish the tracing for this HTTP request.
- **prepareForReport**, `require("tracer"):prepareForReport()`. Prepare the finished segment for further report.
diff --git a/examples/nginx.conf b/examples/nginx.conf
index 374f9ff..f403f72 100644
--- a/examples/nginx.conf
+++ b/examples/nginx.conf
@@ -59,6 +59,8 @@
-- Currently, we can not have the upstream real network address
------------------------------------------------------
require("tracer"):start("upstream service")
+ -- If you want correlation custom data to the downstream service
+ -- require("tracer"):start("upstream service", {custom = "custom_value"})
}
proxy_pass http://127.0.0.1:8080/tier2/lb;
diff --git a/lib/skywalking/correlation_context.lua b/lib/skywalking/correlation_context.lua
new file mode 100644
index 0000000..fad7bd6
--- /dev/null
+++ b/lib/skywalking/correlation_context.lua
@@ -0,0 +1,121 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- limit define
+local ELEMENT_MAX_NUMBER = 3
+local VALUE_MAX_LENGTH = 128
+
+local Util = require('util')
+local Base64 = require('dependencies/base64')
+local encode_base64 = Base64.encode
+local decode_base64 = Base64.decode
+
+if Util.is_ngx_lua then
+ encode_base64 = ngx.encode_base64
+ decode_base64 = ngx.decode_base64
+end
+
+local _M = {}
+
+function _M.new()
+ return {}
+end
+
+-- Deserialze value from the correlation context and initalize the context
+function _M.fromSW8Value(value)
+ local context = _M.new()
+
+ if value == nil or #value == 0 then
+ return context
+ end
+
+ local data = Util.split(value, ',')
+ if #data == 0 then
+ return context
+ end
+
+ for i, per_data in ipairs(data)
+ do
+ if #data > ELEMENT_MAX_NUMBER then
+ return context
+ end
+
+ local parts = Util.split(per_data, ':')
+ if #parts == 2 then
+ local key = decode_base64(parts[1])
+ local value = decode_base64(parts[2])
+
+ context[key] = value
+ end
+ end
+
+ return context
+end
+
+-- Return string to represent this correlation context
+function _M.serialize(context)
+ local encoded = ''
+ for name, value in pairs(context) do
+ if #encoded > 0 then
+ encoded = encoded .. ','
+ end
+
+ encoded = encoded .. encode_base64(name) .. ':' .. encode_base64(value)
+ end
+
+ return encoded
+end
+
+-- Put the custom key/value into correlation context.
+function _M.put(context, key, value)
+ -- key must not null
+ if not key then
+ return
+ end
+
+ -- remove and return previous value when value is empty
+ if not value or #value == 0 then
+ context[key] = nil
+ return
+ end
+
+ -- check value length
+ if #value > VALUE_MAX_LENGTH then
+ return
+ end
+
+ -- already contain key, overwrite it
+ if context[key] then
+ context[key] = value
+ return
+ end
+
+
+ -- check keys count
+ local contextLength = 0
+ for k,v in pairs(context) do
+ contextLength = contextLength + 1
+ end
+ if contextLength >= ELEMENT_MAX_NUMBER then
+ return
+ end
+
+ -- setting
+ context[key] = value
+end
+
+return _M
\ No newline at end of file
diff --git a/lib/skywalking/correlation_context_test.lua b/lib/skywalking/correlation_context_test.lua
new file mode 100644
index 0000000..e0dd243
--- /dev/null
+++ b/lib/skywalking/correlation_context_test.lua
@@ -0,0 +1,119 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local lu = require('luaunit')
+local correlationContext = require('correlation_context')
+local TC = require('tracing_context')
+
+TestCorelationContext = {}
+ function TestCorelationContext:testFromSW8Value()
+ -- simple analyze
+ local context = correlationContext.fromSW8Value('dGVzdDE=:dDE=,dGVzdDI=:dDI=')
+ lu.assertNotNil(context)
+ lu.assertEquals(context["test1"], "t1")
+ lu.assertEquals(context["test2"], "t2")
+
+ -- analyze with empty value
+ context = correlationContext.fromSW8Value('dGVzdDE=:')
+ lu.assertNotNil(context)
+ lu.assertNil(context["test1"])
+
+ -- analyze with empty header
+ context = correlationContext.fromSW8Value('')
+ lu.assertNotNil(context)
+ lu.assertNotNil(#context == 0)
+ end
+
+ function TestCorelationContext:testSerialize()
+ -- serialize empty correlation
+ local context = correlationContext.fromSW8Value('')
+ local encode_context = correlationContext.serialize(context)
+ lu.assertNotNil(encode_context)
+ lu.assertEquals(encode_context, "")
+
+ -- serialize with multiple value
+ context = correlationContext.fromSW8Value('')
+ correlationContext.put(context, "test1", "t1")
+ correlationContext.put(context, "test2", "t2")
+ encode_context = correlationContext.serialize(context)
+ lu.assertNotNil(encode_context)
+ context = correlationContext.fromSW8Value(encode_context)
+ lu.assertNotNil(context)
+ lu.assertEquals(context["test1"], "t1")
+ lu.assertEquals(context["test2"], "t2")
+
+ -- serialize with empty value
+ context = correlationContext.fromSW8Value('')
+ correlationContext.put(context, "test1", "")
+ encode_context = correlationContext.serialize(context)
+ lu.assertNotNil(encode_context)
+ lu.assertEquals(encode_context, "")
+ end
+
+ function TestCorelationContext:testPut()
+ -- put with empty key and value
+ local context = correlationContext.fromSW8Value('')
+ correlationContext.put(context, nil, nil)
+ lu.assertEquals(correlationContext.serialize(context), '')
+
+ -- put nil to remove key
+ correlationContext.put(context, "test1", "t1")
+ correlationContext.put(context, "test1", nil)
+ lu.assertEquals(correlationContext.serialize(context), '')
+
+ -- overflow put
+ correlationContext.put(context, "test1", "t1")
+ correlationContext.put(context, "test2", "t2")
+ correlationContext.put(context, "test3", "t3")
+ correlationContext.put(context, "test4", "t4")
+ local encode_context = correlationContext.serialize(context)
+ lu.assertNotNil(encode_context)
+ local context = correlationContext.fromSW8Value(encode_context)
+ lu.assertEquals(context["test1"], "t1")
+ lu.assertEquals(context["test2"], "t2")
+ lu.assertEquals(context["test3"], "t3")
+ end
+
+ function TestCorelationContext:testTracingContext()
+ -- transform data
+ local context = TC.new("service", "instance")
+ local header = {}
+ header["sw8-correlation"] = 'dGVzdDI=:dDI=,dGVzdDE=:dDE=,dGVzdDM=:dDM='
+ TC.createEntrySpan(context, 'operation_name', nil, header)
+ lu.assertNotNil(context.correlation)
+ local contextCarrier = {}
+ TC.createExitSpan(context, 'operation_name', nil, 'peer', contextCarrier)
+ lu.assertNotNil(contextCarrier['sw8-correlation'])
+ local correlation = correlationContext.fromSW8Value(contextCarrier['sw8-correlation'])
+ lu.assertEquals(correlation["test1"], "t1")
+ lu.assertEquals(correlation["test2"], "t2")
+
+ -- transform data with adding data
+ TC.createExitSpan(context, 'operation_name', nil, 'peer', contextCarrier, {
+ test3 = "t3"
+ })
+ lu.assertNotNil(contextCarrier['sw8-correlation'])
+ correlation = correlationContext.fromSW8Value(contextCarrier['sw8-correlation'])
+ lu.assertEquals(correlation["test1"], "t1")
+ lu.assertEquals(correlation["test2"], "t2")
+ lu.assertEquals(correlation["test3"], "t3")
+ end
+
+-- end TestTracingContext
+
+
+os.exit( lu.LuaUnit.run() )
diff --git a/lib/skywalking/tracer.lua b/lib/skywalking/tracer.lua
index 83a0003..af0cdc6 100644
--- a/lib/skywalking/tracer.lua
+++ b/lib/skywalking/tracer.lua
@@ -18,7 +18,7 @@
local Tracer = {}
-function Tracer:start(upstream_name)
+function Tracer:start(upstream_name, correlation)
local metadata_buffer = ngx.shared.tracing_buffer
local TC = require('tracing_context')
local Layer = require('span_layer')
@@ -34,6 +34,7 @@
local contextCarrier = {}
contextCarrier["sw8"] = ngx.req.get_headers()["sw8"]
+ contextCarrier["sw8-correlation"] = ngx.req.get_headers()["sw8-correlation"]
local entrySpan = TC.createEntrySpan(tracingContext, ngx.var.uri, nil, contextCarrier)
Span.start(entrySpan, ngx.now() * 1000)
Span.setComponentId(entrySpan, nginxComponentId)
@@ -49,7 +50,7 @@
local upstreamServerName = upstream_name
------------------------------------------------------
- local exitSpan = TC.createExitSpan(tracingContext, upstreamUri, entrySpan, upstreamServerName, contextCarrier)
+ local exitSpan = TC.createExitSpan(tracingContext, upstreamUri, entrySpan, upstreamServerName, contextCarrier, correlation)
Span.start(exitSpan, ngx.now() * 1000)
Span.setComponentId(exitSpan, nginxComponentId)
Span.setLayer(exitSpan, Layer.HTTP)
diff --git a/lib/skywalking/tracing_context.lua b/lib/skywalking/tracing_context.lua
index 3c835bd..c9db5c8 100644
--- a/lib/skywalking/tracing_context.lua
+++ b/lib/skywalking/tracing_context.lua
@@ -17,6 +17,9 @@
local Util = require('util')
local Span = require('span')
+local CorrelationContext = require('correlation_context')
+
+local CONTEXT_CORRELATION_KEY = 'sw8-correlation'
-------------- Internal Object-------------
local Internal = {}
@@ -126,16 +129,32 @@
return Span.newNoOP()
end
+ local correlationData = ''
+ if contextCarrier then
+ correlationData = contextCarrier[CONTEXT_CORRELATION_KEY]
+ end
+ tracingContext.correlation = CorrelationContext.fromSW8Value(correlationData)
+
return Span.createEntrySpan(operationName, tracingContext, parent, contextCarrier)
end
-- Delegate to Span.createExitSpan
-- @param contextCarrier could be nil if don't need to inject any context to propagate
-function _M.createExitSpan(tracingContext, operationName, parent, peer, contextCarrier)
+function _M.createExitSpan(tracingContext, operationName, parent, peer, contextCarrier, correlation)
if tracingContext.is_noop then
return Span.newNoOP()
end
+ if contextCarrier then
+ if correlation then
+ for name, value in pairs(correlation) do
+ CorrelationContext.put(tracingContext.correlation, name, value)
+ end
+ end
+
+ contextCarrier[CONTEXT_CORRELATION_KEY] = CorrelationContext.serialize(tracingContext.correlation)
+ end
+
return Span.createExitSpan(operationName, tracingContext, parent, peer, contextCarrier)
end
diff --git a/skywalking-nginx-lua-2.0-0.rockspec b/skywalking-nginx-lua-2.0-0.rockspec
index b44f819..fb07169 100644
--- a/skywalking-nginx-lua-2.0-0.rockspec
+++ b/skywalking-nginx-lua-2.0-0.rockspec
@@ -26,5 +26,6 @@
["skywalking.span"] = "lib/skywalking/span.lua",
["skywalking.tracing_context"] = "lib/skywalking/tracing_context.lua",
["skywalking.util"] = "lib/skywalking/util.lua",
+ ["skywalking.correlation_context"] = "lib/skywalking/correlation_context.lua",
}
}