blob: 82e460b1f7a5aaa70a93a7256db3820faad1ebc7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package rest
import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
"net/url"
"path"
)
type MediaType string
const (
ApplicationJSON MediaType = "application/json"
PartitionedTopicMetaJSON MediaType = "application/vnd.partitioned-topic-metadata+json"
)
func (m MediaType) String() string {
return string(m)
}
// Client is a base client that is used to make http request to the ServiceURL
type Client struct {
ServiceURL string
HTTPClient *http.Client
VersionInfo string
}
func (c *Client) newRequest(method, path string) (*request, error) {
base, err := url.Parse(c.ServiceURL)
if err != nil {
return nil, err
}
u, err := url.Parse(path)
if err != nil {
return nil, err
}
req := &request{
method: method,
url: &url.URL{
Scheme: base.Scheme,
User: base.User,
Host: base.Host,
Path: endpoint(base.Path, u.Path),
},
params: make(url.Values),
}
return req, nil
}
func (c *Client) doRequest(ctx context.Context, r *request) (*http.Response, error) {
req, err := r.toHTTP(ctx)
if err != nil {
return nil, err
}
if r.contentType != "" {
req.Header.Set("Content-Type", r.contentType)
} else if req.Body != nil {
req.Header.Set("Content-Type", ApplicationJSON.String())
}
req.Header.Set("Accept", ApplicationJSON.String())
req.Header.Set("User-Agent", c.useragent())
hc := c.HTTPClient
if hc == nil {
hc = http.DefaultClient
}
return hc.Do(req)
}
// MakeRequest can make a simple request and handle the response by yourself
func (c *Client) MakeRequest(method, endpoint string) (*http.Response, error) {
return c.MakeRequestWithContext(context.Background(), method, endpoint)
}
// MakeRequestWithContext can make a simple request and handle the response by yourself
func (c *Client) MakeRequestWithContext(ctx context.Context, method, endpoint string) (*http.Response, error) {
req, err := c.newRequest(method, endpoint)
if err != nil {
return nil, err
}
resp, err := checkSuccessful(c.doRequest(ctx, req))
if err != nil {
return nil, err
}
return resp, nil
}
func (c *Client) MakeRequestWithURL(method string, urlOpt *url.URL) (*http.Response, error) {
return c.MakeRequestWithURLWithContext(context.Background(), method, urlOpt)
}
func (c *Client) MakeRequestWithURLWithContext(
ctx context.Context,
method string,
urlOpt *url.URL,
) (*http.Response, error) {
req := &request{
method: method,
url: urlOpt,
params: make(url.Values),
}
resp, err := checkSuccessful(c.doRequest(ctx, req))
if err != nil {
return nil, err
}
return resp, nil
}
func (c *Client) Get(endpoint string, obj interface{}) error {
return c.GetWithContext(context.Background(), endpoint, obj)
}
func (c *Client) GetWithContext(ctx context.Context, endpoint string, obj interface{}) error {
_, err := c.GetWithQueryParamsWithContext(ctx, endpoint, obj, nil, true)
return err
}
func (c *Client) GetBodyWithContext(ctx context.Context, endpoint string, obj interface{}) ([]byte, error) {
return c.GetWithQueryParamsWithContext(ctx, endpoint, obj, nil, true)
}
func (c *Client) GetWithQueryParams(endpoint string, obj interface{}, params map[string]string,
decode bool) ([]byte, error) {
return c.GetWithQueryParamsWithContext(context.Background(), endpoint, obj, params, decode)
}
func (c *Client) GetWithQueryParamsWithContext(
ctx context.Context,
endpoint string,
obj interface{},
params map[string]string,
decode bool,
) ([]byte, error) {
return c.GetWithOptionsWithContext(ctx, endpoint, obj, params, decode, nil)
}
func (c *Client) GetWithOptions(endpoint string, obj interface{}, params map[string]string,
decode bool, file io.Writer) ([]byte, error) {
return c.GetWithOptionsWithContext(context.Background(), endpoint, obj, params, decode, file)
}
func (c *Client) GetWithOptionsWithContext(
ctx context.Context,
endpoint string,
obj interface{},
params map[string]string,
decode bool, file io.Writer,
) ([]byte, error) {
req, err := c.newRequest(http.MethodGet, endpoint)
if err != nil {
return nil, err
}
if params != nil {
query := req.url.Query()
for k, v := range params {
query.Add(k, v)
}
req.params = query
}
//nolint:bodyclose
resp, err := checkSuccessful(c.doRequest(ctx, req))
if err != nil {
return nil, err
}
defer safeRespClose(resp)
if obj != nil {
body, err := decodeJSONWithBody(resp, &obj)
if err != nil {
if err == io.EOF {
return nil, nil
}
return nil, err
}
return body, nil
} else if !decode {
if file != nil {
_, err := io.Copy(file, resp.Body)
if err != nil {
return nil, err
}
} else {
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return body, err
}
}
return nil, err
}
func (c *Client) useragent() string {
return c.VersionInfo
}
func (c *Client) Put(endpoint string, in interface{}) error {
return c.PutWithContext(context.Background(), endpoint, in)
}
func (c *Client) PutWithContext(ctx context.Context, endpoint string, in interface{}) error {
return c.PutWithQueryParamsWithContext(ctx, endpoint, in, nil, nil)
}
func (c *Client) PutWithQueryParams(endpoint string, in, obj interface{}, params map[string]string) error {
return c.PutWithQueryParamsWithContext(context.Background(), endpoint, in, obj, params)
}
func (c *Client) PutWithQueryParamsWithContext(
ctx context.Context,
endpoint string,
in,
obj interface{},
params map[string]string,
) error {
return c.PutWithCustomMediaTypeWithContext(ctx, endpoint, in, obj, params, "")
}
func (c *Client) PutWithCustomMediaType(
endpoint string,
in, obj interface{},
params map[string]string,
mediaType MediaType,
) error {
return c.PutWithCustomMediaTypeWithContext(context.Background(), endpoint, in, obj, params, mediaType)
}
func (c *Client) PutWithCustomMediaTypeWithContext(
ctx context.Context,
endpoint string,
in,
obj interface{},
params map[string]string,
mediaType MediaType,
) error {
req, err := c.newRequest(http.MethodPut, endpoint)
if err != nil {
return err
}
if mediaType != "" {
req.contentType = mediaType.String()
}
req.obj = in
if params != nil {
query := req.url.Query()
for k, v := range params {
query.Add(k, v)
}
req.params = query
}
//nolint:bodyclose
resp, err := checkSuccessful(c.doRequest(ctx, req))
if err != nil {
return err
}
defer safeRespClose(resp)
if obj != nil {
if err := decodeJSONBody(resp, &obj); err != nil {
return err
}
}
return nil
}
func (c *Client) PutWithMultiPart(
endpoint string,
body io.Reader,
contentType string,
) error {
return c.PutWithMultiPartWithContext(context.Background(), endpoint, body, contentType)
}
func (c *Client) PutWithMultiPartWithContext(
ctx context.Context,
endpoint string,
body io.Reader,
contentType string,
) error {
req, err := c.newRequest(http.MethodPut, endpoint)
if err != nil {
return err
}
req.body = body
req.contentType = contentType
//nolint
resp, err := checkSuccessful(c.doRequest(ctx, req))
if err != nil {
return err
}
defer safeRespClose(resp)
return nil
}
func (c *Client) Delete(endpoint string) error {
return c.DeleteWithContext(context.Background(), endpoint)
}
func (c *Client) DeleteWithContext(ctx context.Context, endpoint string) error {
return c.DeleteWithQueryParamsWithContext(ctx, endpoint, nil)
}
func (c *Client) DeleteWithQueryParams(endpoint string, params map[string]string) error {
return c.DeleteWithQueryParamsWithContext(context.Background(), endpoint, params)
}
func (c *Client) DeleteWithQueryParamsWithContext(
ctx context.Context,
endpoint string,
params map[string]string,
) error {
req, err := c.newRequest(http.MethodDelete, endpoint)
if err != nil {
return err
}
if params != nil {
query := req.url.Query()
for k, v := range params {
query.Add(k, v)
}
req.params = query
}
//nolint
resp, err := checkSuccessful(c.doRequest(ctx, req))
if err != nil {
return err
}
defer safeRespClose(resp)
return nil
}
func (c *Client) Post(endpoint string, in interface{}) error {
return c.PostWithContext(context.Background(), endpoint, in)
}
func (c *Client) PostWithContext(ctx context.Context, endpoint string, in interface{}) error {
return c.PostWithObjWithContext(ctx, endpoint, in, nil)
}
func (c *Client) PostWithObj(endpoint string, in, obj interface{}) error {
return c.PostWithObjWithContext(context.Background(), endpoint, in, obj)
}
func (c *Client) PostWithObjWithContext(ctx context.Context, endpoint string, in, obj interface{}) error {
req, err := c.newRequest(http.MethodPost, endpoint)
if err != nil {
return err
}
req.obj = in
//nolint
resp, err := checkSuccessful(c.doRequest(ctx, req))
if err != nil {
return err
}
defer safeRespClose(resp)
if obj != nil {
if err := decodeJSONBody(resp, &obj); err != nil {
return err
}
}
return nil
}
func (c *Client) PostWithMultiPart(endpoint string, in interface{}, body io.Reader, contentType string) error {
return c.PostWithMultiPartWithContext(context.Background(), endpoint, in, body, contentType)
}
func (c *Client) PostWithMultiPartWithContext(
ctx context.Context,
endpoint string,
in interface{},
body io.Reader,
contentType string,
) error {
req, err := c.newRequest(http.MethodPost, endpoint)
if err != nil {
return err
}
req.obj = in
req.body = body
req.contentType = contentType
//nolint
resp, err := checkSuccessful(c.doRequest(ctx, req))
if err != nil {
return err
}
defer safeRespClose(resp)
return nil
}
func (c *Client) PostWithQueryParams(endpoint string, in interface{}, params map[string]string) error {
return c.PostWithQueryParamsWithContext(context.Background(), endpoint, in, params)
}
func (c *Client) PostWithQueryParamsWithContext(
ctx context.Context,
endpoint string,
in interface{},
params map[string]string,
) error {
req, err := c.newRequest(http.MethodPost, endpoint)
if err != nil {
return err
}
if in != nil {
req.obj = in
}
if params != nil {
query := req.url.Query()
for k, v := range params {
query.Add(k, v)
}
req.params = query
}
//nolint
resp, err := checkSuccessful(c.doRequest(ctx, req))
if err != nil {
return err
}
defer safeRespClose(resp)
return nil
}
type request struct {
method string
contentType string
url *url.URL
params url.Values
obj interface{}
body io.Reader
}
func (r *request) toHTTP(ctx context.Context) (*http.Request, error) {
r.url.RawQuery = r.params.Encode()
// add a request body if there is one
if r.body == nil && r.obj != nil {
body, err := encodeJSONBody(r.obj)
if err != nil {
return nil, err
}
r.body = body
}
req, err := http.NewRequestWithContext(ctx, r.method, r.url.RequestURI(), r.body)
if err != nil {
return nil, err
}
req.URL.Host = r.url.Host
req.URL.Scheme = r.url.Scheme
req.Host = r.url.Host
return req, nil
}
// respIsOk is used to validate a successful http status code
func respIsOk(resp *http.Response) bool {
return resp.StatusCode >= http.StatusOK && resp.StatusCode <= http.StatusNoContent
}
// checkSuccessful checks for a valid response and parses an error
func checkSuccessful(resp *http.Response, err error) (*http.Response, error) {
if err != nil {
safeRespClose(resp)
return nil, err
}
if !respIsOk(resp) {
defer safeRespClose(resp)
return nil, responseError(resp)
}
return resp, nil
}
func endpoint(parts ...string) string {
return path.Join(parts...)
}
// encodeJSONBody is used to JSON encode a body
func encodeJSONBody(obj interface{}) (io.Reader, error) {
b, err := json.Marshal(obj)
if err != nil {
return nil, err
}
return bytes.NewReader(b), nil
}
// decodeJSONBody is used to JSON decode a body
func decodeJSONBody(resp *http.Response, out interface{}) error {
if resp.ContentLength == 0 {
return nil
}
dec := json.NewDecoder(resp.Body)
return dec.Decode(out)
}
// decodeJSONWithBody is used to JSON decode a body AND ALSO return the raw body bytes
func decodeJSONWithBody(resp *http.Response, out interface{}) ([]byte, error) {
// Read the body first so we can return it even after decoding
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if len(body) == 0 {
return nil, nil
}
if err := json.Unmarshal(body, &out); err != nil {
return nil, err
}
return body, nil
}
// safeRespClose is used to close a response body
func safeRespClose(resp *http.Response) {
if resp != nil {
// ignore error since it is closing a response body
_ = resp.Body.Close()
}
}
// responseError is used to parse a response into a client error
func responseError(resp *http.Response) error {
e := Error{
Code: resp.StatusCode,
Reason: resp.Status,
}
body, err := io.ReadAll(resp.Body)
if err != nil {
e.Reason = err.Error()
return e
}
err = json.Unmarshal(body, &e)
if err != nil {
if len(body) != 0 {
e.Reason = string(body)
}
return e
}
return e
}