blob: cccce7ae7497f56c086d063e4b841dbd3f7826c3 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
module Skywalking
module Reporter
class Scheduler
include Log::Logging
def initialize
@read_side, @write_side = IO.pipe
@queue = Queue.new
@workers = {}
@jobs = Hash.new { |h, k| h[k] = [] }
@jobs[:timer_job] << proc { |worker| add_worker(worker) }
@jobs[:event_job] << proc { |event, blk| @jobs[event] << blk }
@running = true
end
def run
while running?
reads = IO.select([@read_side], nil, nil, find_next_trigger)
if reads&.dig(0, 0) == @read_side
@read_side.read(1)
end
@workers.each_value do |worker|
if worker.need_trigger?
@queue << [worker.job_name]
worker.set_latest_trigger_time
end
end
until @queue.empty?
job, args = @queue.pop
dispatch(job, args)
@workers[job]&.init_next_trigger_time
end
end
end
def dispatch(job, args)
@jobs[job].each do |orig_job|
begin
orig_job.call(*args)
rescue Exception => e
warn "Error in job #{job}: #{e.message}"
end
end
end
def subscribe(job_name, job_interval, &job_func)
trigger(:event_job, [job_name, job_func])
trigger(:timer_job, Timer.new(job_name, job_interval))
end
def trigger(job_type, *args)
@queue.push([job_type, *args])
notify
end
def find_next_trigger
return nil if @workers.empty?
timeout = @workers.values.map(&:next_trigger_time).min - Process.clock_gettime(Process::CLOCK_REALTIME)
timeout.positive? ? timeout : 0
end
def notify
@write_side.write_nonblock('n')
end
def add_worker(worker)
orig_job = @workers[worker.job_name]
orig_job.adjust_next_trigger_time(Process.clock_gettime(Process::CLOCK_REALTIME) - orig_job.latest_trigger) if orig_job
@workers[worker.job_name] = worker
trigger_worker(worker)
end
def trigger_worker(worker)
if worker.need_trigger?
@queue.push([worker.job_name])
worker.set_latest_trigger_time
end
end
def running?
@running
end
def shutdown
return unless running?
@running = false
@write_side.write_nonblock('s')
end
class Timer
attr_reader :job_name, :job_interval, :next_trigger_time, :latest_trigger_time
def initialize(job_name, job_interval)
@job_name = job_name
@job_interval = job_interval
@start_time = Process.clock_gettime(Process::CLOCK_REALTIME)
@latest_trigger_time = nil
init_next_trigger_time
end
def init_next_trigger_time
@next_trigger_time = gen_next_trigger_time
end
def set_latest_trigger_time
@latest_trigger_time = Process.clock_gettime(Process::CLOCK_REALTIME)
end
def to_s
"<Timer job_name: #{@job_name}, job_interval: #{@job_interval}, start_time: #{@start_time},
latest_trigger_time: #{@latest_trigger_time}>"
end
def latest_trigger
@start_time || @latest_trigger_time
end
def adjust_next_trigger_time(time)
@next_trigger_time -= time
end
def need_trigger?(now = Process.clock_gettime(Process::CLOCK_REALTIME))
now >= @next_trigger_time
end
def gen_next_trigger_time
now = Process.clock_gettime(Process::CLOCK_REALTIME)
return now if @job_interval == 0
ret = @latest_trigger_time || now
ret += @job_interval while ret <= now
ret
end
end
end
end
end