Selectable object that can be used to wake IO.select from another thread
# File lib/core/container.rb, line 329
# Build the waker's state: an unset flag, a mutex that guards the flag, and
# an IO pipe whose read end is kept in @rd and write end in @wr.
def initialize
  @set = false
  @lock = Mutex.new
  reader, writer = IO.pipe
  @rd = reader
  @wr = writer
end
All new tasks are added here
# File lib/core/container.rb, line 489
# Register a new task: bump the active-task count under the lock, close the
# task straight away (with @stop_err) if the container has already stopped,
# then queue it as work and wake the selector.
#
# Returns whatever #work_wake returns.
def add(task)
  @lock.synchronize do
    @active += 1
    next unless @stopped
    task.close(@stop_err)
  end
  work_wake(task)
end
# File lib/core/container.rb, line 512
# Decide whether the container should stop now. It stops when there are no
# active tasks and either auto-stop is enabled or a stop was already
# requested. #rearm calls this from inside @lock.synchronize; the "_lh"
# suffix presumably means "lock held" (confirm).
#
# Returns true when the container was marked stopped, nil otherwise.
def check_stop_lh
  return unless @active.zero?
  return unless @auto_stop || @stopped

  @stopped = true
  work_wake(nil) # Signal threads to stop
  true
end
# File lib/core/container.rb, line 353
# Close both ends of the pipe, read end first.
def close
  [@rd, @wr].each { |pipe_end| pipe_end.close }
  nil
end
# File lib/core/container.rb, line 481
# Build a ConnectionTask for +io+.
#
# io     - the IO object handed to ConnectionTask.new
# opts   - optional Hash of connection options; nil means no options
# server - passed through to ConnectionTask.new (defaults to false)
#
# The options given to the task always carry :container => self, and
# :handler defaults to @adapter when the caller did not supply one.
#
# The caller's hash is copied before these keys are written. Writing into it
# directly would leak :container/:handler back to the caller: a hash reused
# for several connections or containers would keep the first :handler because
# of the ||=, and a frozen hash would raise.
#
# Returns the new ConnectionTask.
def connection_driver(io, opts=nil, server=false)
  opts = opts ? opts.dup : {}
  opts[:container] = self
  opts[:handler] ||= @adapter
  ConnectionTask.new(self, io, opts, server)
end
# File lib/core/container.rb, line 422 def do_select # Compute the sets to select for read and write, and the minimum next_tick for the timeout r, w = [@wake], [] next_tick = nil @lock.synchronize do @selectable.each do |s| r << s if s.can_read? w << s if s.can_write? next_tick = earliest(s.next_tick, next_tick) end end next_tick = earliest(@schedule.next_tick, next_tick) # Do the select and queue up all resulting work now = Time.now timeout = next_tick - now if next_tick r, w = (timeout.nil? || timeout > 0) && IO.select(r, w, nil, timeout) @wake.reset selected = Set.new @lock.synchronize do if @stopped @selectable.each { |s| s.close @stop_err; @work << s } @wake.close return end # Check if schedule has items due and is not already working if !@schedule_working && before_eq(@schedule.next_tick, now) @work << :schedule @schedule_working = true end # Eliminate duplicates between r, w and next_tick due. selected.merge(r) if r selected.delete(@wake) selected.merge(w) if w @selectable -= selected selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) }) @selectable -= selected end selected.each { |s| @work << s } # Queue up tasks needing #process @work << :select end
Rescue any exception raised by the block and stop the container.
# File lib/core/container.rb, line 465
# Yield to the block and return its value. If the block raises anything at
# all (Exception, not just StandardError), stop the container with that
# exception as the error and return nil.
def maybe_panic
  yield
rescue Exception => e
  stop(nil, e)
  nil
end
# File lib/core/container.rb, line 520
# Guard: raise StoppedError if the container has been stopped. The flag is
# read under @lock. Returns nil when the container is still running.
def not_stopped
  stopped = @lock.synchronize { @stopped }
  raise StoppedError if stopped
