| package zk |
| |
| import ( |
| "encoding/binary" |
| "errors" |
| "log" |
| "reflect" |
| "runtime" |
| "time" |
| ) |
| |
| var ( |
| ErrUnhandledFieldType = errors.New("zk: unhandled field type") |
| ErrPtrExpected = errors.New("zk: encode/decode expect a non-nil pointer to struct") |
| ErrShortBuffer = errors.New("zk: buffer too small") |
| ) |
| |
| type defaultLogger struct{} |
| |
| func (defaultLogger) Printf(format string, a ...interface{}) { |
| log.Printf(format, a...) |
| } |
| |
| type ACL struct { |
| Perms int32 |
| Scheme string |
| ID string |
| } |
| |
| type Stat struct { |
| Czxid int64 // The zxid of the change that caused this znode to be created. |
| Mzxid int64 // The zxid of the change that last modified this znode. |
| Ctime int64 // The time in milliseconds from epoch when this znode was created. |
| Mtime int64 // The time in milliseconds from epoch when this znode was last modified. |
| Version int32 // The number of changes to the data of this znode. |
| Cversion int32 // The number of changes to the children of this znode. |
| Aversion int32 // The number of changes to the ACL of this znode. |
| EphemeralOwner int64 // The session id of the owner of this znode if the znode is an ephemeral node. If it is not an ephemeral node, it will be zero. |
| DataLength int32 // The length of the data field of this znode. |
| NumChildren int32 // The number of children of this znode. |
| Pzxid int64 // last modified children |
| } |
| |
| // ServerClient is the information for a single Zookeeper client and its session. |
| // This is used to parse/extract the output fo the `cons` command. |
| type ServerClient struct { |
| Queued int64 |
| Received int64 |
| Sent int64 |
| SessionID int64 |
| Lcxid int64 |
| Lzxid int64 |
| Timeout int32 |
| LastLatency int32 |
| MinLatency int32 |
| AvgLatency int32 |
| MaxLatency int32 |
| Established time.Time |
| LastResponse time.Time |
| Addr string |
| LastOperation string // maybe? |
| Error error |
| } |
| |
| // ServerClients is a struct for the FLWCons() function. It's used to provide |
| // the list of Clients. |
| // |
| // This is needed because FLWCons() takes multiple servers. |
| type ServerClients struct { |
| Clients []*ServerClient |
| Error error |
| } |
| |
| // ServerStats is the information pulled from the Zookeeper `stat` command. |
| type ServerStats struct { |
| Sent int64 |
| Received int64 |
| NodeCount int64 |
| MinLatency int64 |
| AvgLatency int64 |
| MaxLatency int64 |
| Connections int64 |
| Outstanding int64 |
| Epoch int32 |
| Counter int32 |
| BuildTime time.Time |
| Mode Mode |
| Version string |
| Error error |
| } |
| |
| type requestHeader struct { |
| Xid int32 |
| Opcode int32 |
| } |
| |
| type responseHeader struct { |
| Xid int32 |
| Zxid int64 |
| Err ErrCode |
| } |
| |
| type multiHeader struct { |
| Type int32 |
| Done bool |
| Err ErrCode |
| } |
| |
| type auth struct { |
| Type int32 |
| Scheme string |
| Auth []byte |
| } |
| |
| // Generic request structs |
| |
| type pathRequest struct { |
| Path string |
| } |
| |
| type PathVersionRequest struct { |
| Path string |
| Version int32 |
| } |
| |
| type pathWatchRequest struct { |
| Path string |
| Watch bool |
| } |
| |
| type pathResponse struct { |
| Path string |
| } |
| |
| type statResponse struct { |
| Stat Stat |
| } |
| |
| // |
| |
| type CheckVersionRequest PathVersionRequest |
| type closeRequest struct{} |
| type closeResponse struct{} |
| |
| type connectRequest struct { |
| ProtocolVersion int32 |
| LastZxidSeen int64 |
| TimeOut int32 |
| SessionID int64 |
| Passwd []byte |
| } |
| |
| type connectResponse struct { |
| ProtocolVersion int32 |
| TimeOut int32 |
| SessionID int64 |
| Passwd []byte |
| } |
| |
| type CreateRequest struct { |
| Path string |
| Data []byte |
| Acl []ACL |
| Flags int32 |
| } |
| |
| type createResponse pathResponse |
| type DeleteRequest PathVersionRequest |
| type deleteResponse struct{} |
| |
| type errorResponse struct { |
| Err int32 |
| } |
| |
| type existsRequest pathWatchRequest |
| type existsResponse statResponse |
| type getAclRequest pathRequest |
| |
| type getAclResponse struct { |
| Acl []ACL |
| Stat Stat |
| } |
| |
| type getChildrenRequest pathRequest |
| |
| type getChildrenResponse struct { |
| Children []string |
| } |
| |
| type getChildren2Request pathWatchRequest |
| |
| type getChildren2Response struct { |
| Children []string |
| Stat Stat |
| } |
| |
| type getDataRequest pathWatchRequest |
| |
| type getDataResponse struct { |
| Data []byte |
| Stat Stat |
| } |
| |
| type getMaxChildrenRequest pathRequest |
| |
| type getMaxChildrenResponse struct { |
| Max int32 |
| } |
| |
| type getSaslRequest struct { |
| Token []byte |
| } |
| |
| type pingRequest struct{} |
| type pingResponse struct{} |
| |
| type setAclRequest struct { |
| Path string |
| Acl []ACL |
| Version int32 |
| } |
| |
| type setAclResponse statResponse |
| |
| type SetDataRequest struct { |
| Path string |
| Data []byte |
| Version int32 |
| } |
| |
| type setDataResponse statResponse |
| |
| type setMaxChildren struct { |
| Path string |
| Max int32 |
| } |
| |
| type setSaslRequest struct { |
| Token string |
| } |
| |
| type setSaslResponse struct { |
| Token string |
| } |
| |
| type setWatchesRequest struct { |
| RelativeZxid int64 |
| DataWatches []string |
| ExistWatches []string |
| ChildWatches []string |
| } |
| |
| type setWatchesResponse struct{} |
| |
| type syncRequest pathRequest |
| type syncResponse pathResponse |
| |
| type setAuthRequest auth |
| type setAuthResponse struct{} |
| |
| type multiRequestOp struct { |
| Header multiHeader |
| Op interface{} |
| } |
| type multiRequest struct { |
| Ops []multiRequestOp |
| DoneHeader multiHeader |
| } |
| type multiResponseOp struct { |
| Header multiHeader |
| String string |
| Stat *Stat |
| Err ErrCode |
| } |
| type multiResponse struct { |
| Ops []multiResponseOp |
| DoneHeader multiHeader |
| } |
| |
| func (r *multiRequest) Encode(buf []byte) (int, error) { |
| total := 0 |
| for _, op := range r.Ops { |
| op.Header.Done = false |
| n, err := encodePacketValue(buf[total:], reflect.ValueOf(op)) |
| if err != nil { |
| return total, err |
| } |
| total += n |
| } |
| r.DoneHeader.Done = true |
| n, err := encodePacketValue(buf[total:], reflect.ValueOf(r.DoneHeader)) |
| if err != nil { |
| return total, err |
| } |
| total += n |
| |
| return total, nil |
| } |
| |
| func (r *multiRequest) Decode(buf []byte) (int, error) { |
| r.Ops = make([]multiRequestOp, 0) |
| r.DoneHeader = multiHeader{-1, true, -1} |
| total := 0 |
| for { |
| header := &multiHeader{} |
| n, err := decodePacketValue(buf[total:], reflect.ValueOf(header)) |
| if err != nil { |
| return total, err |
| } |
| total += n |
| if header.Done { |
| r.DoneHeader = *header |
| break |
| } |
| |
| req := requestStructForOp(header.Type) |
| if req == nil { |
| return total, ErrAPIError |
| } |
| n, err = decodePacketValue(buf[total:], reflect.ValueOf(req)) |
| if err != nil { |
| return total, err |
| } |
| total += n |
| r.Ops = append(r.Ops, multiRequestOp{*header, req}) |
| } |
| return total, nil |
| } |
| |
| func (r *multiResponse) Decode(buf []byte) (int, error) { |
| var multiErr error |
| |
| r.Ops = make([]multiResponseOp, 0) |
| r.DoneHeader = multiHeader{-1, true, -1} |
| total := 0 |
| for { |
| header := &multiHeader{} |
| n, err := decodePacketValue(buf[total:], reflect.ValueOf(header)) |
| if err != nil { |
| return total, err |
| } |
| total += n |
| if header.Done { |
| r.DoneHeader = *header |
| break |
| } |
| |
| res := multiResponseOp{Header: *header} |
| var w reflect.Value |
| switch header.Type { |
| default: |
| return total, ErrAPIError |
| case opError: |
| w = reflect.ValueOf(&res.Err) |
| case opCreate: |
| w = reflect.ValueOf(&res.String) |
| case opSetData: |
| res.Stat = new(Stat) |
| w = reflect.ValueOf(res.Stat) |
| case opCheck, opDelete: |
| } |
| if w.IsValid() { |
| n, err := decodePacketValue(buf[total:], w) |
| if err != nil { |
| return total, err |
| } |
| total += n |
| } |
| r.Ops = append(r.Ops, res) |
| if multiErr == nil && res.Err != errOk { |
| // Use the first error as the error returned from Multi(). |
| multiErr = res.Err.toError() |
| } |
| } |
| return total, multiErr |
| } |
| |
| type watcherEvent struct { |
| Type EventType |
| State State |
| Path string |
| } |
| |
| type decoder interface { |
| Decode(buf []byte) (int, error) |
| } |
| |
| type encoder interface { |
| Encode(buf []byte) (int, error) |
| } |
| |
| func decodePacket(buf []byte, st interface{}) (n int, err error) { |
| defer func() { |
| if r := recover(); r != nil { |
| if e, ok := r.(runtime.Error); ok && e.Error() == "runtime error: slice bounds out of range" { |
| err = ErrShortBuffer |
| } else { |
| panic(r) |
| } |
| } |
| }() |
| |
| v := reflect.ValueOf(st) |
| if v.Kind() != reflect.Ptr || v.IsNil() { |
| return 0, ErrPtrExpected |
| } |
| return decodePacketValue(buf, v) |
| } |
| |
| func decodePacketValue(buf []byte, v reflect.Value) (int, error) { |
| rv := v |
| kind := v.Kind() |
| if kind == reflect.Ptr { |
| if v.IsNil() { |
| v.Set(reflect.New(v.Type().Elem())) |
| } |
| v = v.Elem() |
| kind = v.Kind() |
| } |
| |
| n := 0 |
| switch kind { |
| default: |
| return n, ErrUnhandledFieldType |
| case reflect.Struct: |
| if de, ok := rv.Interface().(decoder); ok { |
| return de.Decode(buf) |
| } else if de, ok := v.Interface().(decoder); ok { |
| return de.Decode(buf) |
| } else { |
| for i := 0; i < v.NumField(); i++ { |
| field := v.Field(i) |
| n2, err := decodePacketValue(buf[n:], field) |
| n += n2 |
| if err != nil { |
| return n, err |
| } |
| } |
| } |
| case reflect.Bool: |
| v.SetBool(buf[n] != 0) |
| n++ |
| case reflect.Int32: |
| v.SetInt(int64(binary.BigEndian.Uint32(buf[n : n+4]))) |
| n += 4 |
| case reflect.Int64: |
| v.SetInt(int64(binary.BigEndian.Uint64(buf[n : n+8]))) |
| n += 8 |
| case reflect.String: |
| ln := int(binary.BigEndian.Uint32(buf[n : n+4])) |
| v.SetString(string(buf[n+4 : n+4+ln])) |
| n += 4 + ln |
| case reflect.Slice: |
| switch v.Type().Elem().Kind() { |
| default: |
| count := int(binary.BigEndian.Uint32(buf[n : n+4])) |
| n += 4 |
| values := reflect.MakeSlice(v.Type(), count, count) |
| v.Set(values) |
| for i := 0; i < count; i++ { |
| n2, err := decodePacketValue(buf[n:], values.Index(i)) |
| n += n2 |
| if err != nil { |
| return n, err |
| } |
| } |
| case reflect.Uint8: |
| ln := int(int32(binary.BigEndian.Uint32(buf[n : n+4]))) |
| if ln < 0 { |
| n += 4 |
| v.SetBytes(nil) |
| } else { |
| bytes := make([]byte, ln) |
| copy(bytes, buf[n+4:n+4+ln]) |
| v.SetBytes(bytes) |
| n += 4 + ln |
| } |
| } |
| } |
| return n, nil |
| } |
| |
| func encodePacket(buf []byte, st interface{}) (n int, err error) { |
| defer func() { |
| if r := recover(); r != nil { |
| if e, ok := r.(runtime.Error); ok && e.Error() == "runtime error: slice bounds out of range" { |
| err = ErrShortBuffer |
| } else { |
| panic(r) |
| } |
| } |
| }() |
| |
| v := reflect.ValueOf(st) |
| if v.Kind() != reflect.Ptr || v.IsNil() { |
| return 0, ErrPtrExpected |
| } |
| return encodePacketValue(buf, v) |
| } |
| |
| func encodePacketValue(buf []byte, v reflect.Value) (int, error) { |
| rv := v |
| for v.Kind() == reflect.Ptr || v.Kind() == reflect.Interface { |
| v = v.Elem() |
| } |
| |
| n := 0 |
| switch v.Kind() { |
| default: |
| return n, ErrUnhandledFieldType |
| case reflect.Struct: |
| if en, ok := rv.Interface().(encoder); ok { |
| return en.Encode(buf) |
| } else if en, ok := v.Interface().(encoder); ok { |
| return en.Encode(buf) |
| } else { |
| for i := 0; i < v.NumField(); i++ { |
| field := v.Field(i) |
| n2, err := encodePacketValue(buf[n:], field) |
| n += n2 |
| if err != nil { |
| return n, err |
| } |
| } |
| } |
| case reflect.Bool: |
| if v.Bool() { |
| buf[n] = 1 |
| } else { |
| buf[n] = 0 |
| } |
| n++ |
| case reflect.Int32: |
| binary.BigEndian.PutUint32(buf[n:n+4], uint32(v.Int())) |
| n += 4 |
| case reflect.Int64: |
| binary.BigEndian.PutUint64(buf[n:n+8], uint64(v.Int())) |
| n += 8 |
| case reflect.String: |
| str := v.String() |
| binary.BigEndian.PutUint32(buf[n:n+4], uint32(len(str))) |
| copy(buf[n+4:n+4+len(str)], []byte(str)) |
| n += 4 + len(str) |
| case reflect.Slice: |
| switch v.Type().Elem().Kind() { |
| default: |
| count := v.Len() |
| startN := n |
| n += 4 |
| for i := 0; i < count; i++ { |
| n2, err := encodePacketValue(buf[n:], v.Index(i)) |
| n += n2 |
| if err != nil { |
| return n, err |
| } |
| } |
| binary.BigEndian.PutUint32(buf[startN:startN+4], uint32(count)) |
| case reflect.Uint8: |
| if v.IsNil() { |
| binary.BigEndian.PutUint32(buf[n:n+4], uint32(0xffffffff)) |
| n += 4 |
| } else { |
| bytes := v.Bytes() |
| binary.BigEndian.PutUint32(buf[n:n+4], uint32(len(bytes))) |
| copy(buf[n+4:n+4+len(bytes)], bytes) |
| n += 4 + len(bytes) |
| } |
| } |
| } |
| return n, nil |
| } |
| |
| func requestStructForOp(op int32) interface{} { |
| switch op { |
| case opClose: |
| return &closeRequest{} |
| case opCreate: |
| return &CreateRequest{} |
| case opDelete: |
| return &DeleteRequest{} |
| case opExists: |
| return &existsRequest{} |
| case opGetAcl: |
| return &getAclRequest{} |
| case opGetChildren: |
| return &getChildrenRequest{} |
| case opGetChildren2: |
| return &getChildren2Request{} |
| case opGetData: |
| return &getDataRequest{} |
| case opPing: |
| return &pingRequest{} |
| case opSetAcl: |
| return &setAclRequest{} |
| case opSetData: |
| return &SetDataRequest{} |
| case opSetWatches: |
| return &setWatchesRequest{} |
| case opSync: |
| return &syncRequest{} |
| case opSetAuth: |
| return &setAuthRequest{} |
| case opCheck: |
| return &CheckVersionRequest{} |
| case opMulti: |
| return &multiRequest{} |
| } |
| return nil |
| } |