| package storage |
| |
| // Copyright 2017 Microsoft Corporation |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| import ( |
| "bytes" |
| "encoding/json" |
| "errors" |
| "fmt" |
| "io" |
| "mime/multipart" |
| "net/http" |
| "net/textproto" |
| "sort" |
| "strings" |
| |
| "github.com/marstr/guid" |
| ) |
| |
| // Operation type. Insert, Delete, Replace etc. |
| type Operation int |
| |
| // consts for batch operations. |
| const ( |
| InsertOp = Operation(1) |
| DeleteOp = Operation(2) |
| ReplaceOp = Operation(3) |
| MergeOp = Operation(4) |
| InsertOrReplaceOp = Operation(5) |
| InsertOrMergeOp = Operation(6) |
| ) |
| |
| // BatchEntity used for tracking Entities to operate on and |
| // whether operations (replace/merge etc) should be forced. |
| // Wrapper for regular Entity with additional data specific for the entity. |
| type BatchEntity struct { |
| *Entity |
| Force bool |
| Op Operation |
| } |
| |
| // TableBatch stores all the entities that will be operated on during a batch process. |
| // Entities can be inserted, replaced or deleted. |
| type TableBatch struct { |
| BatchEntitySlice []BatchEntity |
| |
| // reference to table we're operating on. |
| Table *Table |
| } |
| |
| // defaultChangesetHeaders for changeSets |
| var defaultChangesetHeaders = map[string]string{ |
| "Accept": "application/json;odata=minimalmetadata", |
| "Content-Type": "application/json", |
| "Prefer": "return-no-content", |
| } |
| |
| // NewBatch return new TableBatch for populating. |
| func (t *Table) NewBatch() *TableBatch { |
| return &TableBatch{ |
| Table: t, |
| } |
| } |
| |
| // InsertEntity adds an entity in preparation for a batch insert. |
| func (t *TableBatch) InsertEntity(entity *Entity) { |
| be := BatchEntity{Entity: entity, Force: false, Op: InsertOp} |
| t.BatchEntitySlice = append(t.BatchEntitySlice, be) |
| } |
| |
| // InsertOrReplaceEntity adds an entity in preparation for a batch insert or replace. |
| func (t *TableBatch) InsertOrReplaceEntity(entity *Entity, force bool) { |
| be := BatchEntity{Entity: entity, Force: false, Op: InsertOrReplaceOp} |
| t.BatchEntitySlice = append(t.BatchEntitySlice, be) |
| } |
| |
| // InsertOrReplaceEntityByForce adds an entity in preparation for a batch insert or replace. Forces regardless of ETag |
| func (t *TableBatch) InsertOrReplaceEntityByForce(entity *Entity) { |
| t.InsertOrReplaceEntity(entity, true) |
| } |
| |
| // InsertOrMergeEntity adds an entity in preparation for a batch insert or merge. |
| func (t *TableBatch) InsertOrMergeEntity(entity *Entity, force bool) { |
| be := BatchEntity{Entity: entity, Force: false, Op: InsertOrMergeOp} |
| t.BatchEntitySlice = append(t.BatchEntitySlice, be) |
| } |
| |
| // InsertOrMergeEntityByForce adds an entity in preparation for a batch insert or merge. Forces regardless of ETag |
| func (t *TableBatch) InsertOrMergeEntityByForce(entity *Entity) { |
| t.InsertOrMergeEntity(entity, true) |
| } |
| |
| // ReplaceEntity adds an entity in preparation for a batch replace. |
| func (t *TableBatch) ReplaceEntity(entity *Entity) { |
| be := BatchEntity{Entity: entity, Force: false, Op: ReplaceOp} |
| t.BatchEntitySlice = append(t.BatchEntitySlice, be) |
| } |
| |
| // DeleteEntity adds an entity in preparation for a batch delete |
| func (t *TableBatch) DeleteEntity(entity *Entity, force bool) { |
| be := BatchEntity{Entity: entity, Force: false, Op: DeleteOp} |
| t.BatchEntitySlice = append(t.BatchEntitySlice, be) |
| } |
| |
| // DeleteEntityByForce adds an entity in preparation for a batch delete. Forces regardless of ETag |
| func (t *TableBatch) DeleteEntityByForce(entity *Entity, force bool) { |
| t.DeleteEntity(entity, true) |
| } |
| |
| // MergeEntity adds an entity in preparation for a batch merge |
| func (t *TableBatch) MergeEntity(entity *Entity) { |
| be := BatchEntity{Entity: entity, Force: false, Op: MergeOp} |
| t.BatchEntitySlice = append(t.BatchEntitySlice, be) |
| } |
| |
| // ExecuteBatch executes many table operations in one request to Azure. |
| // The operations can be combinations of Insert, Delete, Replace and Merge |
| // Creates the inner changeset body (various operations, Insert, Delete etc) then creates the outer request packet that encompasses |
| // the changesets. |
| // As per document https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/performing-entity-group-transactions |
| func (t *TableBatch) ExecuteBatch() error { |
| |
| // Using `github.com/marstr/guid` is in response to issue #947 (https://github.com/Azure/azure-sdk-for-go/issues/947). |
| id, err := guid.NewGUIDs(guid.CreationStrategyVersion1) |
| if err != nil { |
| return err |
| } |
| |
| changesetBoundary := fmt.Sprintf("changeset_%s", id.String()) |
| uri := t.Table.tsc.client.getEndpoint(tableServiceName, "$batch", nil) |
| changesetBody, err := t.generateChangesetBody(changesetBoundary) |
| if err != nil { |
| return err |
| } |
| |
| id, err = guid.NewGUIDs(guid.CreationStrategyVersion1) |
| if err != nil { |
| return err |
| } |
| |
| boundary := fmt.Sprintf("batch_%s", id.String()) |
| body, err := generateBody(changesetBody, changesetBoundary, boundary) |
| if err != nil { |
| return err |
| } |
| |
| headers := t.Table.tsc.client.getStandardHeaders() |
| headers[headerContentType] = fmt.Sprintf("multipart/mixed; boundary=%s", boundary) |
| |
| resp, err := t.Table.tsc.client.execBatchOperationJSON(http.MethodPost, uri, headers, bytes.NewReader(body.Bytes()), t.Table.tsc.auth) |
| if err != nil { |
| return err |
| } |
| defer drainRespBody(resp.resp) |
| |
| if err = checkRespCode(resp.resp, []int{http.StatusAccepted}); err != nil { |
| |
| // check which batch failed. |
| operationFailedMessage := t.getFailedOperation(resp.odata.Err.Message.Value) |
| requestID, date, version := getDebugHeaders(resp.resp.Header) |
| return AzureStorageServiceError{ |
| StatusCode: resp.resp.StatusCode, |
| Code: resp.odata.Err.Code, |
| RequestID: requestID, |
| Date: date, |
| APIVersion: version, |
| Message: operationFailedMessage, |
| } |
| } |
| |
| return nil |
| } |
| |
| // getFailedOperation parses the original Azure error string and determines which operation failed |
| // and generates appropriate message. |
| func (t *TableBatch) getFailedOperation(errorMessage string) string { |
| // errorMessage consists of "number:string" we just need the number. |
| sp := strings.Split(errorMessage, ":") |
| if len(sp) > 1 { |
| msg := fmt.Sprintf("Element %s in the batch returned an unexpected response code.\n%s", sp[0], errorMessage) |
| return msg |
| } |
| |
| // cant parse the message, just return the original message to client |
| return errorMessage |
| } |
| |
| // generateBody generates the complete body for the batch request. |
| func generateBody(changeSetBody *bytes.Buffer, changesetBoundary string, boundary string) (*bytes.Buffer, error) { |
| |
| body := new(bytes.Buffer) |
| writer := multipart.NewWriter(body) |
| writer.SetBoundary(boundary) |
| h := make(textproto.MIMEHeader) |
| h.Set(headerContentType, fmt.Sprintf("multipart/mixed; boundary=%s\r\n", changesetBoundary)) |
| batchWriter, err := writer.CreatePart(h) |
| if err != nil { |
| return nil, err |
| } |
| batchWriter.Write(changeSetBody.Bytes()) |
| writer.Close() |
| return body, nil |
| } |
| |
| // generateChangesetBody generates the individual changesets for the various operations within the batch request. |
| // There is a changeset for Insert, Delete, Merge etc. |
| func (t *TableBatch) generateChangesetBody(changesetBoundary string) (*bytes.Buffer, error) { |
| |
| body := new(bytes.Buffer) |
| writer := multipart.NewWriter(body) |
| writer.SetBoundary(changesetBoundary) |
| |
| for _, be := range t.BatchEntitySlice { |
| t.generateEntitySubset(&be, writer) |
| } |
| |
| writer.Close() |
| return body, nil |
| } |
| |
| // generateVerb generates the HTTP request VERB required for each changeset. |
| func generateVerb(op Operation) (string, error) { |
| switch op { |
| case InsertOp: |
| return http.MethodPost, nil |
| case DeleteOp: |
| return http.MethodDelete, nil |
| case ReplaceOp, InsertOrReplaceOp: |
| return http.MethodPut, nil |
| case MergeOp, InsertOrMergeOp: |
| return "MERGE", nil |
| default: |
| return "", errors.New("Unable to detect operation") |
| } |
| } |
| |
| // generateQueryPath generates the query path for within the changesets |
| // For inserts it will just be a table query path (table name) |
| // but for other operations (modifying an existing entity) then |
| // the partition/row keys need to be generated. |
| func (t *TableBatch) generateQueryPath(op Operation, entity *Entity) string { |
| if op == InsertOp { |
| return entity.Table.buildPath() |
| } |
| return entity.buildPath() |
| } |
| |
| // generateGenericOperationHeaders generates common headers for a given operation. |
| func generateGenericOperationHeaders(be *BatchEntity) map[string]string { |
| retval := map[string]string{} |
| |
| for k, v := range defaultChangesetHeaders { |
| retval[k] = v |
| } |
| |
| if be.Op == DeleteOp || be.Op == ReplaceOp || be.Op == MergeOp { |
| if be.Force || be.Entity.OdataEtag == "" { |
| retval["If-Match"] = "*" |
| } else { |
| retval["If-Match"] = be.Entity.OdataEtag |
| } |
| } |
| |
| return retval |
| } |
| |
| // generateEntitySubset generates body payload for particular batch entity |
| func (t *TableBatch) generateEntitySubset(batchEntity *BatchEntity, writer *multipart.Writer) error { |
| |
| h := make(textproto.MIMEHeader) |
| h.Set(headerContentType, "application/http") |
| h.Set(headerContentTransferEncoding, "binary") |
| |
| verb, err := generateVerb(batchEntity.Op) |
| if err != nil { |
| return err |
| } |
| |
| genericOpHeadersMap := generateGenericOperationHeaders(batchEntity) |
| queryPath := t.generateQueryPath(batchEntity.Op, batchEntity.Entity) |
| uri := t.Table.tsc.client.getEndpoint(tableServiceName, queryPath, nil) |
| |
| operationWriter, err := writer.CreatePart(h) |
| if err != nil { |
| return err |
| } |
| |
| urlAndVerb := fmt.Sprintf("%s %s HTTP/1.1\r\n", verb, uri) |
| operationWriter.Write([]byte(urlAndVerb)) |
| writeHeaders(genericOpHeadersMap, &operationWriter) |
| operationWriter.Write([]byte("\r\n")) // additional \r\n is needed per changeset separating the "headers" and the body. |
| |
| // delete operation doesn't need a body. |
| if batchEntity.Op != DeleteOp { |
| //var e Entity = batchEntity.Entity |
| body, err := json.Marshal(batchEntity.Entity) |
| if err != nil { |
| return err |
| } |
| operationWriter.Write(body) |
| } |
| |
| return nil |
| } |
| |
| func writeHeaders(h map[string]string, writer *io.Writer) { |
| // This way it is guaranteed the headers will be written in a sorted order |
| var keys []string |
| for k := range h { |
| keys = append(keys, k) |
| } |
| sort.Strings(keys) |
| for _, k := range keys { |
| (*writer).Write([]byte(fmt.Sprintf("%s: %s\r\n", k, h[k]))) |
| } |
| } |