| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package util |
| |
| import ( |
| "encoding/json" |
| "sync" |
| ) |
| |
| var SHARD_COUNT = 33 |
| |
| // A "thread" safe map of type string:Anything. |
| // To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards. |
| type ConcurrentMap []*concurrentMapShared |
| |
| // A "thread" safe string to anything map. |
| type concurrentMapShared struct { |
| items map[string]interface{} |
| sync.RWMutex // Read Write mutex, guards access to internal map. |
| } |
| |
| // Creates a new concurrent map. |
| func New() ConcurrentMap { |
| m := make(ConcurrentMap, SHARD_COUNT) |
| for i := 0; i < SHARD_COUNT; i++ { |
| m[i] = &concurrentMapShared{items: make(map[string]interface{})} |
| } |
| return m |
| } |
| |
| // Returns shard under given key |
| func (m ConcurrentMap) GetShard(key string) *concurrentMapShared { |
| return m[uint(fnv32(key))%uint(SHARD_COUNT)] |
| } |
| |
| func (m ConcurrentMap) MSet(data map[string]interface{}) { |
| for key, value := range data { |
| shard := m.GetShard(key) |
| shard.Lock() |
| shard.items[key] = value |
| shard.Unlock() |
| } |
| } |
| |
| // Sets the given value under the specified key. |
| func (m *ConcurrentMap) Set(key string, value interface{}) { |
| // Get map shard. |
| shard := m.GetShard(key) |
| shard.Lock() |
| shard.items[key] = value |
| shard.Unlock() |
| } |
| |
| // Callback to return new element to be inserted into the map |
| // It is called while lock is held, therefore it MUST NOT |
| // try to access other keys in same map, as it can lead to deadlock since |
| // Go sync.RWLock is not reentrant |
| type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{} |
| |
| // Insert or Update - updates existing element or inserts a new one using UpsertCb |
| func (m *ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) { |
| shard := m.GetShard(key) |
| shard.Lock() |
| v, ok := shard.items[key] |
| res = cb(ok, v, value) |
| shard.items[key] = res |
| shard.Unlock() |
| return res |
| } |
| |
| // Sets the given value under the specified key if no value was associated with it. |
| func (m *ConcurrentMap) SetIfAbsent(key string, value interface{}) bool { |
| // Get map shard. |
| shard := m.GetShard(key) |
| shard.Lock() |
| _, ok := shard.items[key] |
| if !ok { |
| shard.items[key] = value |
| } |
| shard.Unlock() |
| return !ok |
| } |
| |
| // Retrieves an element from map under given key. |
| func (m ConcurrentMap) Get(key string) (interface{}, bool) { |
| // Get shard |
| shard := m.GetShard(key) |
| shard.RLock() |
| // Get item from shard. |
| val, ok := shard.items[key] |
| shard.RUnlock() |
| return val, ok |
| } |
| |
| // Returns the number of elements within the map. |
| func (m ConcurrentMap) Count() int { |
| count := 0 |
| for i := 0; i < SHARD_COUNT; i++ { |
| shard := m[i] |
| shard.RLock() |
| count += len(shard.items) |
| shard.RUnlock() |
| } |
| return count |
| } |
| |
| // Looks up an item under specified key |
| func (m *ConcurrentMap) Has(key string) bool { |
| // Get shard |
| shard := m.GetShard(key) |
| shard.RLock() |
| // See if element is within shard. |
| _, ok := shard.items[key] |
| shard.RUnlock() |
| return ok |
| } |
| |
| // Removes an element from the map. |
| func (m *ConcurrentMap) Remove(key string) { |
| // Try to get shard. |
| shard := m.GetShard(key) |
| shard.Lock() |
| delete(shard.items, key) |
| shard.Unlock() |
| } |
| |
| // Removes an element from the map and returns it |
| func (m *ConcurrentMap) Pop(key string) (v interface{}, exists bool) { |
| // Try to get shard. |
| shard := m.GetShard(key) |
| shard.Lock() |
| v, exists = shard.items[key] |
| delete(shard.items, key) |
| shard.Unlock() |
| return v, exists |
| } |
| |
| // Checks if map is empty. |
| func (m *ConcurrentMap) IsEmpty() bool { |
| return m.Count() == 0 |
| } |
| |
| // Used by the Iter & IterBuffered functions to wrap two variables together over a channel, |
| type Tuple struct { |
| Key string |
| Val interface{} |
| } |
| |
| // Returns an iterator which could be used in a for range loop. |
| // |
| // Deprecated: using IterBuffered() will get a better performence |
| func (m ConcurrentMap) Iter() <-chan Tuple { |
| ch := make(chan Tuple) |
| go func() { |
| wg := sync.WaitGroup{} |
| wg.Add(SHARD_COUNT) |
| // Foreach shard. |
| for _, shard := range m { |
| go func(shard *concurrentMapShared) { |
| // Foreach key, value pair. |
| shard.RLock() |
| for key, val := range shard.items { |
| ch <- Tuple{key, val} |
| } |
| shard.RUnlock() |
| wg.Done() |
| }(shard) |
| } |
| wg.Wait() |
| close(ch) |
| }() |
| return ch |
| } |
| |
| // Returns a buffered iterator which could be used in a for range loop. |
| func (m ConcurrentMap) IterBuffered() <-chan Tuple { |
| ch := make(chan Tuple, m.Count()) |
| go func() { |
| wg := sync.WaitGroup{} |
| wg.Add(SHARD_COUNT) |
| // Foreach shard. |
| for _, shard := range m { |
| go func(shard *concurrentMapShared) { |
| // Foreach key, value pair. |
| shard.RLock() |
| for key, val := range shard.items { |
| ch <- Tuple{key, val} |
| } |
| shard.RUnlock() |
| wg.Done() |
| }(shard) |
| } |
| wg.Wait() |
| close(ch) |
| }() |
| return ch |
| } |
| |
| // Returns all items as map[string]interface{} |
| func (m ConcurrentMap) Items() map[string]interface{} { |
| tmp := make(map[string]interface{}) |
| |
| // Insert items to temporary map. |
| for item := range m.IterBuffered() { |
| tmp[item.Key] = item.Val |
| } |
| |
| return tmp |
| } |
| |
| // Iterator callback,called for every key,value found in |
| // maps. RLock is held for all calls for a given shard |
| // therefore callback sess consistent view of a shard, |
| // but not across the shards |
| type IterCb func(key string, v interface{}) |
| |
| // Callback based iterator, cheapest way to read |
| // all elements in a map. |
| func (m *ConcurrentMap) IterCb(fn IterCb) { |
| for idx := range *m { |
| shard := (*m)[idx] |
| shard.RLock() |
| for key, value := range shard.items { |
| fn(key, value) |
| } |
| shard.RUnlock() |
| } |
| } |
| |
| // Return all keys as []string |
| func (m ConcurrentMap) Keys() []string { |
| count := m.Count() |
| ch := make(chan string, count) |
| go func() { |
| // Foreach shard. |
| wg := sync.WaitGroup{} |
| wg.Add(SHARD_COUNT) |
| for _, shard := range m { |
| go func(shard *concurrentMapShared) { |
| // Foreach key, value pair. |
| shard.RLock() |
| for key := range shard.items { |
| ch <- key |
| } |
| shard.RUnlock() |
| wg.Done() |
| }(shard) |
| } |
| wg.Wait() |
| close(ch) |
| }() |
| |
| keys := make([]string, 0, count) |
| for k := range ch { |
| keys = append(keys, k) |
| } |
| return keys |
| } |
| |
| //Reviles ConcurrentMap "private" variables to json marshal. |
| func (m ConcurrentMap) MarshalJSON() ([]byte, error) { |
| // Create a temporary map, which will hold all item spread across shards. |
| tmp := make(map[string]interface{}) |
| |
| // Insert items to temporary map. |
| for item := range m.IterBuffered() { |
| tmp[item.Key] = item.Val |
| } |
| return json.Marshal(tmp) |
| } |
| |
| func fnv32(key string) uint32 { |
| hash := uint32(2166136261) |
| const prime32 = uint32(16777619) |
| for i := 0; i < len(key); i++ { |
| hash *= prime32 |
| hash ^= uint32(key[i]) |
| } |
| return hash |
| } |