blob: ef5f7dcbd03175e6c17e64dca3a5a5671463e7ca [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package admin
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"mime/multipart"
"net/textproto"
"os"
"path"
"path/filepath"
"strings"
"github.com/pkg/errors"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
)
// Packages is admin interface for functions management
type Packages interface {
// Download downloads Function/Connector Package
// @param destinationFile
// file where data should be downloaded to
// @param packageURL
// the package URL
Download(packageURL, destinationFile string) error
// DownloadWithContext downloads Function/Connector Package
// @param ctx
// context used for the request
// @param destinationFile
// file where data should be downloaded to
// @param packageURL
// the package URL
DownloadWithContext(ctx context.Context, packageURL, destinationFile string) error
// Upload uploads Function/Connector Package
// @param filePath
// file where data should be uploaded to
// @param packageURL
// type://tenant/namespace/packageName@version
// @param description
// descriptions of a package
// @param contact
// contact information of a package
// @param properties
// external informations of a package
Upload(packageURL, filePath, description, contact string, properties map[string]string) error
// UploadWithContext uploads Function/Connector Package
// @param ctx
// context used for the request
// @param filePath
// file where data should be uploaded to
// @param packageURL
// type://tenant/namespace/packageName@version
// @param description
// descriptions of a package
// @param contact
// contact information of a package
// @param properties
// external informations of a package
UploadWithContext(
ctx context.Context,
packageURL,
filePath,
description,
contact string,
properties map[string]string,
) error
// List lists all the packages with the given type in a namespace
List(typeName, namespace string) ([]string, error)
// ListWithContext lists all the packages with the given type in a namespace
ListWithContext(ctx context.Context, typeName, namespace string) ([]string, error)
// ListVersions lists all the versions of a package
ListVersions(packageURL string) ([]string, error)
// ListVersionsWithContext lists all the versions of a package
ListVersionsWithContext(ctx context.Context, packageURL string) ([]string, error)
// Delete deletes the specified package
Delete(packageURL string) error
// DeleteWithContext deletes the specified package
DeleteWithContext(ctx context.Context, packageURL string) error
// GetMetadata returns a package metadata information
GetMetadata(packageURL string) (utils.PackageMetadata, error)
// GetMetadataWithContext returns a package metadata information
GetMetadataWithContext(ctx context.Context, packageURL string) (utils.PackageMetadata, error)
// UpdateMetadata updates a package metadata information
UpdateMetadata(packageURL, description, contact string, properties map[string]string) error
// UpdateMetadataWithContext updates a package metadata information
UpdateMetadataWithContext(
ctx context.Context,
packageURL,
description,
contact string,
properties map[string]string,
) error
}
type packages struct {
pulsar *pulsarClient
basePath string
}
func (p *packages) createStringFromField(w *multipart.Writer, value string) (io.Writer, error) {
h := make(textproto.MIMEHeader)
h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s" `, value))
h.Set("Content-Type", "application/json")
return w.CreatePart(h)
}
// Packages is used to access the functions endpoints
func (c *pulsarClient) Packages() Packages {
return &packages{
pulsar: c,
basePath: "/packages",
}
}
func (p packages) Download(packageURL, destinationFile string) error {
return p.DownloadWithContext(context.Background(), packageURL, destinationFile)
}
func (p packages) DownloadWithContext(ctx context.Context, packageURL, destinationFile string) error {
packageName, err := utils.GetPackageName(packageURL)
if err != nil {
return err
}
endpoint := p.pulsar.endpoint(p.basePath, string(packageName.GetType()), packageName.GetTenant(),
packageName.GetNamespace(), packageName.GetName(), packageName.GetVersion())
parent := path.Dir(destinationFile)
if parent != "." {
err = os.MkdirAll(parent, 0755)
if err != nil {
return fmt.Errorf("failed to create parent directory %s: %w", parent, err)
}
}
_, err = os.Open(destinationFile)
if err != nil {
if !os.IsNotExist(err) {
return fmt.Errorf("file %s already exists, please delete "+
"the file first or change the file name", destinationFile)
}
}
file, err := os.Create(destinationFile)
if err != nil {
return err
}
_, err = p.pulsar.Client.GetWithOptionsWithContext(ctx, endpoint, nil, nil, false, file)
if err != nil {
return err
}
return nil
}
func (p packages) Upload(packageURL, filePath, description, contact string, properties map[string]string) error {
return p.UploadWithContext(context.Background(), packageURL, filePath, description, contact, properties)
}
func (p packages) UploadWithContext(
ctx context.Context,
packageURL,
filePath,
description,
contact string,
properties map[string]string,
) error {
if strings.TrimSpace(filePath) == "" {
return errors.New("file path is empty")
}
if strings.TrimSpace(packageURL) == "" {
return errors.New("package URL is empty")
}
packageName, err := utils.GetPackageName(packageURL)
if err != nil {
return err
}
endpoint := p.pulsar.endpoint(p.basePath, string(packageName.GetType()), packageName.GetTenant(),
packageName.GetNamespace(), packageName.GetName(), packageName.GetVersion())
metadata := utils.PackageMetadata{
Description: description,
Contact: contact,
Properties: properties,
}
// buffer to store our request as bytes
bodyBuf := bytes.NewBufferString("")
multiPartWriter := multipart.NewWriter(bodyBuf)
metadataJSON, err := json.Marshal(metadata)
if err != nil {
return err
}
stringWriter, err := p.createStringFromField(multiPartWriter, "metadata")
if err != nil {
return err
}
_, err = stringWriter.Write(metadataJSON)
if err != nil {
return err
}
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()
part, err := multiPartWriter.CreateFormFile("file", filepath.Base(file.Name()))
if err != nil {
return err
}
// copy the actual file content to the filed's writer
_, err = io.Copy(part, file)
if err != nil {
return err
}
if err = multiPartWriter.Close(); err != nil {
return err
}
contentType := multiPartWriter.FormDataContentType()
err = p.pulsar.Client.PostWithMultiPartWithContext(ctx, endpoint, nil, bodyBuf, contentType)
if err != nil {
return err
}
return nil
}
func (p packages) List(typeName, namespace string) ([]string, error) {
return p.ListWithContext(context.Background(), typeName, namespace)
}
func (p packages) ListWithContext(ctx context.Context, typeName, namespace string) ([]string, error) {
var packageList []string
endpoint := p.pulsar.endpoint(p.basePath, typeName, namespace)
err := p.pulsar.Client.GetWithContext(ctx, endpoint, &packageList)
return packageList, err
}
func (p packages) ListVersions(packageURL string) ([]string, error) {
return p.ListVersionsWithContext(context.Background(), packageURL)
}
func (p packages) ListVersionsWithContext(ctx context.Context, packageURL string) ([]string, error) {
var versionList []string
packageName, err := utils.GetPackageName(packageURL)
if err != nil {
return versionList, err
}
endpoint := p.pulsar.endpoint(p.basePath, string(packageName.GetType()), packageName.GetTenant(),
packageName.GetNamespace(), packageName.GetName())
err = p.pulsar.Client.GetWithContext(ctx, endpoint, &versionList)
return versionList, err
}
func (p packages) Delete(packageURL string) error {
return p.DeleteWithContext(context.Background(), packageURL)
}
func (p packages) DeleteWithContext(ctx context.Context, packageURL string) error {
packageName, err := utils.GetPackageName(packageURL)
if err != nil {
return err
}
endpoint := p.pulsar.endpoint(p.basePath, string(packageName.GetType()), packageName.GetTenant(),
packageName.GetNamespace(), packageName.GetName(), packageName.GetVersion())
return p.pulsar.Client.DeleteWithContext(ctx, endpoint)
}
func (p packages) GetMetadata(packageURL string) (utils.PackageMetadata, error) {
return p.GetMetadataWithContext(context.Background(), packageURL)
}
func (p packages) GetMetadataWithContext(ctx context.Context, packageURL string) (utils.PackageMetadata, error) {
var metadata utils.PackageMetadata
packageName, err := utils.GetPackageName(packageURL)
if err != nil {
return metadata, err
}
endpoint := p.pulsar.endpoint(p.basePath, string(packageName.GetType()), packageName.GetTenant(),
packageName.GetNamespace(), packageName.GetName(), packageName.GetVersion(), "metadata")
err = p.pulsar.Client.GetWithContext(ctx, endpoint, &metadata)
return metadata, err
}
func (p packages) UpdateMetadata(packageURL, description, contact string, properties map[string]string) error {
return p.UpdateMetadataWithContext(context.Background(), packageURL, description, contact, properties)
}
func (p packages) UpdateMetadataWithContext(
ctx context.Context,
packageURL,
description,
contact string,
properties map[string]string,
) error {
metadata := utils.PackageMetadata{
Description: description,
Contact: contact,
Properties: properties,
}
packageName, err := utils.GetPackageName(packageURL)
if err != nil {
return err
}
endpoint := p.pulsar.endpoint(p.basePath, string(packageName.GetType()), packageName.GetTenant(),
packageName.GetNamespace(), packageName.GetName(), packageName.GetVersion(), "metadata")
return p.pulsar.Client.PutWithContext(ctx, endpoint, &metadata)
}