class Qpid::Proton::Container::SelectWaker

Selectable object that can be used to wake IO.select from another thread

Public Class Methods

new() click to toggle source
# File lib/core/container.rb, line 329
def initialize
  @rd, @wr = IO.pipe
  @lock = Mutex.new
  @set = false
end

Public Instance Methods

add(task) click to toggle source

All new tasks are added here

# File lib/core/container.rb, line 489
def add task
  @lock.synchronize do
    @active += 1
    task.close @stop_err if @stopped
  end
  work_wake task
end
check_stop_lh() click to toggle source
# File lib/core/container.rb, line 512
def check_stop_lh
  if @active.zero? && (@auto_stop || @stopped)
    @stopped = true
    work_wake nil          # Signal threads to stop
    true
  end
end
close() click to toggle source
# File lib/core/container.rb, line 353
def close
  @rd.close
  @wr.close
end
connection_driver(io, opts=nil, server=false) click to toggle source
# File lib/core/container.rb, line 481
def connection_driver(io, opts=nil, server=false)
  opts ||= {}
  opts[:container] = self
  opts[:handler] ||= @adapter
  ConnectionTask.new(self, io, opts, server)
end
do_select() click to toggle source
# File lib/core/container.rb, line 422
def do_select
  # Compute the sets to select for read and write, and the minimum next_tick for the timeout
  r, w = [@wake], []
  next_tick = nil
  @lock.synchronize do
    @selectable.each do |s|
      r << s if s.can_read?
      w << s if s.can_write?
      next_tick = earliest(s.next_tick, next_tick)
    end
  end
  next_tick = earliest(@schedule.next_tick, next_tick)

  # Do the select and queue up all resulting work
  now = Time.now
  timeout = next_tick - now if next_tick
  r, w = (timeout.nil? || timeout > 0) && IO.select(r, w, nil, timeout)
  @wake.reset
  selected = Set.new
  @lock.synchronize do
    if @stopped
      @selectable.each { |s| s.close @stop_err; @work << s }
      @wake.close
      return
    end
    # Check if schedule has items due and is not already working
    if !@schedule_working && before_eq(@schedule.next_tick, now)
      @work << :schedule
      @schedule_working = true
    end
    # Eliminate duplicates between r, w and next_tick due.
    selected.merge(r) if r
    selected.delete(@wake)
    selected.merge(w) if w
    @selectable -= selected
    selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) })
    @selectable -= selected
  end
  selected.each { |s| @work << s } # Queue up tasks needing #process
  @work << :select
end
maybe_panic() { || ... } click to toggle source

Rescue any exception raised by the block and stop the container.

# File lib/core/container.rb, line 465
def maybe_panic
  begin
    yield
  rescue Exception => e
    stop(nil, e)
    nil
  end
end
not_stopped() click to toggle source
# File lib/core/container.rb, line 520
def not_stopped() raise StoppedError if @lock.synchronize { @stopped }; end
rearm(task) click to toggle source
# File lib/core/container.rb, line 497
def rearm task
  @lock.synchronize do
    if task.finished?
      @active -= 1
      check_stop_lh
    elsif @stopped
      task.close @stop_err
      work_wake task
    else
      @selectable << task
    end
  end
  @wake.wake
end
reset() click to toggle source
# File lib/core/container.rb, line 345
  def reset
    @lock.synchronize do
      return unless @set
      begin @rd.read_nonblock(1) rescue IO::WaitReadable end
      @set = false
    end
  end

  def close
    @rd.close
    @wr.close
  end
end
run_one(task, now) click to toggle source

Handle a single item from the @work queue, this is the heart of the run loop.

# File lib/core/container.rb, line 360
def run_one(task, now)
  case task

  when :start
    @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start

  when :select
    # Compute read/write select sets and minimum next_tick for select timeout
    r, w = [@wake], []
    next_tick = @schedule.next_tick
    @lock.synchronize do
      @selectable.each do |s|
        r << s if s.send :can_read?
        w << s if s.send :can_write?
        next_tick = earliest(s.next_tick, next_tick)
      end
    end

    timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
    r, w = IO.select(r, w, nil, timeout)
    now = Time.now unless timeout == 0
    @wake.reset if r && r.delete(@wake)

    # selected is a Set to eliminate duplicates between r, w and next_tick due.
    selected = Set.new
    selected.merge(r) if r
    selected.merge(w) if w
    @lock.synchronize do
      if @stopped # close everything
        @selectable.each { |s| s.close @stop_err; @work << s }
        @selectable.clear
        @wake.close
        return
      end
      if !@schedule_working && before_eq(@schedule.next_tick, now)
        @schedule_working = true
        @work << :schedule
      end
      selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) })
      @selectable -= selected # Remove selected tasks from @selectable
    end
    selected.each { |s| @work << s } # Queue up tasks needing #process
    @work << :select        # Enable next select

  when ConnectionTask then
    maybe_panic { task.process now }
    rearm task

  when ListenTask then
    io, opts = maybe_panic { task.process }
    add(connection_driver(io, opts, true)) if io
    rearm task

  when :schedule then
    if maybe_panic { @schedule.process now }
      @lock.synchronize { @active -= 1; check_stop_lh }
    else
      @lock.synchronize { @schedule_working = false }
    end
  end
