| /* |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| */ |
| |
| package util |
| |
| import ( |
| "qpid.apache.org/amqp" |
| "sync" |
| ) |
| |
| // Use a buffered channel as a very simple queue. |
| type Queue chan amqp.Message |
| |
| // Put a message back on the queue, does not block. |
| func (q Queue) PutBack(m amqp.Message) { |
| select { |
| case q <- m: |
| default: |
| // Not an efficient implementation but ensures we don't block the caller. |
| go func() { q <- m }() |
| } |
| } |
| |
| // Concurrent-safe map of queues. |
| type Queues struct { |
| queueSize int |
| m map[string]Queue |
| lock sync.Mutex |
| } |
| |
| func MakeQueues(queueSize int) Queues { |
| return Queues{queueSize: queueSize, m: make(map[string]Queue)} |
| } |
| |
| // Create a queue if not found. |
| func (qs *Queues) Get(name string) Queue { |
| qs.lock.Lock() |
| defer qs.lock.Unlock() |
| q := qs.m[name] |
| if q == nil { |
| q = make(Queue, qs.queueSize) |
| qs.m[name] = q |
| } |
| return q |
| } |