blob: 599b2eb7276cc8873933a2c6737df6eda908f351 [file] [log] [blame]
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package pf
import (
"context"
"encoding/json"
"time"
"github.com/apache/pulsar-client-go/pulsar"
)
// FunctionContext provides contextual information to the executing function.
// Through it a function can look up details such as the message currently
// being handled, the topics it is wired to, and its operating constraints.
type FunctionContext struct {
// instanceConf holds the instance-level configuration; NewFuncContext fills
// it from newInstanceConf.
instanceConf *instanceConf
// userConfigs is the user configuration map; NewFuncContext builds it with
// buildUserConfig from the function details' user config string.
userConfigs map[string]interface{}
// logAppender is not referenced in this section of the file.
// NOTE(review): presumably forwards function logs to a topic - confirm.
logAppender *LogAppender
// outputMessage returns a producer for the given topic and is what
// NewOutputMessage invokes. NewFuncContext does not set it, so it has to be
// assigned elsewhere before NewOutputMessage is called.
outputMessage func(topic string) pulsar.Producer
// record is the message stored by SetCurrentRecord and returned by
// GetCurrentRecord.
record pulsar.Message
}
// NewFuncContext creates a FunctionContext for the current instance.
//
// The instance configuration comes from newInstanceConf, and the user
// configuration map is decoded by buildUserConfig from the user config string
// carried in the function details. All other fields are left at their zero
// values.
func NewFuncContext() *FunctionContext {
	conf := newInstanceConf()
	return &FunctionContext{
		instanceConf: conf,
		userConfigs:  buildUserConfig(conf.funcDetails.GetUserConfig()),
	}
}
// GetInstanceID reports the ID of the instance that invokes the running
// pulsar function, as held in the instance configuration.
func (c *FunctionContext) GetInstanceID() int {
	conf := c.instanceConf
	return conf.instanceID
}
// GetInputTopics returns the names of all input topics the pulsar function
// has been invoked on. The names are the keys of the source's InputSpecs map,
// so their order is unspecified and may differ from call to call. The result
// is always a non-nil slice, even when there are no input topics.
func (c *FunctionContext) GetInputTopics() []string {
	specs := c.instanceConf.funcDetails.GetSource().InputSpecs
	topics := make([]string, 0, len(specs))
	for topic := range specs {
		topics = append(topics, topic)
	}
	return topics
}
// GetOutputTopic returns the output topic of the pulsar function, read from
// the Topic field of the function details' sink.
func (c *FunctionContext) GetOutputTopic() string {
	sink := c.instanceConf.funcDetails.GetSink()
	return sink.Topic
}
// GetTenantAndNamespace returns the tenant and namespace the pulsar function
// belongs to, joined as `<tenant>/<namespace>`.
func (c *FunctionContext) GetTenantAndNamespace() string {
	tenant := c.GetFuncTenant()
	namespace := c.GetFuncNamespace()
	return tenant + "/" + namespace
}
// GetTenantAndNamespaceAndName returns the fully qualified name of the pulsar
// function, joined as `<tenant>/<namespace>/<function name>`.
func (c *FunctionContext) GetTenantAndNamespaceAndName() string {
	tenant := c.GetFuncTenant()
	namespace := c.GetFuncNamespace()
	name := c.GetFuncName()
	return tenant + "/" + namespace + "/" + name
}
// GetFuncTenant returns the tenant the pulsar function belongs to, taken from
// the function details.
func (c *FunctionContext) GetFuncTenant() string {
	details := c.instanceConf.funcDetails
	return details.Tenant
}
// GetFuncName returns the name given to the pulsar function, taken from the
// function details.
func (c *FunctionContext) GetFuncName() string {
	details := c.instanceConf.funcDetails
	return details.Name
}
// GetFuncNamespace returns the namespace the pulsar function belongs to,
// taken from the function details.
func (c *FunctionContext) GetFuncNamespace() string {
	details := c.instanceConf.funcDetails
	return details.Namespace
}
// GetFuncID returns the ID of the pulsar function, as held in the instance
// configuration.
func (c *FunctionContext) GetFuncID() string {
	conf := c.instanceConf
	return conf.funcID
}
// GetPort returns the port the pulsar function communicates on, as held in
// the instance configuration.
func (c *FunctionContext) GetPort() int {
	conf := c.instanceConf
	return conf.port
}
// GetClusterName returns the name of the cluster the pulsar function is
// running in, as held in the instance configuration.
func (c *FunctionContext) GetClusterName() string {
	conf := c.instanceConf
	return conf.clusterName
}
// GetExpectedHealthCheckInterval returns the expected time between health
// checks, in seconds, as held in the instance configuration.
func (c *FunctionContext) GetExpectedHealthCheckInterval() int32 {
	conf := c.instanceConf
	return conf.expectedHealthCheckInterval
}
// GetExpectedHealthCheckIntervalAsDuration returns the expected health check
// interval converted directly to a time.Duration.
//
// The configured number of seconds is NOT scaled by time.Second: the result
// holds the raw seconds count. Callers must multiply it by time.Second to get
// a real duration, as GetMaxIdleTime does.
func (c *FunctionContext) GetExpectedHealthCheckIntervalAsDuration() time.Duration {
	seconds := c.instanceConf.expectedHealthCheckInterval
	return time.Duration(seconds)
}
// GetMaxIdleTime returns the amount of time the pulsar function has to
// respond to the most recent health check before it is considered to be
// failing. The value is three times the expected health check interval,
// expressed in nanoseconds as an int64.
func (c *FunctionContext) GetMaxIdleTime() int64 {
	// The interval holds a raw seconds count, so scale it by time.Second here.
	interval := c.GetExpectedHealthCheckIntervalAsDuration()
	maxIdle := interval * 3 * time.Second
	return int64(maxIdle)
}
// GetFuncVersion returns the version of the pulsar function, as held in the
// instance configuration.
func (c *FunctionContext) GetFuncVersion() string {
	conf := c.instanceConf
	return conf.funcVersion
}
// GetUserConfValue looks up key in the pulsar function's user configuration
// map and returns the associated value, or nil when the key is not present.
func (c *FunctionContext) GetUserConfValue(key string) interface{} {
	value, found := c.userConfigs[key]
	if !found {
		return nil
	}
	return value
}
// GetUserConfMap returns the pulsar function's user configuration map. The
// map is handed out as-is, not copied, so changes made by the caller are
// visible through the context.
func (c *FunctionContext) GetUserConfMap() map[string]interface{} {
	configs := c.userConfigs
	return configs
}
// NewOutputMessage returns a pulsar.Producer for the topic named topicName by
// delegating to the context's outputMessage callback. It does not publish
// anything itself; the caller sends through the returned producer.
func (c *FunctionContext) NewOutputMessage(topicName string) pulsar.Producer {
	producerFor := c.outputMessage
	return producerFor(topicName)
}
// SetCurrentRecord stores record as the current message of the function
// context, replacing any previously stored message. It is meant to be called
// for each message before the handler function is executed, so that
// GetCurrentRecord returns the message being processed.
func (c *FunctionContext) SetCurrentRecord(record pulsar.Message) {
c.record = record
}
// GetCurrentRecord returns the message most recently stored with
// SetCurrentRecord, or nil if none has been stored yet.
func (c *FunctionContext) GetCurrentRecord() pulsar.Message {
	current := c.record
	return current
}
// GetMetricsPort returns the port the pulsar function's metrics listen on, as
// held in the instance configuration.
func (c *FunctionContext) GetMetricsPort() int {
	conf := c.instanceConf
	return conf.metricsPort
}
// key is an unexported type used for the context keys defined in this
// package. Because the type is unexported, keys built from it cannot collide
// with context keys defined in other packages.
type key struct{}
// contextKey is the key under which a *FunctionContext is stored in a
// context.Context. It is unexported; clients should use the package-level
// NewContext and FromContext functions instead of using this key directly.
var contextKey = &key{}
// NewContext returns a copy of parent that carries fc, stored under this
// package's unexported context key. Use FromContext to retrieve it.
func NewContext(parent context.Context, fc *FunctionContext) context.Context {
	ctx := context.WithValue(parent, contextKey, fc)
	return ctx
}
// FromContext returns the *FunctionContext stored in ctx by NewContext. The
// boolean result is false, and the returned pointer nil, when ctx carries no
// *FunctionContext under this package's context key.
func FromContext(ctx context.Context) (*FunctionContext, bool) {
	if fc, ok := ctx.Value(contextKey).(*FunctionContext); ok {
		return fc, true
	}
	return nil, false
}
// buildUserConfig decodes the JSON-encoded user configuration in data into a
// map keyed by the top-level JSON object keys.
//
// Decoding is best-effort because the function has no error return: empty
// input, malformed JSON, or a document that is not a JSON object all yield an
// empty map. The returned map is never nil, so callers may safely add entries
// to it. Without the guard, a JSON `null` document would make json.Unmarshal
// reset the map to nil.
func buildUserConfig(data string) map[string]interface{} {
	if data == "" {
		// No user configuration was supplied.
		return make(map[string]interface{})
	}

	var m map[string]interface{}
	if err := json.Unmarshal([]byte(data), &m); err != nil || m == nil {
		// Deliberately tolerant: fall back to an empty configuration rather
		// than failing or handing out a nil map.
		return make(map[string]interface{})
	}
	return m
}