blob: 93f3b06f812071bfc32f750275bc648c478ef4be [file] [log] [blame]
#
# Maintain two lists of active sockets (channels): one associating a user
# with a list of sockets, and one associating each socket to a single user.
#
require 'json'
require 'concurrent'
require 'listen'
require 'digest'
require 'yaml'
require_relative './session'
require_relative './events'
require 'whimsy/asf/svn'
class Channel
@@sockets = Concurrent::Map.new
@@users = Concurrent::Map.new {|map,key| map[key]=[]}
begin
FOUNDATION_BOARD = ASF::SVN['foundation_board']
rescue Exception
# rescue value is to help with startup when initialising a new host
FOUNDATION_BOARD = '/srv/svn/foundation_board'
end
# add a new socket/userid pair
def self.add(ws, id)
@@users[id] << ws
@@sockets[ws] = id
if @@users[id].length == 1
self.post_all(type: :arrive, user: id, present: self.present,
timestamp: Time.now.to_f*1000)
end
end
# send a message to a list of clients
def self.post(clients, msg)
clients.each do |client|
EM.defer(
->() {client.send msg},
->(response) {},
->(error) {client.close rescue nil}
)
end
end
# send a message to all users
def self.post_all(msg)
msg = JSON.dump(msg) if msg.instance_of? Hash
self.post @@sockets.keys, msg
end
# send a message to a specific user
def self.post_private(user, msg)
msg = JSON.dump(msg) if msg.instance_of? Hash
self.post @@users[user] || [], msg
end
# delete a socket connection
def self.delete(ws)
id = @@sockets.delete(ws)
if id
@@users[id].delete ws
if @@users[id].empty?
@@users.delete id
self.post_all(type: :depart, user: id, present: self.present,
timestamp: Time.now.to_f*1000)
end
end
end
# return a list of active users
def self.present
users = @@users.keys
path = File.join(Session::WORKDIR, 'present.yml')
File.open(path, File::RDWR|File::CREAT, 0644) do |fh|
fh.flock(File::LOCK_EX)
fh.write(YAML.dump(users))
fh.flush
fh.truncate(fh.pos)
end
users.reject! {|name| name =~ /^board_agenda_[_\d]+$/}
end
# close all open sockets
def self.close_all
@@sockets.keys.each do |client|
client.close
end
end
# listen for changes to agenda files
board_listener = Listen.to(FOUNDATION_BOARD) do |modified, added, removed|
modified.each do |path|
next unless File.exist?(path)
file = File.basename(path)
if file =~ /^board_agenda_[\d_]+.txt$/
contents = File.read(path)
digest = Digest::SHA256.base64digest(contents)
self.post_all type: :agenda, file: file, digest: digest
end
end
end
board_listener.start
# listen for changes to pending and minutes files
work_listener = Listen.to(Session::AGENDA_WORK) do |modified, added, removed|
(modified+added).each do |path|
next if path.end_with? '/sessions/present.yml'
next unless File.exist?(path)
file = File.basename(path)
if path =~ /board_agenda_\d+_\d+_\d+.txt$/
contents = File.read(path)
digest = Digest::SHA256.base64digest(contents)
self.post_all type: :agenda, file: file, digest: digest
elsif file =~ /^board_minutes_\d{4}_\d\d_\d\d\.yml$/
agenda = file.sub('minutes', 'agenda').sub('.yml', '.txt')
self.post_all type: :minutes, agenda: agenda,
value: YAML.load_file(path)
elsif file =~ /^(\w+)\.yml$/
self.post_private $1, type: :pending, private: $1,
value: YAML.load_file(path)
elsif path =~ /\/events\/\w+$/
Events.process()
elsif file =~ /^(\w+)\.bak$/
nil
elsif path =~ /^\/sessions\/\w+$/
nil
elsif file =~ /^board_agenda_\d{4}_\d\d_\d\d\-chat.yml$/
nil
else
STDERR.puts path
end
end
end
work_listener.start
# process pending messages
Events.process()
end