end
to_io() click to toggle source
# File lib/core/container.rb, line 335
def to_io() @rd; end
wake() click to toggle source
# File lib/core/container.rb, line 337
    def wake
      @lock.synchronize do
        return if @set        # Don't write if already has data
        @set = true
        begin @wr.write_nonblock('x') rescue IO::WaitWritable end
      end
    end

    def reset
      @lock.synchronize do
        return unless @set
        begin @rd.read_nonblock(1) rescue IO::WaitReadable end
        @set = false
      end
    end

    def close
      @rd.close
      @wr.close
    end
  end

  # Handle a single item from the @work queue, this is the heart of the #run loop.
  def run_one(task, now)
    case task

    when :start
      @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start

    when :select
      # Compute read/write select sets and minimum next_tick for select timeout
      r, w = [@wake], []
      next_tick = @schedule.next_tick
      @lock.synchronize do
        @selectable.each do |s|
          r << s if s.send :can_read?
          w << s if s.send :can_write?
          next_tick = earliest(s.next_tick, next_tick)
        end
      end

      timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
      r, w = IO.select(r, w, nil, timeout)
      now = Time.now unless timeout == 0
      @wake.reset if r && r.delete(@wake)

      # selected is a Set to eliminate duplicates between r, w and next_tick due.
      selected = Set.new
      selected.merge(r) if r
      selected.merge(w) if w
      @lock.synchronize do
        if @stopped # close everything
          @selectable.each { |s| s.close @stop_err; @work << s }
          @selectable.clear
          @wake.close
          return
        end
        if !@schedule_working && before_eq(@schedule.next_tick, now)
          @schedule_working = true
          @work << :schedule
        end
        selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) })
        @selectable -= selected # Remove selected tasks from @selectable
      end
      selected.each { |s| @work << s } # Queue up tasks needing #process
      @work << :select        # Enable next select

    when ConnectionTask then
      maybe_panic { task.process now }
      rearm task

    when ListenTask then
      io, opts = maybe_panic { task.process }
      add(connection_driver(io, opts, true)) if io
      rearm task

    when :schedule then
      if maybe_panic { @schedule.process now }
        @lock.synchronize { @active -= 1; check_stop_lh }
      else
        @lock.synchronize { @schedule_working = false }
      end
    end
  end

  def do_select
    # Compute the sets to select for read and write, and the minimum next_tick for the timeout
    r, w = [@wake], []
    next_tick = nil
    @lock.synchronize do
      @selectable.each do |s|
        r << s if s.can_read?
        w << s if s.can_write?
        next_tick = earliest(s.next_tick, next_tick)
      end
    end
    next_tick = earliest(@schedule.next_tick, next_tick)

    # Do the select and queue up all resulting work
    now = Time.now
    timeout = next_tick - now if next_tick
    r, w = (timeout.nil? || timeout > 0) && IO.select(r, w, nil, timeout)
    @wake.reset
    selected = Set.new
    @lock.synchronize do
      if @stopped
        @selectable.each { |s| s.close @stop_err; @work << s }
        @wake.close
        return
      end
      # Check if schedule has items due and is not already working
      if !@schedule_working && before_eq(@schedule.next_tick, now)
        @work << :schedule
        @schedule_working = true
      end
      # Eliminate duplicates between r, w and next_tick due.
      selected.merge(r) if r
      selected.delete(@wake)
      selected.merge(w) if w
      @selectable -= selected
      selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) })
      @selectable -= selected
    end
    selected.each { |s| @work << s } # Queue up tasks needing #process
    @work << :select
  end

  # Rescue any exception raised by the block and stop the container.
  def maybe_panic
    begin
      yield
    rescue Exception => e
      stop(nil, e)
      nil
    end
  end

  # Normally if we add work we need to set a wakeup to ensure a single #run
  # thread doesn't get stuck in select while there is other work on the queue.
  def work_wake(task)
    @work << task
    @wake.wake
  end

  def connection_driver(io, opts=nil, server=false)
    opts ||= {}
    opts[:container] = self
    opts[:handler] ||= @adapter
    ConnectionTask.new(self, io, opts, server)
  end

  # All new tasks are added here
  def add task
    @lock.synchronize do
      @active += 1
      task.close @stop_err if @stopped
    end
    work_wake task
  end

  def rearm task
    @lock.synchronize do
      if task.finished?
        @active -= 1
        check_stop_lh
      elsif @stopped
        task.close @stop_err
        work_wake task
      else
        @selectable << task
      end
    end
    @wake.wake
  end

  def check_stop_lh
    if @active.zero? && (@auto_stop || @stopped)
      @stopped = true
      work_wake nil          # Signal threads to stop
      true
    end
  end

  def not_stopped() raise StoppedError if @lock.synchronize { @stopped }; end

end
work_wake(task) click to toggle source

Normally if we add work we need to set a wakeup to ensure a single run thread doesn't get stuck in select while there is other work on the queue.

# File lib/core/container.rb, line 476
def work_wake(task)
  @work << task
  @wake.wake
end