| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| require "thread" |
| require "qpid/peer" |
| require "qpid/queue" |
| |
| module Qpid08 |
| |
| class Client |
| def initialize(host, port, spec, vhost = "/") |
| @host = host |
| @port = port |
| @spec = spec |
| @vhost = vhost |
| |
| @mechanism = nil |
| @response = nil |
| @locale = nil |
| |
| @queues = {} |
| @mutex = Mutex.new() |
| |
| @closed = false |
| @code = nil |
| @started = ConditionVariable.new() |
| |
| @conn = Connection.new(@host, @port, @spec) |
| @peer = Peer.new(@conn, ClientDelegate.new(self)) |
| end |
| |
| attr_reader :mechanism, :response, :locale |
| |
| def closed?; @closed end |
| def closed=(value); @closed = value end |
| def code; @code end |
| |
| def wait() |
| @mutex.synchronize do |
| @started.wait(@mutex) |
| end |
| raise EOFError.new() if closed? |
| end |
| |
| def signal_start() |
| @started.broadcast() |
| end |
| |
| def queue(key) |
| @mutex.synchronize do |
| q = @queues[key] |
| if q.nil? |
| q = Queue.new() |
| @queues[key] = q |
| end |
| return q |
| end |
| end |
| |
| def start(response, mechanism="AMQPLAIN", locale="en_US") |
| @response = response |
| @mechanism = mechanism |
| @locale = locale |
| |
| @conn.connect() |
| @conn.init() |
| @peer.start() |
| wait() |
| channel(0).connection_open(@vhost) |
| end |
| |
| def channel(id) |
| return @peer.channel(id) |
| end |
| |
| def close(msg = nil) |
| @closed = true |
| @code = msg |
| @peer.close() |
| end |
| end |
| |
| class ClientDelegate |
| |
| include Delegate |
| |
| def initialize(client) |
| @client = client |
| end |
| |
| def connection_start(ch, msg) |
| ch.connection_start_ok(:mechanism => @client.mechanism, |
| :response => @client.response, |
| :locale => @client.locale) |
| end |
| |
| def connection_tune(ch, msg) |
| ch.connection_tune_ok(*msg.fields) |
| @client.signal_start() |
| end |
| |
| def connection_close(ch, msg) |
| puts "CONNECTION CLOSED: #{msg.args.join(", ")}" |
| @client.close(msg) |
| end |
| |
| def channel_close(ch, msg) |
| puts "CHANNEL[#{ch.id}] CLOSED: #{msg.args.join(", ")}" |
| ch.channel_close_ok() |
| ch.close() |
| end |
| |
| def basic_deliver(ch, msg) |
| queue = @client.queue(msg.consumer_tag) |
| queue << msg |
| end |
| |
| end |
| |
| end |