blob: 09a4888cc4223da076c656086bfbf2bccf357737 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
require "socket"
require "qpid/codec08"
module Qpid08
# Frame-level transport for one broker connection. Holds the endpoint and
# protocol spec, and (once #connect has run) an Encoder/Decoder pair
# wrapped around a TCP socket.
class Connection
  attr_reader :host, :port, :spec

  # Stores the endpoint and spec only; the socket is opened by #connect.
  def initialize(host, port, spec)
    @host, @port, @spec = host, port, spec
  end

  # Opens the TCP socket to host:port and wraps it with an Encoder for
  # outgoing data and a Decoder for incoming data.
  def connect
    @sock = TCPSocket.open(@host, @port)
    @out = Encoder.new(@sock)
    @in = Decoder.new(@sock)
  end

  # Sends the protocol header: the literal "AMQP" followed by the octets
  # 1, 1, spec major version, spec minor version.
  def init
    @out.write("AMQP")
    header_octets = [1, 1, @spec.major, @spec.minor]
    header_octets.each do |value|
      @out.octet(value)
    end
  end

  # Writes one frame: the type id (looked up in the spec constants by the
  # payload's type), the channel as a short, the encoded payload, and
  # finally the frame-end octet.
  def write(frame)
    # Debug aid: puts "OUT #{frame.inspect()}"
    body = frame.payload
    type_id = @spec.constants[body.type].id
    @out.octet(type_id)
    @out.short(frame.channel)
    body.encode(@out)
    @out.octet(frame_end)
  end

  # Reads one frame and returns it as a Frame. Raises Exception when the
  # octet following the payload is not the spec's frame-end marker.
  def read
    frame_type = @spec.constants[@in.octet].name
    channel = @in.short
    payload = Payload.decode(frame_type, @spec, @in)
    trailer = @in.octet
    unless trailer == frame_end
      raise Exception.new("framing error: expected #{frame_end}, got #{trailer}")
    end
    # Debug aid: puts " IN #{frame.inspect}"
    Frame.new(channel, payload)
  end

  private

  # Id of the spec constant named frame_end, used as the frame terminator.
  def frame_end
    @spec.constants[:frame_end].id
  end
end
# A payload together with the channel number it is sent or received on.
# Both values are read-only after construction.
class Frame
  attr_reader :channel, :payload

  def initialize(channel, payload)
    @channel, @payload = channel, payload
  end
end
# Base class for frame payloads. Subclasses register themselves simply by
# defining a class-level `type` method; Payload.decode then dispatches to
# the subclass registered for a given frame type.
class Payload
  # Maps a frame type (the value returned by a subclass's `type` class
  # method) to the subclass that handles it.
  TYPES = {}

  # Ruby hook invoked whenever a singleton method is defined on Payload
  # or on one of its subclasses. When that method is `type`, the defining
  # class is recorded in TYPES under the value `type` returns.
  def self.singleton_method_added(name)
    TYPES[type] = self if name == :type
  end

  # Decodes a payload of the given frame type by delegating to the
  # registered subclass's decode(spec, dec).
  #
  # Raises ArgumentError when no subclass is registered for the type,
  # instead of failing with a NoMethodError on nil.
  def self.decode(type, spec, dec)
    klass = TYPES[type]
    if klass.nil?
      raise ArgumentError.new("unknown frame type: #{type.inspect}")
    end
    klass.decode(spec, dec)
  end
end
# Payload of a method frame: a spec method description plus one argument
# value per field of that method.
class Method < Payload
  # NOTE: the `method` reader replaces Object#method on instances.
  attr_reader :method, :args

  # Frame type under which this class is registered with Payload.
  def self.type
    :frame_method
  end

  # Raises ArgumentError unless exactly one argument is supplied per
  # field declared by the method.
  def initialize(method, args)
    unless args.size == method.fields.size
      raise ArgumentError.new("argument mismatch #{method} #{args}")
    end
    @method = method
    @args = args
  end

  def type
    Method.type
  end

  # Serialises the class id (the method's parent), the method id and each
  # argument into a scratch buffer, then writes that buffer to +encoder+
  # as a single long string. A nil argument is replaced by the field's
  # default value.
  def encode(encoder)
    scratch = StringWriter.new
    body = Encoder.new(scratch)
    body.short(@method.parent.id)
    body.short(@method.id)
    @method.fields.zip(args).each do |field, value|
      value = field.default if value.nil?
      body.encode(field.type, value)
    end
    body.flush
    encoder.longstr(scratch.to_s)
  end

  # Reads a long string from +decoder+, then decodes the class id, the
  # method id and one value per field of the resolved method from it.
  def self.decode(spec, decoder)
    raw = decoder.longstr
    body = Decoder.new(StringReader.new(raw))
    klass = spec.classes[body.short]
    meth = klass.methods[body.short]
    values = meth.fields.map { |field| body.decode(field.type) }
    Method.new(meth, values)
  end

  def inspect
    "#{method.qname}(#{args.join(", ")})"
  end
