Implement correlation protocol (#35)

* implement correlation

* fix luaunit and add doc and example code

* Adding document and rockspec file

* resolve document issue

* remove empty value

* fix code style

Co-authored-by: Mrproliu <mrproliu@lagou.com>
diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 56c3529..75f6969 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -73,6 +73,7 @@
           lua span_test.lua
           lua tracing_context_test.lua
           lua segment_ref_test.lua
+          lua correlation_context_test.lua
           cd ..
       - name: 'Run Nginx Lua Tests'
         run: |
diff --git a/README.md b/README.md
index 87b9ba6..7648668 100644
--- a/README.md
+++ b/README.md
@@ -49,6 +49,8 @@
                 -- Currently, we can not have the upstream real network address
                 ------------------------------------------------------
                 require("tracer"):start("upstream service")
+                -- To propagate custom correlation data to the downstream service, use:
+                -- require("tracer"):start("upstream service", {custom = "custom_value"})
             }
 
             -- Target upstream service
@@ -118,7 +120,7 @@
 
 ## Nginx APIs
 - **startTimer**, `require("client"):startBackendTimer("http://127.0.0.1:8080")`. Start the backend timer. This timer register the metadata and report traces to the backend.
-- **start**, `require("tracer"):start("upstream service")`. Begin the tracing before the upstream begin.
+- **start**, `require("tracer"):start("upstream service", correlation)`. Begin tracing before the upstream request begins. Custom data can be passed as the optional second parameter (a table of keys and values); it is propagated to the downstream service. A correlation context holds at most 3 entries, and values longer than 128 bytes are dropped.
 - **finish**, `require("tracer"):finish()`. Finish the tracing for this HTTP request.
 - **prepareForReport**, `require("tracer"):prepareForReport()`. Prepare the finished segment for further report.
 
diff --git a/examples/nginx.conf b/examples/nginx.conf
index 374f9ff..f403f72 100644
--- a/examples/nginx.conf
+++ b/examples/nginx.conf
@@ -59,6 +59,8 @@
                 -- Currently, we can not have the upstream real network address
                 ------------------------------------------------------
                 require("tracer"):start("upstream service")
+                -- To propagate custom correlation data to the downstream service, use:
+                -- require("tracer"):start("upstream service", {custom = "custom_value"})
             }
 
             proxy_pass http://127.0.0.1:8080/tier2/lb;
