class ProcessExecuter::MonitoredPipe

Stream data sent through a pipe to one or more writers

When a new MonitoredPipe is created, a pipe is created (via IO.pipe) and a thread is created to read data written to the pipe.

Data read from the pipe is written to one or more writers passed to `#initialize`.

If any of the writers raises an exception, the monitoring thread will exit, the pipe will be closed, and the exception will be saved in `#exception`.

`#close` must be called to ensure that (1) the pipe is closed, (2) all data is read from the pipe and written to the writers, and (3) the monitoring thread has finished.
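
The pattern described above (a pipe plus a thread that forwards its data to several writers) can be sketched with plain Ruby. This is a minimal illustration under stated assumptions, not the class's actual implementation: the names `sinks` and `forwarder` are hypothetical, and a blocking read is used for brevity.

```ruby
require 'stringio'

# Illustrative only: not the library's implementation
reader, writer = IO.pipe
sinks = [StringIO.new, StringIO.new] # hypothetical writers

# Forwarding thread: copy everything read from the pipe to every sink
forwarder = Thread.new do
  while (chunk = reader.read(1024)) # returns nil at end of file
    sinks.each { |sink| sink.write(chunk) }
  end
end

writer.write('Hello World')
writer.close   # signals end of file to the forwarding thread
forwarder.join # wait until all data has been forwarded
reader.close

sinks.map(&:string) #=> ["Hello World", "Hello World"]
```

Closing the write end and joining the thread plays the role that `#close` plays for a MonitoredPipe: without it, data may still be sitting in the pipe.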

@example Collect pipe data into a string

pipe_data = StringIO.new
begin
  pipe = MonitoredPipe.new(pipe_data)
  pipe.write("Hello World")
ensure
  pipe.close
end
pipe_data.string #=> "Hello World"

@example Collect pipe data into a string AND a file

pipe_data_string = StringIO.new
pipe_data_file = File.open("pipe_data.txt", "w")
begin
  pipe = MonitoredPipe.new(pipe_data_string, pipe_data_file)
  pipe.write("Hello World")
ensure
  pipe.close
  pipe_data_file.close # flush buffered data to disk before reading the file
end
pipe_data_string.string #=> "Hello World"
File.read("pipe_data.txt") #=> "Hello World"

@api public

Attributes

chunk_size[R]

@!attribute [r]

The size of the chunks to read from the pipe

@example

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.chunk_size #=> 100000

@return [Integer] the size of the chunks to read from the pipe

exception[R]

@!attribute [r]

The exception raised by a writer

If an exception is raised by a writer, it is stored here. Otherwise, it is `nil`.

@example

pipe.exception #=> nil

@return [Exception, nil] the exception raised by a writer or `nil` if no exception was raised
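
As a rough illustration of how an exception raised by a writer can be saved rather than propagated, the sketch below captures it inside a worker thread. `FailingWriter`, `captured`, and `worker` are hypothetical names introduced for this example only; the class's own handling may differ in detail.

```ruby
# Hypothetical writer that always fails, for illustration only
class FailingWriter
  def write(_data)
    raise IOError, 'disk full'
  end
end

reader, writer = IO.pipe
captured = nil

worker = Thread.new do
  Thread.current.report_on_exception = false
  begin
    FailingWriter.new.write(reader.readpartial(1024))
  rescue StandardError => e
    captured = e # saved for the caller instead of propagating
  end
end

writer.write('Hello World')
worker.join
writer.close
reader.close

captured.class   #=> IOError
captured.message #=> "disk full"
```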

pipe_reader[R]

@!attribute [r]

The read end of the pipe

@example

pipe = ProcessExecuter::MonitoredPipe.new($stdout)
pipe.pipe_reader #=> #<IO:fd 11>

@return [IO]

pipe_writer[R]

@!attribute [r]

The write end of the pipe

@example

pipe = ProcessExecuter::MonitoredPipe.new($stdout)
pipe.pipe_writer #=> #<IO:fd 12>

@return [IO] the write end of the pipe

state[R]

@!attribute [r]

The state of the pipe

Must be one of `:open`, `:closing`, or `:closed`

  • `:open` - the pipe is open and data can be written to it

  • `:closing` - the pipe is being closed and data can no longer be written to it

  • `:closed` - the pipe is closed and data can no longer be written to it

@example

pipe = ProcessExecuter::MonitoredPipe.new($stdout)
pipe.state #=> :open
pipe.close
pipe.state #=> :closed

@return [Symbol] the state of the pipe

thread[R]

@!attribute [r]

The thread that monitors the pipe

@example

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.thread #=> #<Thread:0x00007f8b1a0b0e00>

@return [Thread]

writers[R]

@!attribute [r]

An array of writers to write data that is read from the pipe

@example with one writer

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.writers #=> [data_collector]

@example with an array of writers

require 'stringio'
data_collector1 = StringIO.new
data_collector2 = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector1, data_collector2)
pipe.writers #=> [data_collector1, data_collector2]

@return [Array<#write>]
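
The `Array<#write>` type means that any object responding to `#write` should be usable as a writer. The sketch below shows the idea with a hypothetical `UpcaseWriter` class (not part of the library) alongside a StringIO; it calls `#write` directly rather than going through a MonitoredPipe.

```ruby
require 'stringio'

# Hypothetical writer: anything responding to #write can act as a sink
class UpcaseWriter
  attr_reader :received

  def initialize
    @received = +''
  end

  def write(data)
    @received << data.upcase
    data.bytesize
  end
end

writers = [StringIO.new, UpcaseWriter.new]
writers.each { |w| w.write('Hello World') }

writers[0].string   #=> "Hello World"
writers[1].received #=> "HELLO WORLD"
```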

Public Class Methods

new(*writers, chunk_size: 100_000)

Create a new monitored pipe

Creates a pipe (via IO.pipe) and starts a monitoring thread to read data written to the pipe.

@example

data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)

@param writers [Array<#write>] as data is read from the pipe, it is written to these writers

@param chunk_size [Integer] the size of the chunks to read from the pipe

# File lib/process_executer/monitored_pipe.rb, line 58
def initialize(*writers, chunk_size: 100_000)
  @writers = writers
  @chunk_size = chunk_size
  @pipe_reader, @pipe_writer = IO.pipe
  @state = :open
  @thread = Thread.new do
    Thread.current.report_on_exception = false
    Thread.current.abort_on_exception = false
    monitor
  end
end

Public Instance Methods

close()

Set the state to `:closing` and wait for the state to be set to `:closed`

The monitoring thread will see that the state has changed and will close the pipe.

@example

data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.state #=> :open
pipe.write('Hello World')
pipe.close
pipe.state #=> :closed
data_collector.string #=> "Hello World"

@return [void]

# File lib/process_executer/monitored_pipe.rb, line 85
def close
  return unless state == :open

  @state = :closing
  sleep 0.001 until state == :closed
end
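
The handshake used by `#close` (request shutdown, then poll until the other thread acknowledges) can be sketched in isolation. This is a simplified stand-in that shares a local variable between two threads; the names `state` and `worker` are illustrative and no pipe is involved.

```ruby
# Illustrative only: a simplified state handshake between two threads
state = :open

worker = Thread.new do
  sleep 0.001 until state == :closing # stand-in for the monitor loop
  # ... a real monitor would drain and close the pipe here ...
  state = :closed
end

state = :closing                   # request shutdown
sleep 0.001 until state == :closed # wait for the worker to acknowledge
worker.join

state #=> :closed
```

Polling with a short sleep keeps the code simple at the cost of up to about a millisecond of extra latency per check.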

fileno()

@!attribute [r] fileno

The file descriptor for the write end of the pipe

@example

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.fileno == pipe.to_io.fileno #=> true

@return [Integer] the file descriptor for the write end of the pipe

@api private

# File lib/process_executer/monitored_pipe.rb, line 129
def fileno
  pipe_writer.fileno
end

to_io()

Return the write end of the pipe so that data can be written to it

Data written to this end of the pipe will be read by the monitor thread and written to the writers passed to `#initialize`.

This allows a MonitoredPipe to be passed to Process.spawn as a redirection target (file descriptor).

@example

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.to_io.write('Hello World')
pipe.close
data_collector.string #=> "Hello World"

@return [IO] the write end of the pipe

@api private

# File lib/process_executer/monitored_pipe.rb, line 111
def to_io
  pipe_writer
end
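
To illustrate why `#to_io` matters, the sketch below passes a small wrapper object to Process.spawn as the target for the child's standard output. It relies on Process.spawn converting redirection targets with `#to_io`. `PipeWrapper` is a hypothetical stand-in for MonitoredPipe, and the child command is an arbitrary example.

```ruby
require 'rbconfig'

# Hypothetical wrapper, for illustration only: exposes the write end via #to_io
class PipeWrapper
  def initialize(io)
    @io = io
  end

  def to_io
    @io
  end
end

reader, writer = IO.pipe
wrapper = PipeWrapper.new(writer)

# The child's stdout is redirected to the write end of the pipe
pid = Process.spawn(RbConfig.ruby, '-e', 'print "Hello World"', out: wrapper)
Process.wait(pid)

writer.close # close the parent's copy so the reader sees end of file
output = reader.read
reader.close

output #=> "Hello World"
```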

write(data)

Writes data to the pipe so that it can be read by the monitor thread

Primarily used for testing.

@example

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.write('Hello World')
pipe.close
data_collector.string #=> "Hello World"

@param data [String] the data to write to the pipe

@return [Integer] the number of bytes written to the pipe

@api private

# File lib/process_executer/monitored_pipe.rb, line 151
def write(data)
  raise IOError, 'closed stream' unless state == :open

  pipe_writer.write(data)
end

Private Instance Methods

close_pipe()

Read any remaining data from the pipe and close it

@return [void]

@api private

# File lib/process_executer/monitored_pipe.rb, line 301
def close_pipe
  # Close the write end of the pipe so no more data can be written to it
  pipe_writer.close

  # Read remaining data from pipe_reader (if any)
  # If an exception was already raised by the last call to #write, then don't try to read remaining data
  monitor_pipe while exception.nil? && !pipe_reader.eof?

  # Close the read end of the pipe
  pipe_reader.close
end

monitor()

Read data from the pipe until `#state` is changed to `:closing`

The state is changed to `:closing` by calling `#close` or when a writer raises an exception.

Before this method returns, state is set to `:closed`

@return [void]

@api private

# File lib/process_executer/monitored_pipe.rb, line 270
def monitor
  monitor_pipe until state == :closing
  close_pipe
  @state = :closed
end

monitor_pipe()

Read up to `chunk_size` bytes from the pipe, waiting briefly if no data is available yet

Data read from the pipe is written to the writers given to the constructor.

@return [void]

@api private

# File lib/process_executer/monitored_pipe.rb, line 282
def monitor_pipe
  new_data = pipe_reader.read_nonblock(chunk_size)
  # SimpleCov under JRuby reports the begin statement as not covered, but it is
  # :nocov:
  begin
    # :nocov:
    writers.each { |w| w.write(new_data) }
  rescue StandardError => e
    @exception = e
    @state = :closing
  end
rescue IO::WaitReadable
  pipe_reader.wait_readable(0.001)
end
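
The non-blocking read used by `monitor_pipe` can be explored on its own. The sketch below shows the two outcomes the method handles: IO::WaitReadable when the pipe is empty, and chunked reads once data is available. The chunk size of 5 and the variable names are arbitrary choices for this example.

```ruby
reader, writer = IO.pipe

# Nothing has been written yet, so a non-blocking read raises IO::WaitReadable
begin
  reader.read_nonblock(5)
  first_attempt = :data
rescue IO::WaitReadable
  first_attempt = :would_block
end

writer.write('Hello World')
writer.close

chunks = []
begin
  loop { chunks << reader.read_nonblock(5) } # at most 5 bytes per read
rescue EOFError
  # raised once the write end is closed and the pipe has been drained
end
reader.close

first_attempt #=> :would_block
chunks        #=> ["Hello", " Worl", "d"]
```

A smaller chunk size means more calls to each writer for the same amount of data, while a larger one reduces the number of calls.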