Class: Julewire::Core::Destinations::SynchronizedOutput

Inherits:
Object
  • Object
show all
Defined in:
lib/julewire/core/destinations/synchronized_output.rb

Instance Method Summary collapse

Constructor Details

#initialize(output, close_output: false) ⇒ SynchronizedOutput

Returns a new instance of SynchronizedOutput.



10
11
12
13
14
15
16
17
# File 'lib/julewire/core/destinations/synchronized_output.rb', line 10

# Wraps +output+ so that writes and lifecycle calls go through mutexes.
#
# @param output [Object] the destination to wrap; it is checked with
#   Sink.validate_writeable! before any state is stored (presumably that
#   raises for unusable outputs - confirm against Sink)
# @param close_output [Boolean] when truthy, #close prefers the :close
#   lifecycle entry over the :flush one
def initialize(output, close_output: false)
  Sink.validate_writeable!(output)

  # Separate locks: one guards writes, the other serializes flush/close.
  @lifecycle_mutex = Mutex.new
  @mutex = Mutex.new

  @close_output = close_output
  @output = output

  # Computed last, after @output and @close_output are assigned.
  @lifecycle = lifecycle_methods
end

Instance Method Details

#after_fork! ⇒ Object



19
20
21
22
23
24
25
# File 'lib/julewire/core/destinations/synchronized_output.rb', line 19

# Resets locking state, intended to be called in a forked child process.
#
# Both mutexes are replaced with fresh ones, the wrapped output is given a
# chance to run its own +after_fork!+ hook (only if it publicly responds to
# it), and the lifecycle table is recomputed afterwards.
#
# @return [self]
def after_fork!
  @lifecycle_mutex = Mutex.new
  @mutex = Mutex.new

  wrapped = @output
  wrapped.after_fork! if wrapped.respond_to?(:after_fork!)

  # Rebuilt only after the wrapped output has run its own hook.
  @lifecycle = lifecycle_methods
  self
end

#close(timeout: nil) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/julewire/core/destinations/synchronized_output.rb', line 48

# Closes (or, failing that, flushes) the wrapped output.
#
# Close is terminal: the lifecycle mutex keeps lifecycle calls serialized,
# and the write mutex is held as well so the underlying output cannot be
# closed while a write is in flight.
#
# The :close lifecycle entry is used when +close_output+ was requested and
# such an entry exists; otherwise the :flush entry is used if present.
#
# @param timeout [Object, nil] forwarded unchanged to the lifecycle call
# @return [Boolean] +true+ when the output is already closed, when no
#   lifecycle entry applies, or when the lifecycle call returns anything
#   other than +false+; +false+ only when the lifecycle call returns +false+
def close(timeout: nil)
  @lifecycle_mutex.synchronize do
    @mutex.synchronize do
      next true if output_closed?

      action = if @close_output && @lifecycle[:close]
                 :close
               elsif @lifecycle[:flush]
                 :flush
               end
      next true unless action

      call_lifecycle(action, @lifecycle.fetch(action), timeout: timeout) != false
    end
  end
end

#flush(timeout: nil) ⇒ Object



39
40
41
42
43
44
45
46
# File 'lib/julewire/core/destinations/synchronized_output.rb', line 39

# Flushes the wrapped output through its :flush lifecycle entry, if any.
#
# Only the lifecycle mutex is held here; the write mutex is not taken.
#
# @param timeout [Object, nil] forwarded unchanged to the lifecycle call
# @return [Boolean] +true+ when there is no :flush entry or when the
#   lifecycle call returns anything other than +false+; +false+ otherwise
def flush(timeout: nil)
  @lifecycle_mutex.synchronize do
    flusher = @lifecycle[:flush]

    if flusher
      call_lifecycle(:flush, flusher, timeout: timeout) != false
    else
      true
    end
  end
end

#output_class_name ⇒ Object



27
# File 'lib/julewire/core/destinations/synchronized_output.rb', line 27

# Name of the wrapped output's class.
#
# @return [String, nil] the class name; +nil+ when the class is anonymous
def output_class_name
  @output.class.name
end

#resource_identity ⇒ Object



29
# File 'lib/julewire/core/destinations/synchronized_output.rb', line 29

# The wrapped output object itself (not a copy).
# NOTE(review): presumably used by callers to tell whether two destinations
# share one underlying resource - confirm against call sites.
#
# @return [Object] the object passed to the constructor as +output+
def resource_identity
  @output
end

#write(value) ⇒ Object



31
32
33
34
35
36
37
# File 'lib/julewire/core/destinations/synchronized_output.rb', line 31

# Writes +value+ to the wrapped output while holding the write mutex.
#
# @param value [Object] passed straight through to the output's +write+
# @return [Object, false] +false+ when the output is reported closed (no
#   write is attempted); otherwise whatever the output's +write+ returns
def write(value)
  @mutex.synchronize do
    output_closed? ? false : @output.write(value)
  end
end