| package storage |
| |
| // Copyright 2017 Microsoft Corporation |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| import ( |
| "encoding/xml" |
| "fmt" |
| "io" |
| "net/http" |
| "net/url" |
| "strconv" |
| "time" |
| ) |
| |
// approximateMessagesCountHeader names the response header from which
// GetMetadata reads the approximate number of messages in a queue. The
// casing follows the canonical form Go's http.Header gives to header names.
const approximateMessagesCountHeader = "X-Ms-Approximate-Messages-Count"
| |
// QueueAccessPolicy describes a single access policy entry of a queue ACL.
type QueueAccessPolicy struct {
	// ID identifies the policy.
	ID string
	// StartTime and ExpiryTime are the start and expiry times of the policy.
	StartTime, ExpiryTime time.Time
	// Permission flags. generateQueuePermissions encodes them, in this
	// order, as the letters "r", "a", "u" and "p".
	CanRead, CanAdd, CanUpdate, CanProcess bool
}
| |
| // QueuePermissions represents the queue ACLs. |
| type QueuePermissions struct { |
| AccessPolicies []QueueAccessPolicy |
| } |
| |
// SetQueuePermissionOptions carries the optional settings of a set queue
// permissions operation.
type SetQueuePermissionOptions struct {
	// Timeout is handed to addTimeout when the request query is built.
	// NOTE(review): presumably expressed in seconds — confirm against addTimeout.
	Timeout uint
	// RequestID is tagged for the x-ms-client-request-id request header.
	RequestID string `header:"x-ms-client-request-id"`
}
| |
| // Queue represents an Azure queue. |
| type Queue struct { |
| qsc *QueueServiceClient |
| Name string |
| Metadata map[string]string |
| AproxMessageCount uint64 |
| } |
| |
| func (q *Queue) buildPath() string { |
| return fmt.Sprintf("/%s", q.Name) |
| } |
| |
| func (q *Queue) buildPathMessages() string { |
| return fmt.Sprintf("%s/messages", q.buildPath()) |
| } |
| |
// QueueServiceOptions carries the optional settings shared by several queue
// service operations (Create, Delete, SetMetadata, GetMetadata and
// ClearMessages).
type QueueServiceOptions struct {
	// Timeout is handed to addTimeout when the request query is built.
	// NOTE(review): presumably expressed in seconds — confirm against addTimeout.
	Timeout uint
	// RequestID is tagged for the x-ms-client-request-id request header.
	RequestID string `header:"x-ms-client-request-id"`
}
| |
| // Create operation creates a queue under the given account. |
| // |
| // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Create-Queue4 |
| func (q *Queue) Create(options *QueueServiceOptions) error { |
| params := url.Values{} |
| headers := q.qsc.client.getStandardHeaders() |
| headers = q.qsc.client.addMetadataToHeaders(headers, q.Metadata) |
| |
| if options != nil { |
| params = addTimeout(params, options.Timeout) |
| headers = mergeHeaders(headers, headersFromStruct(*options)) |
| } |
| uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params) |
| |
| resp, err := q.qsc.client.exec(http.MethodPut, uri, headers, nil, q.qsc.auth) |
| if err != nil { |
| return err |
| } |
| defer drainRespBody(resp) |
| return checkRespCode(resp, []int{http.StatusCreated}) |
| } |
| |
| // Delete operation permanently deletes the specified queue. |
| // |
| // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Delete-Queue3 |
| func (q *Queue) Delete(options *QueueServiceOptions) error { |
| params := url.Values{} |
| headers := q.qsc.client.getStandardHeaders() |
| |
| if options != nil { |
| params = addTimeout(params, options.Timeout) |
| headers = mergeHeaders(headers, headersFromStruct(*options)) |
| } |
| uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params) |
| resp, err := q.qsc.client.exec(http.MethodDelete, uri, headers, nil, q.qsc.auth) |
| if err != nil { |
| return err |
| } |
| defer drainRespBody(resp) |
| return checkRespCode(resp, []int{http.StatusNoContent}) |
| } |
| |
| // Exists returns true if a queue with given name exists. |
| func (q *Queue) Exists() (bool, error) { |
| uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), url.Values{"comp": {"metadata"}}) |
| resp, err := q.qsc.client.exec(http.MethodGet, uri, q.qsc.client.getStandardHeaders(), nil, q.qsc.auth) |
| if resp != nil { |
| defer drainRespBody(resp) |
| if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusNotFound { |
| return resp.StatusCode == http.StatusOK, nil |
| } |
| err = getErrorFromResponse(resp) |
| } |
| return false, err |
| } |
| |
| // SetMetadata operation sets user-defined metadata on the specified queue. |
| // Metadata is associated with the queue as name-value pairs. |
| // |
| // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Set-Queue-Metadata |
| func (q *Queue) SetMetadata(options *QueueServiceOptions) error { |
| params := url.Values{"comp": {"metadata"}} |
| headers := q.qsc.client.getStandardHeaders() |
| headers = q.qsc.client.addMetadataToHeaders(headers, q.Metadata) |
| |
| if options != nil { |
| params = addTimeout(params, options.Timeout) |
| headers = mergeHeaders(headers, headersFromStruct(*options)) |
| } |
| uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params) |
| |
| resp, err := q.qsc.client.exec(http.MethodPut, uri, headers, nil, q.qsc.auth) |
| if err != nil { |
| return err |
| } |
| defer drainRespBody(resp) |
| return checkRespCode(resp, []int{http.StatusNoContent}) |
| } |
| |
| // GetMetadata operation retrieves user-defined metadata and queue |
| // properties on the specified queue. Metadata is associated with |
| // the queue as name-values pairs. |
| // |
| // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Set-Queue-Metadata |
| // |
| // Because the way Golang's http client (and http.Header in particular) |
| // canonicalize header names, the returned metadata names would always |
| // be all lower case. |
| func (q *Queue) GetMetadata(options *QueueServiceOptions) error { |
| params := url.Values{"comp": {"metadata"}} |
| headers := q.qsc.client.getStandardHeaders() |
| |
| if options != nil { |
| params = addTimeout(params, options.Timeout) |
| headers = mergeHeaders(headers, headersFromStruct(*options)) |
| } |
| uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params) |
| |
| resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth) |
| if err != nil { |
| return err |
| } |
| defer drainRespBody(resp) |
| |
| if err := checkRespCode(resp, []int{http.StatusOK}); err != nil { |
| return err |
| } |
| |
| aproxMessagesStr := resp.Header.Get(http.CanonicalHeaderKey(approximateMessagesCountHeader)) |
| if aproxMessagesStr != "" { |
| aproxMessages, err := strconv.ParseUint(aproxMessagesStr, 10, 64) |
| if err != nil { |
| return err |
| } |
| q.AproxMessageCount = aproxMessages |
| } |
| |
| q.Metadata = getMetadataFromHeaders(resp.Header) |
| return nil |
| } |
| |
| // GetMessageReference returns a message object with the specified text. |
| func (q *Queue) GetMessageReference(text string) *Message { |
| return &Message{ |
| Queue: q, |
| Text: text, |
| } |
| } |
| |
// GetMessagesOptions is the set of options that can be specified for the Get
// Messages operation. A zero struct does not use any preferences for the
// request.
type GetMessagesOptions struct {
	// Timeout is handed to addTimeout when the request query is built.
	Timeout uint
	// NumOfMessages and VisibilityTimeout are sent as the numofmessages and
	// visibilitytimeout query parameters; a zero value omits the parameter.
	NumOfMessages, VisibilityTimeout int
	// RequestID is tagged for the x-ms-client-request-id request header.
	RequestID string `header:"x-ms-client-request-id"`
}
| |
| type messages struct { |
| XMLName xml.Name `xml:"QueueMessagesList"` |
| Messages []Message `xml:"QueueMessage"` |
| } |
| |
| // GetMessages operation retrieves one or more messages from the front of the |
| // queue. |
| // |
| // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Get-Messages |
| func (q *Queue) GetMessages(options *GetMessagesOptions) ([]Message, error) { |
| query := url.Values{} |
| headers := q.qsc.client.getStandardHeaders() |
| |
| if options != nil { |
| if options.NumOfMessages != 0 { |
| query.Set("numofmessages", strconv.Itoa(options.NumOfMessages)) |
| } |
| if options.VisibilityTimeout != 0 { |
| query.Set("visibilitytimeout", strconv.Itoa(options.VisibilityTimeout)) |
| } |
| query = addTimeout(query, options.Timeout) |
| headers = mergeHeaders(headers, headersFromStruct(*options)) |
| } |
| uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPathMessages(), query) |
| |
| resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth) |
| if err != nil { |
| return []Message{}, err |
| } |
| defer resp.Body.Close() |
| |
| var out messages |
| err = xmlUnmarshal(resp.Body, &out) |
| if err != nil { |
| return []Message{}, err |
| } |
| for i := range out.Messages { |
| out.Messages[i].Queue = q |
| } |
| return out.Messages, err |
| } |
| |
// PeekMessagesOptions is the set of options that can be specified for the
// Peek Messages operation. A zero struct does not use any preferences for
// the request.
type PeekMessagesOptions struct {
	// Timeout is handed to addTimeout when the request query is built.
	Timeout uint
	// NumOfMessages is sent as the numofmessages query parameter; a zero
	// value omits the parameter.
	NumOfMessages int
	// RequestID is tagged for the x-ms-client-request-id request header.
	RequestID string `header:"x-ms-client-request-id"`
}
| |
| // PeekMessages retrieves one or more messages from the front of the queue, but |
| // does not alter the visibility of the message. |
| // |
| // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Peek-Messages |
| func (q *Queue) PeekMessages(options *PeekMessagesOptions) ([]Message, error) { |
| query := url.Values{"peekonly": {"true"}} // Required for peek operation |
| headers := q.qsc.client.getStandardHeaders() |
| |
| if options != nil { |
| if options.NumOfMessages != 0 { |
| query.Set("numofmessages", strconv.Itoa(options.NumOfMessages)) |
| } |
| query = addTimeout(query, options.Timeout) |
| headers = mergeHeaders(headers, headersFromStruct(*options)) |
| } |
| uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPathMessages(), query) |
| |
| resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth) |
| if err != nil { |
| return []Message{}, err |
| } |
| defer resp.Body.Close() |
| |
| var out messages |
| err = xmlUnmarshal(resp.Body, &out) |
| if err != nil { |
| return []Message{}, err |
| } |
| for i := range out.Messages { |
| out.Messages[i].Queue = q |
| } |
| return out.Messages, err |
| } |
| |
| // ClearMessages operation deletes all messages from the specified queue. |
| // |
| // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Clear-Messages |
| func (q *Queue) ClearMessages(options *QueueServiceOptions) error { |
| params := url.Values{} |
| headers := q.qsc.client.getStandardHeaders() |
| |
| if options != nil { |
| params = addTimeout(params, options.Timeout) |
| headers = mergeHeaders(headers, headersFromStruct(*options)) |
| } |
| uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPathMessages(), params) |
| |
| resp, err := q.qsc.client.exec(http.MethodDelete, uri, headers, nil, q.qsc.auth) |
| if err != nil { |
| return err |
| } |
| defer drainRespBody(resp) |
| return checkRespCode(resp, []int{http.StatusNoContent}) |
| } |
| |
| // SetPermissions sets up queue permissions |
| // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/set-queue-acl |
| func (q *Queue) SetPermissions(permissions QueuePermissions, options *SetQueuePermissionOptions) error { |
| body, length, err := generateQueueACLpayload(permissions.AccessPolicies) |
| if err != nil { |
| return err |
| } |
| |
| params := url.Values{ |
| "comp": {"acl"}, |
| } |
| headers := q.qsc.client.getStandardHeaders() |
| headers["Content-Length"] = strconv.Itoa(length) |
| |
| if options != nil { |
| params = addTimeout(params, options.Timeout) |
| headers = mergeHeaders(headers, headersFromStruct(*options)) |
| } |
| uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params) |
| resp, err := q.qsc.client.exec(http.MethodPut, uri, headers, body, q.qsc.auth) |
| if err != nil { |
| return err |
| } |
| defer drainRespBody(resp) |
| return checkRespCode(resp, []int{http.StatusNoContent}) |
| } |
| |
| func generateQueueACLpayload(policies []QueueAccessPolicy) (io.Reader, int, error) { |
| sil := SignedIdentifiers{ |
| SignedIdentifiers: []SignedIdentifier{}, |
| } |
| for _, qapd := range policies { |
| permission := qapd.generateQueuePermissions() |
| signedIdentifier := convertAccessPolicyToXMLStructs(qapd.ID, qapd.StartTime, qapd.ExpiryTime, permission) |
| sil.SignedIdentifiers = append(sil.SignedIdentifiers, signedIdentifier) |
| } |
| return xmlMarshal(sil) |
| } |
| |
| func (qapd *QueueAccessPolicy) generateQueuePermissions() (permissions string) { |
| // generate the permissions string (raup). |
| // still want the end user API to have bool flags. |
| permissions = "" |
| |
| if qapd.CanRead { |
| permissions += "r" |
| } |
| |
| if qapd.CanAdd { |
| permissions += "a" |
| } |
| |
| if qapd.CanUpdate { |
| permissions += "u" |
| } |
| |
| if qapd.CanProcess { |
| permissions += "p" |
| } |
| |
| return permissions |
| } |
| |
// GetQueuePermissionOptions carries the optional settings of a get queue
// permissions operation.
type GetQueuePermissionOptions struct {
	// Timeout is handed to addTimeout when the request query is built.
	// NOTE(review): presumably expressed in seconds — confirm against addTimeout.
	Timeout uint
	// RequestID is tagged for the x-ms-client-request-id request header.
	RequestID string `header:"x-ms-client-request-id"`
}
| |
| // GetPermissions gets the queue permissions as per https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/get-queue-acl |
| // If timeout is 0 then it will not be passed to Azure |
| func (q *Queue) GetPermissions(options *GetQueuePermissionOptions) (*QueuePermissions, error) { |
| params := url.Values{ |
| "comp": {"acl"}, |
| } |
| headers := q.qsc.client.getStandardHeaders() |
| |
| if options != nil { |
| params = addTimeout(params, options.Timeout) |
| headers = mergeHeaders(headers, headersFromStruct(*options)) |
| } |
| uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params) |
| resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth) |
| if err != nil { |
| return nil, err |
| } |
| defer resp.Body.Close() |
| |
| var ap AccessPolicy |
| err = xmlUnmarshal(resp.Body, &ap.SignedIdentifiersList) |
| if err != nil { |
| return nil, err |
| } |
| return buildQueueAccessPolicy(ap, &resp.Header), nil |
| } |
| |
| func buildQueueAccessPolicy(ap AccessPolicy, headers *http.Header) *QueuePermissions { |
| permissions := QueuePermissions{ |
| AccessPolicies: []QueueAccessPolicy{}, |
| } |
| |
| for _, policy := range ap.SignedIdentifiersList.SignedIdentifiers { |
| qapd := QueueAccessPolicy{ |
| ID: policy.ID, |
| StartTime: policy.AccessPolicy.StartTime, |
| ExpiryTime: policy.AccessPolicy.ExpiryTime, |
| } |
| qapd.CanRead = updatePermissions(policy.AccessPolicy.Permission, "r") |
| qapd.CanAdd = updatePermissions(policy.AccessPolicy.Permission, "a") |
| qapd.CanUpdate = updatePermissions(policy.AccessPolicy.Permission, "u") |
| qapd.CanProcess = updatePermissions(policy.AccessPolicy.Permission, "p") |
| |
| permissions.AccessPolicies = append(permissions.AccessPolicies, qapd) |
| } |
| return &permissions |
| } |