end
# File lib/core/container.rb, line 497
# Decide what happens to a task after it has been processed. Under the lock:
# * finished task    -> drop it from the active count and check for stop
# * container active -> put the task back into @selectable
# * container stopped-> close the task with @stop_err and queue it as work
# The waker is signalled afterwards in every case.
#
# Returns the result of @wake.wake.
def rearm(task)
  @lock.synchronize do
    if task.finished?
      @active -= 1
      check_stop_lh
    elsif !@stopped
      @selectable << task
    else
      task.close(@stop_err)
      work_wake(task)
    end
  end
  @wake.wake
end
# File lib/core/container.rb, line 345 def reset @lock.synchronize do return unless @set begin @rd.read_nonblock(1) rescue IO::WaitReadable end @set = false end end def close @rd.close @wr.close end end
Handle a single item from the @work queue; this is the heart of the #run loop.
# File lib/core/container.rb, line 360 def run_one(task, now) case task when :start @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start when :select # Compute read/write select sets and minimum next_tick for select timeout r, w = [@wake], [] next_tick = @schedule.next_tick @lock.synchronize do @selectable.each do |s| r << s if s.send :can_read? w << s if s.send :can_write? next_tick = earliest(s.next_tick, next_tick) end end timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick r, w = IO.select(r, w, nil, timeout) now = Time.now unless timeout == 0 @wake.reset if r && r.delete(@wake) # selected is a Set to eliminate duplicates between r, w and next_tick due. selected = Set.new selected.merge(r) if r selected.merge(w) if w @lock.synchronize do if @stopped # close everything @selectable.each { |s| s.close @stop_err; @work << s } @selectable.clear @wake.close return end if !@schedule_working && before_eq(@schedule.next_tick, now) @schedule_working = true @work << :schedule end selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) }) @selectable -= selected # Remove selected tasks from @selectable end selected.each { |s| @work << s } # Queue up tasks needing #process @work << :select # Enable next select when ConnectionTask then maybe_panic { task.process now } rearm task when ListenTask then io, opts = maybe_panic { task.process } add(connection_driver(io, opts, true)) if io rearm task when :schedule then if maybe_panic { @schedule.process now } @lock.synchronize { @active -= 1; check_stop_lh } else @lock.synchronize { @schedule_working = false } end end end
# File lib/core/container.rb, line 335
# Return the read end of the pipe.
def to_io
  @rd
end
# File lib/core/container.rb, line 337
# Set the wake-up flag and make the pipe readable by writing a single byte.
# Does nothing if the flag is already set.
def wake
  @lock.synchronize do
    return if @set        # Don't write if already has data
    @set = true
    begin
      @wr.write_nonblock('x')
    rescue IO::WaitWritable
      # The write would block; ignored
    end
  end
end

# Clear the wake-up flag, reading one byte from the pipe if the flag was set.
def reset
  @lock.synchronize do
    return unless @set
    begin
      @rd.read_nonblock(1)
    rescue IO::WaitReadable
      # Nothing available to read; ignored
    end
    @set = false
  end
end

# Close both ends of the pipe.
def close
  @rd.close
  @wr.close
end
end # presumably closes the waker class, whose header is not part of this excerpt

# Handle a single item from the @work queue, this is the heart of the #run loop.
#
# task - one of :start, :select, :schedule, a ConnectionTask or a ListenTask
# now  - the current time as seen by the caller
def run_one(task, now)
  case task

  when :start
    # Notify the adapter, if it implements the callback
    @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start

  when :select
    # Compute read/write select sets and minimum next_tick for select timeout
    r, w = [@wake], []
    next_tick = @schedule.next_tick
    @lock.synchronize do
      @selectable.each do |s|
        r << s if s.send :can_read?
        w << s if s.send :can_write?
        next_tick = earliest(s.next_tick, next_tick)
      end
    end

    # nil timeout (nothing scheduled) blocks indefinitely; 0 polls without waiting
    timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
    r, w = IO.select(r, w, nil, timeout)
    now = Time.now unless timeout == 0
    # Remove the waker from the read set and reset it if it fired
    @wake.reset if r && r.delete(@wake)

    # selected is a Set to eliminate duplicates between r, w and next_tick due.
    selected = Set.new
    selected.merge(r) if r
    selected.merge(w) if w
    @lock.synchronize do
      if @stopped # close everything
        @selectable.each { |s| s.close @stop_err; @work << s }
        @selectable.clear
        @wake.close
        return
      end
      if !@schedule_working && before_eq(@schedule.next_tick, now)
        @schedule_working = true
        @work << :schedule
      end
      selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) })
      @selectable -= selected # Remove selected tasks from @selectable
    end
    selected.each { |s| @work << s } # Queue up tasks needing #process
    @work << :select  # Enable next select

  when ConnectionTask then
    maybe_panic { task.process now }
    rearm task

  when ListenTask then
    io, opts = maybe_panic { task.process }
    add(connection_driver(io, opts, true)) if io
    rearm task

  when :schedule then
    # A truthy result drops one from the active count and may trigger a stop;
    # otherwise the schedule can be queued again.
    if maybe_panic { @schedule.process now }
      @lock.synchronize { @active -= 1; check_stop_lh }
    else
      @lock.synchronize { @schedule_working = false }
    end
  end
