blob: 35835c2d523d47e492c90f8f53372dfd4e1595ae [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package sdk
import (
	"encoding/json"
	"net/netip"
	"net/url"
	"strconv"
	"strings"

	"github.com/apache/airflow/go-sdk/pkg/api"
)
// Connection describes a single Airflow connection: what kind of system it
// points at (Type), where that system lives (Host, Port, Path), the optional
// credentials to use (Login, Password) and any additional settings (Extra).
type Connection struct {
// ID is the connection identifier (filled from the API response's ConnId).
ID string
// Type is the connection type, as defined in Airflow. GetURI uses it as the
// URI scheme.
Type string
// Host is the host to connect to. It may carry its own "scheme://" prefix,
// which GetURI splits off and uses as the URI scheme.
Host string
// Port is the port to connect to. Zero means no port is set.
Port int
// Login is the login/username of the connection. Optional: nil means "no
// login", which is distinct from an empty login.
Login *string
// Password is the password of the connection. Optional, can be nil.
Password *string
// Path is the path component. Called `Schema` in Airflow Python code. For
// database connections this often contains the database name.
Path string
// Extra holds additional key/value settings decoded from the connection's
// JSON "extra" field. GetURI renders them as query parameters.
Extra map[string]any
}
// GetURI renders the connection as a URL of the form
//
//	type://login:password@host:port/path?extra-key=value
//
// Components whose field is unset (empty string, zero port, nil login, empty
// Extra) are left out.
//
// Details:
//   - If Host itself contains "scheme://", the part after it is used as the
//     host, and the embedded scheme replaces Type as the URI scheme whenever
//     the two differ.
//   - A bare IPv6 literal in Host (e.g. "::1") is wrapped in square brackets so
//     that the authority stays unambiguous, with or without a port.
//   - The password is only emitted when a login is present.
//   - Extra values are stringified: strings verbatim, ints, float64s and bools
//     via strconv, everything else as its JSON encoding. Keys are emitted in
//     sorted order (url.Values.Encode), so the result is deterministic.
//
// The returned URL is freshly allocated and never nil.
func (c Connection) GetURI() *url.URL {
	uri := &url.URL{Scheme: c.Type}

	// Split an optional "scheme://" prefix off the host. A differing embedded
	// scheme wins over the connection type.
	host := c.Host
	if scheme, rest, found := strings.Cut(c.Host, "://"); found {
		host = rest
		if scheme != c.Type {
			uri.Scheme = scheme
		}
	}

	// IPv6 literals must be bracketed inside a URL authority; otherwise their
	// colons are indistinguishable from the port separator. Hosts that are
	// already bracketed (or are not IP literals at all) fail ParseAddr or
	// Is6 and are left untouched.
	if addr, err := netip.ParseAddr(host); err == nil && addr.Is6() {
		host = "[" + host + "]"
	}

	// url.URL has no separate port field; the port is part of Host.
	if c.Port != 0 {
		host += ":" + strconv.Itoa(c.Port)
	}
	uri.Host = host

	// Authority block (login:password@).
	if c.Login != nil {
		if c.Password != nil {
			uri.User = url.UserPassword(*c.Login, *c.Password)
		} else {
			uri.User = url.User(*c.Login)
		}
	}

	if c.Path != "" {
		uri.Path = "/" + c.Path
	}

	// Extra settings become query parameters.
	if len(c.Extra) > 0 {
		query := make(url.Values, len(c.Extra))
		for key, raw := range c.Extra {
			switch v := raw.(type) {
			case string:
				query.Set(key, v)
			case int:
				query.Set(key, strconv.Itoa(v))
			case float64:
				query.Set(key, strconv.FormatFloat(v, 'f', -1, 64))
			case bool:
				query.Set(key, strconv.FormatBool(v))
			default:
				// Nested objects/arrays (and any other type) are carried as
				// their JSON encoding. GetURI has no error return, so a value
				// that cannot be encoded is deliberately skipped rather than
				// aborting the whole URI.
				encoded, err := json.Marshal(v)
				if err != nil {
					continue
				}
				query.Set(key, string(encoded))
			}
		}
		uri.RawQuery = query.Encode()
	}

	return uri
}
// connFromAPIResponse converts a connection returned by the Airflow API into
// the SDK's Connection type.
//
// ConnId and ConnType are copied as-is. Login and Password are carried over as
// pointers, so "not set" (nil) stays distinguishable from an empty value. The
// optional Host, Port and Schema fields are dereferenced when present and
// otherwise leave the corresponding Connection field at its zero value; Schema
// maps to Connection.Path.
//
// When Extra is present it is decoded as a JSON object into Connection.Extra.
// An empty or whitespace-only Extra is treated as "no extras" (an empty map)
// instead of a decoding error.
//
// If Extra holds invalid JSON, the zero Connection and the json error are
// returned; callers never receive a half-populated Connection together with a
// non-nil error.
//
// resp must not be nil.
func connFromAPIResponse(resp *api.ConnectionResponse) (Connection, error) {
	conn := Connection{
		ID:       resp.ConnId,
		Type:     resp.ConnType,
		Login:    resp.Login,
		Password: resp.Password,
	}

	if resp.Host != nil {
		conn.Host = *resp.Host
	}
	if resp.Port != nil {
		conn.Port = *resp.Port
	}
	if resp.Schema != nil {
		conn.Path = *resp.Schema
	}

	if resp.Extra != nil {
		conn.Extra = map[string]any{}
		// A blank extra field means "nothing configured"; feeding it to the
		// JSON decoder would only yield "unexpected end of JSON input".
		if raw := strings.TrimSpace(*resp.Extra); raw != "" {
			if err := json.Unmarshal([]byte(raw), &conn.Extra); err != nil {
				return Connection{}, err
			}
		}
	}

	return conn, nil
}