blob: 9b02662bd6d1566558b17e256e3155886acf1942 [file] [log] [blame]
#!/usr/bin/env ruby
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
require 'cproton'
require 'optparse'
FAILED = 0
CONNECTION_UP = 1
AUTHENTICATING = 2
$options = {
:verbose => false,
:hostname => "0.0.0.0",
:port => "5672"
}
OptionParser.new do |opts|
opts.banner = "Usage: mailserver [options] <server-address> [<server-port>]"
opts.on("-v", "--verbose", :NONE,
"Print status messages to stdout") do |f|
$options[:verbose] = true
end
opts.parse!
$options[:hostname] = ARGV[0] if ARGV.length > 0
$options[:port] = ARGV[1] if ARGV.length > 1
end
def log(text)
STDOUT.puts "#{Time.new}: #{text}" if $options[:verbose]
end
class MailServer
def initialize(hostname, port, verbose)
@hostname = hostname
@port = port
@verbose = verbose
@counter = 0
@mailboxes = {}
@driver = nil
end
def setup
@driver = Cproton::pn_driver
@listener = Cproton::pn_listener @driver, @hostname, @port, nil
raise "Error: could not listen on #{@hostname}:#{@port}" if @listener.nil?
end
def wait
log "Driver sleep."
Cproton::pn_driver_wait @driver, -1
log "Driver wakeup."
end
def accept_connection_requests
l = Cproton::pn_driver_listener @driver
while !l.nil?
log "Accepting connection."
cxtr = Cproton::pn_listener_accept l
Cproton::pn_connector_set_context cxtr, AUTHENTICATING
l = Cproton::pn_driver_listener @driver
end
end
def process_connections
cxtr = Cproton::pn_driver_connector @driver
while cxtr
log "Process connector"
Cproton::pn_connector_process cxtr
state = Cproton::pn_connector_context cxtr
case state
when AUTHENTICATING
log "Authenticating..."
self.authenticate_connector cxtr
when CONNECTION_UP
log "Connection established..."
self.service_connector cxtr
else
raise "Unknown connection state #{state}"
end
Cproton::pn_connector_process cxtr
if Cproton::pn_connector_closed cxtr
log "Closing connector."
Cproton::pn_connector_free cxtr
end
cxtr = Cproton::pn_driver_connector @driver
end
end
def authenticate_connector(cxtr)
log "Authenticating..."
sasl = Cproton::pn_connector_sasl cxtr
state = Cproton::pn_sasl_state sasl
while [Cproton::PN_SASL_CONF, Cproton::PN_SASL_STEP].include? state
case state
when Cproton::PN_SASL_CONF
log "Authenticating-CONF..."
Cproton::pn_sasl_mechanisms sasl, "ANONYMOUS"
Cproton::pn_sasl_server sasl
when Cproton::PN_SASL_STEP
log "Authenticating-STEP..."
mech = Cproton::pn_sasl_remote_mechanisms sasl
if mech == "ANONYMOUS"
Cproton::pn_sasl_done sasl, Cproton::PN_SASL_OK
else
Cproton::pn_sasl_done sasl, Cproton::PN_SASL_AUTH
end
end
state = Cproton::pn_sasl_state sasl
end
case state
when Cproton::PN_SASL_PASS
Cproton::pn_connector_set_connection cxtr, Cproton::pn_connection
Cproton::pn_connector_set_context cxtr, CONNECTION_UP
log "Authentication-PASSED"
when Cproton::PN_SASL_FAIL
Cproton::pn_connector_set_context cxtr, FAILED
log "Authentication-FAILED"
else
log "Authentication-PENDING"
end
end
def service_connector(cxtr)
log "I/O processing starting."
conn = Cproton::pn_connector_connection cxtr
if ((Cproton::pn_connection_state(conn) & Cproton::PN_LOCAL_UNINIT) == Cproton::PN_LOCAL_UNINIT)
log "Connection opened."
Cproton::pn_connection_open(conn)
end
ssn = Cproton::pn_session_head conn, Cproton::PN_LOCAL_UNINIT
while ssn
Cproton::pn_session_open ssn
log "Session opened."
ssn = Cproton::pn_session_next ssn, Cproton::PN_LOCAL_UNINIT
end
link = Cproton::pn_link_head conn, Cproton::PN_LOCAL_UNINIT
while link
setup_link link
link = Cproton::pn_link_next link, Cproton::PN_LOCAL_UNINIT
end
delivery = Cproton::pn_work_head conn
while delivery
log "Process delivery #{Cproton::pn_delivery_tag delivery}"
if Cproton::pn_delivery_readable delivery
process_receive delivery
elsif Cproton::pn_delivery_writable delivery
send_message delivery
end
if Cproton::pn_delivery_updated delivery
log "Remote disposition for #{Cproton::pn_delivery_tag delivery}: #{Cproton::pn_delivery_remote_state delivery}"
Cproton::pn_delivery_settle(delivery) if Cproton::pn_delivery_remote_state delivery
end
delivery = Cproton::pn_work_next delivery
end
link = Cproton::pn_link_head conn, Cproton::PN_LOCAL_ACTIVE | Cproton::PN_REMOTE_CLOSED
while link
Cproton::pn_link_close link
log "Link closed."
link = Cproton::pn_link_next link, Cproton::PN_LOCAL_ACTIVE | Cproton::PN_REMOTE_CLOSED
end
ssn = Cproton::pn_session_head conn, Cproton::PN_LOCAL_ACTIVE | Cproton::PN_REMOTE_CLOSED
while ssn
Cproton::pn_session_close ssn
log "Session closed."
ssn = Cproton::pn_session_next ssn, Cproton::PN_LOCAL_ACTIVE | Cproton::PN_REMOTE_CLOSED
end
if Cproton::pn_connection_state(conn) == (Cproton::PN_LOCAL_ACTIVE | Cproton::PN_REMOTE_CLOSED)
log "Connection closed."
Cproton::pn_connection_close conn
end
end
def process_receive(delivery)
link = Cproton::pn_delivery_link delivery
mbox = Cproton::pn_link_remote_target link
(rc, msg) = Cproton::pn_link_recv link, 1024
log "Message received #{rc}"
while rc >= 0
unless @mailboxes.include? mbox
log "Error: cannot send to mailbox #{mbox} - dropping message."
else
@mailboxes[mbox] << msg
log "Mailbox #{mbox} contains: #{@mailboxes[mbox].size}"
end
(rc, msg) = Cproton::pn_link_recv link, 1024
end
log "Messages accepted."
Cproton::pn_delivery_update delivery, Cproton::PN_ACCEPTED
Cproton::pn_delivery_settle delivery
Cproton::pn_link_advance link
Cproton::pn_link_flow(link, 1) if Cproton::pn_link_credit(link).zero?
end
def send_message(delivery)
link = Cproton::pn_delivery_link delivery
mbox = Cproton::pn_link_remote_source link
log "Request for Mailbox=#{mbox}"
if @mailboxes.include?(mbox) && !@mailboxes[mbox].empty?
msg = @mailboxes[mbox].first
@mailboxes[mbox].delete_at 0
log "Fetching message #{msg}"
else
log "Warning: mailbox #{mbox} is empty, sending empty message"
msg = ""
end
sent = Cproton::pn_link_send link, msg
log "Message sent: #{sent}"
if Cproton::pn_link_advance link
Cproton::pn_delivery link, "server-delivery-#{@counter}"
@counter += 1
end
end
def setup_link(link)
r_tgt = Cproton::pn_link_remote_target link
r_src = Cproton::pn_link_remote_source link
if Cproton::pn_link_is_sender link
log "Opening link to read from mailbox: #{r_src}"
unless @mailboxes.include? r_src
log "Error: mailbox #{r_src} does not exist!"
r_src = nil
end
else
log "Opening link to write to mailbox: #{r_tgt}"
@mailboxes[r_tgt] = [] unless @mailboxes.include? r_tgt
end
Cproton::pn_link_set_target link, r_tgt
Cproton::pn_link_set_source link, r_src
if Cproton::pn_link_is_sender link
Cproton::pn_delivery link, "server-delivery-#{@counter}"
@counter += 1
else
Cproton::pn_link_flow link, 1
end
Cproton::pn_link_open link
end
end
#------------------
# Begin entry point
#------------------
if __FILE__ == $PROGRAM_NAME
server = MailServer.new($options[:hostname],
$options[:port],
$options[:verbose])
server.setup
loop do
server.wait
server.accept_connection_requests
server.process_connections
end
end