end

# Run one select cycle and queue the resulting work, followed by :select.
def do_select
  # Compute the sets to select for read and write, and the minimum next_tick for the timeout
  r, w = [@wake], []
  next_tick = nil
  @lock.synchronize do
    @selectable.each do |s|
      r << s if s.can_read?
      w << s if s.can_write?
      next_tick = earliest(s.next_tick, next_tick)
    end
  end
  next_tick = earliest(@schedule.next_tick, next_tick)

  # Do the select and queue up all resulting work
  now = Time.now
  timeout = next_tick - now if next_tick
  # The select is skipped when the deadline has already passed (r becomes false)
  r, w = (timeout.nil? || timeout > 0) && IO.select(r, w, nil, timeout)
  @wake.reset
  selected = Set.new
  @lock.synchronize do
    if @stopped
      # NOTE(review): run_one's :select branch also clears @selectable here; confirm
      @selectable.each { |s| s.close @stop_err; @work << s }
      @wake.close
      return
    end
    # Check if schedule has items due and is not already working
    if !@schedule_working && before_eq(@schedule.next_tick, now)
      @work << :schedule
      @schedule_working = true
    end
    # Eliminate duplicates between r, w and next_tick due.
    selected.merge(r) if r
    selected.delete(@wake)
    selected.merge(w) if w
    @selectable -= selected
    selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) })
    @selectable -= selected
  end
  selected.each { |s| @work << s } # Queue up tasks needing #process
  @work << :select
end

# Rescue any exception raised by the block and stop the container.
# Returns the block's value, or nil if it raised.
def maybe_panic
  begin
    yield
  rescue Exception => e
    stop(nil, e)
    nil
  end
end

# Normally if we add work we need to set a wakeup to ensure a single #run
# thread doesn't get stuck in select while there is other work on the queue.
def work_wake(task)
  @work << task
  @wake.wake
end

# Build a ConnectionTask for io. The opts hash (created if nil) is given
# :container => self and, unless already present, :handler => @adapter.
def connection_driver(io, opts=nil, server=false)
  opts ||= {}
  opts[:container] = self
  opts[:handler] ||= @adapter
  ConnectionTask.new(self, io, opts, server)
end

# All new tasks are added here
def add task
  @lock.synchronize do
    @active += 1
    task.close @stop_err if @stopped   # already stopped: close the task immediately
  end
  work_wake task
end

# Decide what happens to a task after processing: account for it if finished,
# close and queue it if the container is stopped, else make it selectable again.
def rearm task
  @lock.synchronize do
    if task.finished?
      @active -= 1
      check_stop_lh
    elsif @stopped
      task.close @stop_err
      work_wake task
    else
      @selectable << task
    end
  end
  @wake.wake
end

# Mark the container stopped when no tasks are active and either auto-stop is
# enabled or a stop was already requested. Returns true in that case, else nil.
def check_stop_lh
  if @active.zero? && (@auto_stop || @stopped)
    @stopped = true
    work_wake nil         # Signal threads to stop
    true
  end
end

# Raise StoppedError if the container has been stopped.
def not_stopped() raise StoppedError if @lock.synchronize { @stopped }; end

end # presumably closes the container class, whose header is not part of this excerpt
Normally, if we add work, we need to set a wakeup to ensure that a single #run thread doesn't get stuck in select while there is other work on the queue.
# File lib/core/container.rb, line 476
# Push a task onto the work queue, then signal the waker.
#
# Returns the result of @wake.wake.
def work_wake(task)
  queue, waker = @work, @wake
  queue << task
  waker.wake
end