diff --git a/lib/skywalking/correlation_context.lua b/lib/skywalking/correlation_context.lua
new file mode 100644
index 0000000..fad7bd6
--- /dev/null
+++ b/lib/skywalking/correlation_context.lua
@@ -0,0 +1,121 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--    http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- limits: max entries in a context and max length (bytes) of a single value
+local ELEMENT_MAX_NUMBER = 3
+local VALUE_MAX_LENGTH = 128
+
+local Util = require('util')
+local Base64 = require('dependencies/base64')
+local encode_base64 = Base64.encode
+local decode_base64 = Base64.decode
+
+if Util.is_ngx_lua then
+    encode_base64 = ngx.encode_base64
+    decode_base64 = ngx.decode_base64
+end
+
+local _M = {}
+
+-- Create a new, empty correlation context.
+-- The context is a plain Lua table, so callers may index it directly.
+function _M.new() return {} end
+
+-- Deserialize an `sw8-correlation` header value into a new correlation context.
+-- Malformed pairs are skipped; nil, non-string, empty or over-limit input yields {}.
+function _M.fromSW8Value(value)
+    local context = _M.new()
+
+    -- a repeated header can arrive as a table, so only strings are parsed
+    if type(value) ~= 'string' or #value == 0 then
+        return context
+    end
+
+    local data = Util.split(value, ',')
+    -- more pairs than the limit allows: ignore the whole header
+    if #data == 0 or #data > ELEMENT_MAX_NUMBER then
+        return context
+    end
+
+    for _, per_data in ipairs(data) do
+        local parts = Util.split(per_data, ':')
+        if #parts == 2 then
+            local key = decode_base64(parts[1])
+            local decoded = decode_base64(parts[2])
+            -- decoding can yield nil for bad input; a nil key would raise
+            if key ~= nil and decoded ~= nil then
+                context[key] = decoded
+            end
+        end
+    end
+
+    return context
+end
+
+-- Encode the context as comma-separated "base64(key):base64(value)" pairs.
+-- Pair order follows pairs() and is therefore unspecified.
+function _M.serialize(context)
+    local pairs_buffer = {}
+
+    for name, value in pairs(context) do
+        local item = encode_base64(name) .. ':' .. encode_base64(value)
+        pairs_buffer[#pairs_buffer + 1] = item
+    end
+
+    -- an empty context serializes to the empty string
+    return table.concat(pairs_buffer, ',')
+end
+
+-- Put a custom key/value pair into the correlation context.
+-- A nil or empty value removes the key; over-long values and keys beyond
+-- the element limit are silently dropped. Numbers are stored as strings.
+function _M.put(context, key, value)
+    -- key must not be nil
+    if not key then
+        return
+    end
+
+    -- `#value` raises on a number, so convert numbers to strings first
+    if type(value) == 'number' then
+        value = tostring(value)
+    end
+
+    -- a nil or empty value removes the key
+    if not value or value == '' then
+        context[key] = nil
+        return
+    end
+
+    -- reject unsupported types and values over the length limit
+    if type(value) ~= 'string' or #value > VALUE_MAX_LENGTH then
+        return
+    end
+
+    -- only a new key can push the context past the element limit
+    if context[key] == nil then
+        local count = 0
+        for _ in pairs(context) do
+            count = count + 1
+        end
+        if count >= ELEMENT_MAX_NUMBER then
+            return
+        end
+    end
+    context[key] = value
+end
+
+return _M
\ No newline at end of file
diff --git a/lib/skywalking/correlation_context_test.lua b/lib/skywalking/correlation_context_test.lua
new file mode 100644
index 0000000..e0dd243
--- /dev/null
+++ b/lib/skywalking/correlation_context_test.lua
@@ -0,0 +1,119 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--    http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local lu = require('luaunit')
+local correlationContext = require('correlation_context')
+local TC = require('tracing_context')
+
+TestCorelationContext = {}
+    function TestCorelationContext:testFromSW8Value()
+        -- two well-formed pairs are decoded into key/value entries
+        local context = correlationContext.fromSW8Value('dGVzdDE=:dDE=,dGVzdDI=:dDI=')
+        lu.assertNotNil(context)
+        lu.assertEquals(context["test1"], "t1")
+        lu.assertEquals(context["test2"], "t2")
+
+        -- a pair without a value is ignored
+        context = correlationContext.fromSW8Value('dGVzdDE=:')
+        lu.assertNotNil(context)
+        lu.assertNil(context["test1"])
+
+        -- an empty header yields an empty context (keys are strings, so use next, not #)
+        context = correlationContext.fromSW8Value('')
+        lu.assertNotNil(context)
+        lu.assertNil(next(context))
+    end
+
+    function TestCorelationContext:testSerialize()
+        -- an empty context serializes to the empty string
+        local context = correlationContext.fromSW8Value('')
+        local encode_context = correlationContext.serialize(context)
+        lu.assertNotNil(encode_context)
+        lu.assertEquals(encode_context, "")
+
+        -- multiple entries survive a serialize/deserialize round trip
+        context = correlationContext.fromSW8Value('')
+        correlationContext.put(context, "test1", "t1")
+        correlationContext.put(context, "test2", "t2")
+        encode_context = correlationContext.serialize(context)
+        lu.assertNotNil(encode_context)
+        context = correlationContext.fromSW8Value(encode_context)
+        lu.assertNotNil(context)
+        lu.assertEquals(context["test1"], "t1")
+        lu.assertEquals(context["test2"], "t2")
+
+        -- putting an empty value stores nothing, so the result stays empty
+        context = correlationContext.fromSW8Value('')
+        correlationContext.put(context, "test1", "")
+        encode_context = correlationContext.serialize(context)
+        lu.assertNotNil(encode_context)
+        lu.assertEquals(encode_context, "")
+    end
+
+    function TestCorelationContext:testPut()
+        -- a nil key is ignored and leaves the context empty
+        local context = correlationContext.fromSW8Value('')
+        correlationContext.put(context, nil, nil)
+        lu.assertEquals(correlationContext.serialize(context), '')
+
+        -- putting nil for an existing key removes it
+        correlationContext.put(context, "test1", "t1")
+        correlationContext.put(context, "test1", nil)
+        lu.assertEquals(correlationContext.serialize(context), '')
+
+        -- the fourth put exceeds the element limit and must be rejected
+        correlationContext.put(context, "test1", "t1")
+        correlationContext.put(context, "test2", "t2")
+        correlationContext.put(context, "test3", "t3")
+        correlationContext.put(context, "test4", "t4")
+        local encode_context = correlationContext.serialize(context)
+        local decoded = correlationContext.fromSW8Value(encode_context)
+        lu.assertEquals(decoded["test1"], "t1")
+        lu.assertEquals(decoded["test2"], "t2")
+        lu.assertEquals(decoded["test3"], "t3")
+        lu.assertNil(decoded["test4"])
+    end
+
+    function TestCorelationContext:testTracingContext()
+        -- correlation received by the entry span is propagated by the exit span
+        local context = TC.new("service", "instance")
+        local header = {}
+        header["sw8-correlation"] = 'dGVzdDI=:dDI=,dGVzdDE=:dDE='
+        TC.createEntrySpan(context, 'operation_name', nil, header)
+        lu.assertNotNil(context.correlation)
+        local contextCarrier = {}
+        TC.createExitSpan(context, 'operation_name', nil, 'peer', contextCarrier)
+        lu.assertNotNil(contextCarrier['sw8-correlation'])
+        local correlation = correlationContext.fromSW8Value(contextCarrier['sw8-correlation'])
+        lu.assertEquals(correlation["test1"], "t1")
+        lu.assertEquals(correlation["test2"], "t2")
+
+        -- test3 is not in the incoming header, so it can only come from the extra data
+        TC.createExitSpan(context, 'operation_name', nil, 'peer', contextCarrier, {
+            test3 = "t3"
+        })
+        lu.assertNotNil(contextCarrier['sw8-correlation'])
+        correlation = correlationContext.fromSW8Value(contextCarrier['sw8-correlation'])
+        lu.assertEquals(correlation["test1"], "t1")
+        lu.assertEquals(correlation["test2"], "t2")
+        lu.assertEquals(correlation["test3"], "t3")
+    end
+
+-- end TestCorelationContext
+
+
+os.exit( lu.LuaUnit.run() )
diff --git a/lib/skywalking/tracer.lua b/lib/skywalking/tracer.lua
index 83a0003..af0cdc6 100644
--- a/lib/skywalking/tracer.lua
+++ b/lib/skywalking/tracer.lua
@@ -18,7 +18,7 @@
 
 local Tracer = {}
 
-function Tracer:start(upstream_name)
+function Tracer:start(upstream_name, correlation)
     local metadata_buffer = ngx.shared.tracing_buffer
     local TC = require('tracing_context')
     local Layer = require('span_layer')
@@ -34,6 +34,7 @@
 
     local contextCarrier = {}
     contextCarrier["sw8"] = ngx.req.get_headers()["sw8"]
+    contextCarrier["sw8-correlation"] = ngx.req.get_headers()["sw8-correlation"]
     local entrySpan = TC.createEntrySpan(tracingContext, ngx.var.uri, nil, contextCarrier)
     Span.start(entrySpan, ngx.now() * 1000)
     Span.setComponentId(entrySpan, nginxComponentId)
@@ -49,7 +50,7 @@
 
     local upstreamServerName = upstream_name
     ------------------------------------------------------
-    local exitSpan = TC.createExitSpan(tracingContext, upstreamUri, entrySpan, upstreamServerName, contextCarrier)
+    local exitSpan = TC.createExitSpan(tracingContext, upstreamUri, entrySpan, upstreamServerName, contextCarrier, correlation)
     Span.start(exitSpan, ngx.now() * 1000)
     Span.setComponentId(exitSpan, nginxComponentId)
     Span.setLayer(exitSpan, Layer.HTTP)
diff --git a/lib/skywalking/tracing_context.lua b/lib/skywalking/tracing_context.lua
index 3c835bd..c9db5c8 100644
--- a/lib/skywalking/tracing_context.lua
+++ b/lib/skywalking/tracing_context.lua
@@ -17,6 +17,9 @@
 
 local Util = require('util')
 local Span = require('span')
+local CorrelationContext = require('correlation_context')
+
+local CONTEXT_CORRELATION_KEY = 'sw8-correlation'
 
 -------------- Internal Object-------------
 local Internal = {}
@@ -126,16 +129,32 @@
         return Span.newNoOP()
     end
 
+    local correlationData = ''
+    if contextCarrier then
+        correlationData = contextCarrier[CONTEXT_CORRELATION_KEY]
+    end
+    tracingContext.correlation = CorrelationContext.fromSW8Value(correlationData)
+
     return Span.createEntrySpan(operationName, tracingContext, parent, contextCarrier)
 end
 
 -- Delegate to Span.createExitSpan
 -- @param contextCarrier could be nil if don't need to inject any context to propagate
-function _M.createExitSpan(tracingContext, operationName, parent, peer, contextCarrier)
+function _M.createExitSpan(tracingContext, operationName, parent, peer, contextCarrier, correlation)
     if tracingContext.is_noop then
         return Span.newNoOP()
     end
 
+    if contextCarrier then
+        -- createEntrySpan may not have run, so make sure the context exists
+        local current = tracingContext.correlation or CorrelationContext.new()
+        tracingContext.correlation = current
+        for name, value in pairs(correlation or {}) do
+            CorrelationContext.put(current, name, value)
+        end
+        contextCarrier[CONTEXT_CORRELATION_KEY] = CorrelationContext.serialize(current)
+    end
+
     return Span.createExitSpan(operationName, tracingContext, parent, peer, contextCarrier)
 end
 
diff --git a/skywalking-nginx-lua-2.0-0.rockspec b/skywalking-nginx-lua-2.0-0.rockspec
index b44f819..fb07169 100644
--- a/skywalking-nginx-lua-2.0-0.rockspec
+++ b/skywalking-nginx-lua-2.0-0.rockspec
@@ -26,5 +26,6 @@
     ["skywalking.span"] = "lib/skywalking/span.lua",
     ["skywalking.tracing_context"] = "lib/skywalking/tracing_context.lua",
     ["skywalking.util"] = "lib/skywalking/util.lua",
+    ["skywalking.correlation_context"] = "lib/skywalking/correlation_context.lua",
    }
 }