| // Copyright 2014 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package datastore |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "log" |
| "os" |
| "reflect" |
| |
| "cloud.google.com/go/internal/trace" |
| "google.golang.org/api/option" |
| gtransport "google.golang.org/api/transport/grpc" |
| pb "google.golang.org/genproto/googleapis/datastore/v1" |
| "google.golang.org/grpc" |
| ) |
| |
| const ( |
| prodAddr = "datastore.googleapis.com:443" |
| userAgent = "gcloud-golang-datastore/20160401" |
| ) |
| |
| // ScopeDatastore grants permissions to view and/or manage datastore entities |
| const ScopeDatastore = "https://www.googleapis.com/auth/datastore" |
| |
| // resourcePrefixHeader is the name of the metadata header used to indicate |
| // the resource being operated on. |
| const resourcePrefixHeader = "google-cloud-resource-prefix" |
| |
| // Client is a client for reading and writing data in a datastore dataset. |
| type Client struct { |
| conn *grpc.ClientConn |
| client pb.DatastoreClient |
| endpoint string |
| dataset string // Called dataset by the datastore API, synonym for project ID. |
| } |
| |
| // NewClient creates a new Client for a given dataset. |
| // If the project ID is empty, it is derived from the DATASTORE_PROJECT_ID environment variable. |
| // If the DATASTORE_EMULATOR_HOST environment variable is set, client will use its value |
| // to connect to a locally-running datastore emulator. |
| func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error) { |
| var o []option.ClientOption |
| // Environment variables for gcd emulator: |
| // https://cloud.google.com/datastore/docs/tools/datastore-emulator |
| // If the emulator is available, dial it without passing any credentials. |
| if addr := os.Getenv("DATASTORE_EMULATOR_HOST"); addr != "" { |
| o = []option.ClientOption{ |
| option.WithEndpoint(addr), |
| option.WithoutAuthentication(), |
| option.WithGRPCDialOption(grpc.WithInsecure()), |
| } |
| } else { |
| o = []option.ClientOption{ |
| option.WithEndpoint(prodAddr), |
| option.WithScopes(ScopeDatastore), |
| option.WithUserAgent(userAgent), |
| } |
| } |
| // Warn if we see the legacy emulator environment variables. |
| if os.Getenv("DATASTORE_HOST") != "" && os.Getenv("DATASTORE_EMULATOR_HOST") == "" { |
| log.Print("WARNING: legacy environment variable DATASTORE_HOST is ignored. Use DATASTORE_EMULATOR_HOST instead.") |
| } |
| if os.Getenv("DATASTORE_DATASET") != "" && os.Getenv("DATASTORE_PROJECT_ID") == "" { |
| log.Print("WARNING: legacy environment variable DATASTORE_DATASET is ignored. Use DATASTORE_PROJECT_ID instead.") |
| } |
| if projectID == "" { |
| projectID = os.Getenv("DATASTORE_PROJECT_ID") |
| } |
| if projectID == "" { |
| return nil, errors.New("datastore: missing project/dataset id") |
| } |
| o = append(o, opts...) |
| conn, err := gtransport.Dial(ctx, o...) |
| if err != nil { |
| return nil, fmt.Errorf("dialing: %v", err) |
| } |
| return &Client{ |
| conn: conn, |
| client: newDatastoreClient(conn, projectID), |
| dataset: projectID, |
| }, nil |
| |
| } |
| |
| var ( |
| // ErrInvalidEntityType is returned when functions like Get or Next are |
| // passed a dst or src argument of invalid type. |
| ErrInvalidEntityType = errors.New("datastore: invalid entity type") |
| // ErrInvalidKey is returned when an invalid key is presented. |
| ErrInvalidKey = errors.New("datastore: invalid key") |
| // ErrNoSuchEntity is returned when no entity was found for a given key. |
| ErrNoSuchEntity = errors.New("datastore: no such entity") |
| ) |
| |
| type multiArgType int |
| |
| const ( |
| multiArgTypeInvalid multiArgType = iota |
| multiArgTypePropertyLoadSaver |
| multiArgTypeStruct |
| multiArgTypeStructPtr |
| multiArgTypeInterface |
| ) |
| |
| // ErrFieldMismatch is returned when a field is to be loaded into a different |
| // type than the one it was stored from, or when a field is missing or |
| // unexported in the destination struct. |
| // StructType is the type of the struct pointed to by the destination argument |
| // passed to Get or to Iterator.Next. |
| type ErrFieldMismatch struct { |
| StructType reflect.Type |
| FieldName string |
| Reason string |
| } |
| |
| func (e *ErrFieldMismatch) Error() string { |
| return fmt.Sprintf("datastore: cannot load field %q into a %q: %s", |
| e.FieldName, e.StructType, e.Reason) |
| } |
| |
| // GeoPoint represents a location as latitude/longitude in degrees. |
| type GeoPoint struct { |
| Lat, Lng float64 |
| } |
| |
| // Valid returns whether a GeoPoint is within [-90, 90] latitude and [-180, 180] longitude. |
| func (g GeoPoint) Valid() bool { |
| return -90 <= g.Lat && g.Lat <= 90 && -180 <= g.Lng && g.Lng <= 180 |
| } |
| |
| func keyToProto(k *Key) *pb.Key { |
| if k == nil { |
| return nil |
| } |
| |
| var path []*pb.Key_PathElement |
| for { |
| el := &pb.Key_PathElement{Kind: k.Kind} |
| if k.ID != 0 { |
| el.IdType = &pb.Key_PathElement_Id{Id: k.ID} |
| } else if k.Name != "" { |
| el.IdType = &pb.Key_PathElement_Name{Name: k.Name} |
| } |
| path = append(path, el) |
| if k.Parent == nil { |
| break |
| } |
| k = k.Parent |
| } |
| |
| // The path should be in order [grandparent, parent, child] |
| // We did it backward above, so reverse back. |
| for i := 0; i < len(path)/2; i++ { |
| path[i], path[len(path)-i-1] = path[len(path)-i-1], path[i] |
| } |
| |
| key := &pb.Key{Path: path} |
| if k.Namespace != "" { |
| key.PartitionId = &pb.PartitionId{ |
| NamespaceId: k.Namespace, |
| } |
| } |
| return key |
| } |
| |
| // protoToKey decodes a protocol buffer representation of a key into an |
| // equivalent *Key object. If the key is invalid, protoToKey will return the |
| // invalid key along with ErrInvalidKey. |
| func protoToKey(p *pb.Key) (*Key, error) { |
| var key *Key |
| var namespace string |
| if partition := p.PartitionId; partition != nil { |
| namespace = partition.NamespaceId |
| } |
| for _, el := range p.Path { |
| key = &Key{ |
| Namespace: namespace, |
| Kind: el.Kind, |
| ID: el.GetId(), |
| Name: el.GetName(), |
| Parent: key, |
| } |
| } |
| if !key.valid() { // Also detects key == nil. |
| return key, ErrInvalidKey |
| } |
| return key, nil |
| } |
| |
| // multiKeyToProto is a batch version of keyToProto. |
| func multiKeyToProto(keys []*Key) []*pb.Key { |
| ret := make([]*pb.Key, len(keys)) |
| for i, k := range keys { |
| ret[i] = keyToProto(k) |
| } |
| return ret |
| } |
| |
| // multiKeyToProto is a batch version of keyToProto. |
| func multiProtoToKey(keys []*pb.Key) ([]*Key, error) { |
| hasErr := false |
| ret := make([]*Key, len(keys)) |
| err := make(MultiError, len(keys)) |
| for i, k := range keys { |
| ret[i], err[i] = protoToKey(k) |
| if err[i] != nil { |
| hasErr = true |
| } |
| } |
| if hasErr { |
| return nil, err |
| } |
| return ret, nil |
| } |
| |
| // multiValid is a batch version of Key.valid. It returns an error, not a |
| // []bool. |
| func multiValid(key []*Key) error { |
| invalid := false |
| for _, k := range key { |
| if !k.valid() { |
| invalid = true |
| break |
| } |
| } |
| if !invalid { |
| return nil |
| } |
| err := make(MultiError, len(key)) |
| for i, k := range key { |
| if !k.valid() { |
| err[i] = ErrInvalidKey |
| } |
| } |
| return err |
| } |
| |
| // checkMultiArg checks that v has type []S, []*S, []I, or []P, for some struct |
| // type S, for some interface type I, or some non-interface non-pointer type P |
| // such that P or *P implements PropertyLoadSaver. |
| // |
| // It returns what category the slice's elements are, and the reflect.Type |
| // that represents S, I or P. |
| // |
| // As a special case, PropertyList is an invalid type for v. |
| // |
| // TODO(djd): multiArg is very confusing. Fold this logic into the |
| // relevant Put/Get methods to make the logic less opaque. |
| func checkMultiArg(v reflect.Value) (m multiArgType, elemType reflect.Type) { |
| if v.Kind() != reflect.Slice { |
| return multiArgTypeInvalid, nil |
| } |
| if v.Type() == typeOfPropertyList { |
| return multiArgTypeInvalid, nil |
| } |
| elemType = v.Type().Elem() |
| if reflect.PtrTo(elemType).Implements(typeOfPropertyLoadSaver) { |
| return multiArgTypePropertyLoadSaver, elemType |
| } |
| switch elemType.Kind() { |
| case reflect.Struct: |
| return multiArgTypeStruct, elemType |
| case reflect.Interface: |
| return multiArgTypeInterface, elemType |
| case reflect.Ptr: |
| elemType = elemType.Elem() |
| if elemType.Kind() == reflect.Struct { |
| return multiArgTypeStructPtr, elemType |
| } |
| } |
| return multiArgTypeInvalid, nil |
| } |
| |
| // Close closes the Client. |
| func (c *Client) Close() error { |
| return c.conn.Close() |
| } |
| |
| // Get loads the entity stored for key into dst, which must be a struct pointer |
| // or implement PropertyLoadSaver. If there is no such entity for the key, Get |
| // returns ErrNoSuchEntity. |
| // |
| // The values of dst's unmatched struct fields are not modified, and matching |
| // slice-typed fields are not reset before appending to them. In particular, it |
| // is recommended to pass a pointer to a zero valued struct on each Get call. |
| // |
| // ErrFieldMismatch is returned when a field is to be loaded into a different |
| // type than the one it was stored from, or when a field is missing or |
| // unexported in the destination struct. ErrFieldMismatch is only returned if |
| // dst is a struct pointer. |
| func (c *Client) Get(ctx context.Context, key *Key, dst interface{}) (err error) { |
| ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.Get") |
| defer func() { trace.EndSpan(ctx, err) }() |
| |
| if dst == nil { // get catches nil interfaces; we need to catch nil ptr here |
| return ErrInvalidEntityType |
| } |
| err = c.get(ctx, []*Key{key}, []interface{}{dst}, nil) |
| if me, ok := err.(MultiError); ok { |
| return me[0] |
| } |
| return err |
| } |
| |
| // GetMulti is a batch version of Get. |
| // |
| // dst must be a []S, []*S, []I or []P, for some struct type S, some interface |
| // type I, or some non-interface non-pointer type P such that P or *P |
| // implements PropertyLoadSaver. If an []I, each element must be a valid dst |
| // for Get: it must be a struct pointer or implement PropertyLoadSaver. |
| // |
| // As a special case, PropertyList is an invalid type for dst, even though a |
| // PropertyList is a slice of structs. It is treated as invalid to avoid being |
| // mistakenly passed when []PropertyList was intended. |
| func (c *Client) GetMulti(ctx context.Context, keys []*Key, dst interface{}) (err error) { |
| ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.GetMulti") |
| defer func() { trace.EndSpan(ctx, err) }() |
| |
| return c.get(ctx, keys, dst, nil) |
| } |
| |
| func (c *Client) get(ctx context.Context, keys []*Key, dst interface{}, opts *pb.ReadOptions) error { |
| v := reflect.ValueOf(dst) |
| multiArgType, _ := checkMultiArg(v) |
| |
| // Sanity checks |
| if multiArgType == multiArgTypeInvalid { |
| return errors.New("datastore: dst has invalid type") |
| } |
| if len(keys) != v.Len() { |
| return errors.New("datastore: keys and dst slices have different length") |
| } |
| if len(keys) == 0 { |
| return nil |
| } |
| |
| // Go through keys, validate them, serialize then, and create a dict mapping them to their indices. |
| // Equal keys are deduped. |
| multiErr, any := make(MultiError, len(keys)), false |
| keyMap := make(map[string][]int, len(keys)) |
| pbKeys := make([]*pb.Key, 0, len(keys)) |
| for i, k := range keys { |
| if !k.valid() { |
| multiErr[i] = ErrInvalidKey |
| any = true |
| } else if k.Incomplete() { |
| multiErr[i] = fmt.Errorf("datastore: can't get the incomplete key: %v", k) |
| any = true |
| } else { |
| ks := k.String() |
| if _, ok := keyMap[ks]; !ok { |
| pbKeys = append(pbKeys, keyToProto(k)) |
| } |
| keyMap[ks] = append(keyMap[ks], i) |
| } |
| } |
| if any { |
| return multiErr |
| } |
| req := &pb.LookupRequest{ |
| ProjectId: c.dataset, |
| Keys: pbKeys, |
| ReadOptions: opts, |
| } |
| resp, err := c.client.Lookup(ctx, req) |
| if err != nil { |
| return err |
| } |
| found := resp.Found |
| missing := resp.Missing |
| // Upper bound 100 iterations to prevent infinite loop. |
| // We choose 100 iterations somewhat logically: |
| // Max number of Entities you can request from Datastore is 1,000. |
| // Max size for a Datastore Entity is 1 MiB. |
| // Max request size is 10 MiB, so we assume max response size is also 10 MiB. |
| // 1,000 / 10 = 100. |
| // Note that if ctx has a deadline, the deadline will probably |
| // be hit before we reach 100 iterations. |
| for i := 0; len(resp.Deferred) > 0 && i < 100; i++ { |
| req.Keys = resp.Deferred |
| resp, err = c.client.Lookup(ctx, req) |
| if err != nil { |
| return err |
| } |
| found = append(found, resp.Found...) |
| missing = append(missing, resp.Missing...) |
| } |
| |
| filled := 0 |
| for _, e := range found { |
| k, err := protoToKey(e.Entity.Key) |
| if err != nil { |
| return errors.New("datastore: internal error: server returned an invalid key") |
| } |
| filled += len(keyMap[k.String()]) |
| for _, index := range keyMap[k.String()] { |
| elem := v.Index(index) |
| if multiArgType == multiArgTypePropertyLoadSaver || multiArgType == multiArgTypeStruct { |
| elem = elem.Addr() |
| } |
| if multiArgType == multiArgTypeStructPtr && elem.IsNil() { |
| elem.Set(reflect.New(elem.Type().Elem())) |
| } |
| if err := loadEntityProto(elem.Interface(), e.Entity); err != nil { |
| multiErr[index] = err |
| any = true |
| } |
| } |
| } |
| for _, e := range missing { |
| k, err := protoToKey(e.Entity.Key) |
| if err != nil { |
| return errors.New("datastore: internal error: server returned an invalid key") |
| } |
| filled += len(keyMap[k.String()]) |
| for _, index := range keyMap[k.String()] { |
| multiErr[index] = ErrNoSuchEntity |
| } |
| any = true |
| } |
| |
| if filled != len(keys) { |
| return errors.New("datastore: internal error: server returned the wrong number of entities") |
| } |
| |
| if any { |
| return multiErr |
| } |
| return nil |
| } |
| |
| // Put saves the entity src into the datastore with the given key. src must be |
| // a struct pointer or implement PropertyLoadSaver; if the struct pointer has |
| // any unexported fields they will be skipped. If the key is incomplete, the |
| // returned key will be a unique key generated by the datastore. |
| func (c *Client) Put(ctx context.Context, key *Key, src interface{}) (*Key, error) { |
| k, err := c.PutMulti(ctx, []*Key{key}, []interface{}{src}) |
| if err != nil { |
| if me, ok := err.(MultiError); ok { |
| return nil, me[0] |
| } |
| return nil, err |
| } |
| return k[0], nil |
| } |
| |
| // PutMulti is a batch version of Put. |
| // |
| // src must satisfy the same conditions as the dst argument to GetMulti. |
| // TODO(jba): rewrite in terms of Mutate. |
| func (c *Client) PutMulti(ctx context.Context, keys []*Key, src interface{}) (ret []*Key, err error) { |
| ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.PutMulti") |
| defer func() { trace.EndSpan(ctx, err) }() |
| |
| mutations, err := putMutations(keys, src) |
| if err != nil { |
| return nil, err |
| } |
| |
| // Make the request. |
| req := &pb.CommitRequest{ |
| ProjectId: c.dataset, |
| Mutations: mutations, |
| Mode: pb.CommitRequest_NON_TRANSACTIONAL, |
| } |
| resp, err := c.client.Commit(ctx, req) |
| if err != nil { |
| return nil, err |
| } |
| |
| // Copy any newly minted keys into the returned keys. |
| ret = make([]*Key, len(keys)) |
| for i, key := range keys { |
| if key.Incomplete() { |
| // This key is in the mutation results. |
| ret[i], err = protoToKey(resp.MutationResults[i].Key) |
| if err != nil { |
| return nil, errors.New("datastore: internal error: server returned an invalid key") |
| } |
| } else { |
| ret[i] = key |
| } |
| } |
| return ret, nil |
| } |
| |
| func putMutations(keys []*Key, src interface{}) ([]*pb.Mutation, error) { |
| v := reflect.ValueOf(src) |
| multiArgType, _ := checkMultiArg(v) |
| if multiArgType == multiArgTypeInvalid { |
| return nil, errors.New("datastore: src has invalid type") |
| } |
| if len(keys) != v.Len() { |
| return nil, errors.New("datastore: key and src slices have different length") |
| } |
| if len(keys) == 0 { |
| return nil, nil |
| } |
| if err := multiValid(keys); err != nil { |
| return nil, err |
| } |
| mutations := make([]*pb.Mutation, 0, len(keys)) |
| multiErr := make(MultiError, len(keys)) |
| hasErr := false |
| for i, k := range keys { |
| elem := v.Index(i) |
| // Two cases where we need to take the address: |
| // 1) multiArgTypePropertyLoadSaver => &elem implements PLS |
| // 2) multiArgTypeStruct => saveEntity needs *struct |
| if multiArgType == multiArgTypePropertyLoadSaver || multiArgType == multiArgTypeStruct { |
| elem = elem.Addr() |
| } |
| p, err := saveEntity(k, elem.Interface()) |
| if err != nil { |
| multiErr[i] = err |
| hasErr = true |
| } |
| var mut *pb.Mutation |
| if k.Incomplete() { |
| mut = &pb.Mutation{Operation: &pb.Mutation_Insert{Insert: p}} |
| } else { |
| mut = &pb.Mutation{Operation: &pb.Mutation_Upsert{Upsert: p}} |
| } |
| mutations = append(mutations, mut) |
| } |
| if hasErr { |
| return nil, multiErr |
| } |
| return mutations, nil |
| } |
| |
| // Delete deletes the entity for the given key. |
| func (c *Client) Delete(ctx context.Context, key *Key) error { |
| err := c.DeleteMulti(ctx, []*Key{key}) |
| if me, ok := err.(MultiError); ok { |
| return me[0] |
| } |
| return err |
| } |
| |
| // DeleteMulti is a batch version of Delete. |
| // TODO(jba): rewrite in terms of Mutate. |
| func (c *Client) DeleteMulti(ctx context.Context, keys []*Key) (err error) { |
| ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.DeleteMulti") |
| defer func() { trace.EndSpan(ctx, err) }() |
| |
| mutations, err := deleteMutations(keys) |
| if err != nil { |
| return err |
| } |
| |
| req := &pb.CommitRequest{ |
| ProjectId: c.dataset, |
| Mutations: mutations, |
| Mode: pb.CommitRequest_NON_TRANSACTIONAL, |
| } |
| _, err = c.client.Commit(ctx, req) |
| return err |
| } |
| |
| func deleteMutations(keys []*Key) ([]*pb.Mutation, error) { |
| mutations := make([]*pb.Mutation, 0, len(keys)) |
| set := make(map[string]bool, len(keys)) |
| multiErr := make(MultiError, len(keys)) |
| hasErr := false |
| for i, k := range keys { |
| if !k.valid() { |
| multiErr[i] = ErrInvalidKey |
| hasErr = true |
| } else if k.Incomplete() { |
| multiErr[i] = fmt.Errorf("datastore: can't delete the incomplete key: %v", k) |
| hasErr = true |
| } else { |
| ks := k.String() |
| if !set[ks] { |
| mutations = append(mutations, &pb.Mutation{ |
| Operation: &pb.Mutation_Delete{Delete: keyToProto(k)}, |
| }) |
| } |
| set[ks] = true |
| } |
| } |
| if hasErr { |
| return nil, multiErr |
| } |
| return mutations, nil |
| } |
| |
| // Mutate applies one or more mutations atomically. |
| // It returns the keys of the argument Mutations, in the same order. |
| // |
| // If any of the mutations are invalid, Mutate returns a MultiError with the errors. |
| // Mutate returns a MultiError in this case even if there is only one Mutation. |
| func (c *Client) Mutate(ctx context.Context, muts ...*Mutation) (ret []*Key, err error) { |
| ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.Mutate") |
| defer func() { trace.EndSpan(ctx, err) }() |
| |
| pmuts, err := mutationProtos(muts) |
| if err != nil { |
| return nil, err |
| } |
| req := &pb.CommitRequest{ |
| ProjectId: c.dataset, |
| Mutations: pmuts, |
| Mode: pb.CommitRequest_NON_TRANSACTIONAL, |
| } |
| resp, err := c.client.Commit(ctx, req) |
| if err != nil { |
| return nil, err |
| } |
| // Copy any newly minted keys into the returned keys. |
| ret = make([]*Key, len(muts)) |
| for i, mut := range muts { |
| if mut.key.Incomplete() { |
| // This key is in the mutation results. |
| ret[i], err = protoToKey(resp.MutationResults[i].Key) |
| if err != nil { |
| return nil, errors.New("datastore: internal error: server returned an invalid key") |
| } |
| } else { |
| ret[i] = mut.key |
| } |
| } |
| return ret, nil |
| } |