end
# Payload of a content-header frame: the content class, a weight, the body
# size, and a hash of property values keyed by field name.
class Header < Payload
  # Each 16-bit flag word carries this many property-presence bits
  # (bits 15 down to 1); bit 0 marks that another flag word follows.
  FLAG_BITS_PER_WORD = 15

  # Frame type under which this class is registered with Payload.
  def Header.type; :frame_header end

  def initialize(klass, weight, size, properties)
    @klass = klass
    @weight = weight
    @size = size
    @properties = properties
  end

  attr_reader :weight, :size, :properties

  def type; Header.type end

  # Serialises class id, weight, size, the property flag word(s) and every
  # non-nil property value into a scratch buffer, then writes that buffer
  # to +encoder+ as a single long string.
  #
  # Flag layout matches what Header.decode reads: the n-th field of a
  # word occupies bit (15 - n), and bit 0 is set on every word except the
  # last. The output is unchanged for classes with 14 or fewer fields;
  # classes with 15 or more fields were previously mis-packed.
  def encode(encoder)
    buf = StringWriter.new()
    enc = Encoder.new(buf)
    enc.short(@klass.id)
    enc.short(@weight)
    enc.longlong(@size)

    # property flags
    fields = @klass.fields
    flags = 0
    0.upto(fields.size - 1) do |i|
      slot = i % FLAG_BITS_PER_WORD
      if slot == 0 && i > 0
        # The previous word is full and more fields remain: emit it with
        # the continuation bit set and start a fresh word.
        enc.short(flags | 1)
        flags = 0
      end
      flags |= 1 << (15 - slot) unless @properties[fields[i].name].nil?
    end
    # Final (or only) word; continuation bit stays clear.
    enc.short(flags)

    # properties, in field order, skipping absent ones
    fields.each do |f|
      v = @properties[f.name]
      enc.encode(f.type, v) unless v.nil?
    end
    enc.flush()
    encoder.longstr(buf.to_s)
  end

  # Reads a long string from +decoder+ and decodes class id, weight, size,
  # the property flag word(s), and then one value for every field whose
  # presence bit is set.
  def Header.decode(spec, decoder)
    dec = Decoder.new(StringReader.new(decoder.longstr()))
    klass = spec.classes[dec.short()]
    weight = dec.short()
    size = dec.longlong()

    # property flags: collect bits 15..1 of each word, continuing while
    # bit 0 is set
    bits = []
    loop do
      flags = dec.short()
      15.downto(1) do |i|
        bits << (((flags >> i) & 0x1) != 0)
      end
      break if (flags & 0x1) == 0
    end

    # properties
    properties = {}
    bits.zip(klass.fields).each do |b, f|
      properties[f.name] = dec.decode(f.type) if b
    end
    return Header.new(klass, weight, size, properties)
  end

  def inspect(); "#{@klass.name}(#{@properties.inspect()})" end
end
# Payload of a content-body frame: an opaque chunk of content that is
# written and read as a single long string.
class Body < Payload
  attr_reader :content

  # Frame type under which this class is registered with Payload.
  def self.type
    :frame_body
  end

  def initialize(content)
    @content = content
  end

  def type
    Body.type
  end

  # Writes the content to +enc+ as a long string.
  def encode(enc)
    enc.longstr(@content)
  end

  # Builds a Body from the next long string read from +dec+; +spec+ is
  # accepted for interface parity with the other payloads and is unused.
  def self.decode(spec, dec)
    Body.new(dec.longstr)
  end
end
end