blob: 32c17b631b5496826f911d4a7c30800d12ff7078 [file] [log] [blame]
require "rubygems"
require "json"
module Storm
module Protocol
def read_string_message
msg = ""
loop do
line = STDIN.readline.chomp
break if line == "end"
msg << line
msg << "\n"
end
msg.chomp
end
def read_message
JSON.parse read_string_message
end
def send_to_parent(s)
puts s
puts "end"
STDOUT.flush
end
def sync
puts "sync"
STDOUT.flush
end
def send_pid(heartbeat_dir)
pid = Process.pid
puts pid
STDOUT.flush
File.open("#{heartbeat_dir}/#{pid}", "w").close
end
def send_msg_to_parent(hash)
send_to_parent hash.to_json
end
def emit_tuple(tup, stream = nil, anchors = [], direct = nil)
m = {:command => :emit, :anchors => anchors.map(&:id), :tuple => tup}
m[:stream] = stream if stream
m[:task] = direct if direct
send_msg_to_parent m
end
def emit(tup, stream = nil, anchors = [], direct = nil)
emit_tuple tup, stream, anchors, direct
read_message
end
def ack(tup)
send_msg_to_parent :command => :ack, :id => tup.id
end
def fail(tup)
send_msg_to_parent :command => :fail, :id => tup.id
end
def log(msg)
send_msg_to_parent :command => :log, :msg => msg.to_s
end
def read_env
[read_message, read_message]
end
end
class Tuple
attr_accessor :id, :component, :stream, :task, :values
def initialize(id, component, stream, task, values)
@id = id
@component = component
@stream = stream
@task = task
@values = values
end
def self.from_hash(hash)
Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
end
end
class Bolt
include Storm::Protocol
def initialize(conf = nil, context = nil)
@conf = conf
@context = context
end
def process(tuple)
end
def run
heartbeat_dir = read_string_message
send_pid heartbeat_dir
@conf, @context = read_env
begin
while true
process Tuple.from_hash(read_message)
sync
end
rescue Exception => e
log e
end
end
end
class Spout
end
end