blob: c0d126267e0a8a32152680d29fa36628a09d2ff2 [file] [log] [blame]
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package conf
import (
"encoding/json"
"flag"
"io/ioutil"
"os"
"time"
log "github.com/apache/pulsar/pulsar-function-go/logutil"
"gopkg.in/yaml.v2"
)
const ConfigPath = "conf/conf.yaml"
type Conf struct {
PulsarServiceURL string `json:"pulsarServiceURL" yaml:"pulsarServiceURL"`
InstanceID int `json:"instanceID" yaml:"instanceID"`
FuncID string `json:"funcID" yaml:"funcID"`
FuncVersion string `json:"funcVersion" yaml:"funcVersion"`
MaxBufTuples int `json:"maxBufTuples" yaml:"maxBufTuples"`
Port int `json:"port" yaml:"port"`
ClusterName string `json:"clusterName" yaml:"clusterName"`
KillAfterIdleMs time.Duration `json:"killAfterIdleMs" yaml:"killAfterIdleMs"`
// function details config
Tenant string `json:"tenant" yaml:"tenant"`
NameSpace string `json:"nameSpace" yaml:"nameSpace"`
Name string `json:"name" yaml:"name"`
LogTopic string `json:"logTopic" yaml:"logTopic"`
ProcessingGuarantees int32 `json:"processingGuarantees" yaml:"processingGuarantees"`
SecretsMap string `json:"secretsMap" yaml:"secretsMap"`
Runtime int32 `json:"runtime" yaml:"runtime"`
AutoACK bool `json:"autoAck" yaml:"autoAck"`
Parallelism int32 `json:"parallelism" yaml:"parallelism"`
//source config
SubscriptionType int32 `json:"subscriptionType" yaml:"subscriptionType"`
TimeoutMs uint64 `json:"timeoutMs" yaml:"timeoutMs"`
SubscriptionName string `json:"subscriptionName" yaml:"subscriptionName"`
CleanupSubscription bool `json:"cleanupSubscription" yaml:"cleanupSubscription"`
//source input specs
SourceSpecTopic string `json:"sourceSpecsTopic" yaml:"sourceSpecsTopic"`
SourceSchemaType string `json:"sourceSchemaType" yaml:"sourceSchemaType"`
IsRegexPatternSubscription bool `json:"isRegexPatternSubscription" yaml:"isRegexPatternSubscription"`
ReceiverQueueSize int32 `json:"receiverQueueSize" yaml:"receiverQueueSize"`
//sink spec config
SinkSpecTopic string `json:"sinkSpecsTopic" yaml:"sinkSpecsTopic"`
SinkSchemaType string `json:"sinkSchemaType" yaml:"sinkSchemaType"`
//resources config
Cpu float64 `json:"cpu" yaml:"cpu"`
Ram int64 `json:"ram" yaml:"ram"`
Disk int64 `json:"disk" yaml:"disk"`
//retryDetails config
MaxMessageRetries int32 `json:"maxMessageRetries" yaml:"maxMessageRetries"`
DeadLetterTopic string `json:"deadLetterTopic" yaml:"deadLetterTopic"`
ExpectedHealthCheckInterval int32 `json:"expectedHealthCheckInterval" yaml:"expectedHealthCheckInterval"`
UserConfig string `json:"userConfig" yaml:"userConfig"`
//metrics config
MetricsPort int `json:"metricsPort" yaml:"metricsPort"`
}
var (
help bool
confFilePath string
confContent string
)
func (c *Conf) GetConf() *Conf {
flag.Parse()
if help {
flag.Usage()
}
if confContent == "" && confFilePath == "" {
log.Errorf("no yaml file or conf content provided")
return nil
}
if confFilePath != "" {
yamlFile, err := ioutil.ReadFile(confFilePath)
if err == nil {
err = yaml.Unmarshal(yamlFile, c)
if err != nil {
log.Errorf("unmarshal yaml file error:%s", err.Error())
return nil
}
} else if os.IsNotExist(err) && confContent == "" {
log.Errorf("conf file not found, no config content provided, err:%s", err.Error())
return nil
} else if !os.IsNotExist(err) {
log.Errorf("load conf file failed, err:%s", err.Error())
return nil
}
}
if confContent != "" {
err := json.Unmarshal([]byte(confContent), c)
if err != nil {
log.Errorf("unmarshal config content error:%s", err.Error())
return nil
}
}
return c
}
func init() {
var defaultPath string
if err := os.Chdir("../"); err == nil {
defaultPath = ConfigPath
}
log.Infof("The default config file path is: %s", defaultPath)
flag.BoolVar(&help, "help", false, "print help cmd")
flag.StringVar(&confFilePath, "instance-conf-path", defaultPath, "config conf.yml filepath")
flag.StringVar(&confContent, "instance-conf", "", "the string content of Conf struct")
}