Class: Lich::Common::SharedBuffer
- Inherits: Object
- Defined in: documented/common/sharedbuffer.rb
Overview
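A thread-safe buffer of lines shared between threads. Writers append lines with #update, and each reading thread keeps its own read position, so readers consume the buffer independently of one another. The buffer retains at most max_size lines; once that limit is exceeded, the oldest lines are discarded.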
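The per-thread read positions can be illustrated with a minimal sketch. `DemoSharedBuffer` is a hypothetical stand-in, condensed from the method listings on this page so that the snippet runs without Lich loaded; it is not the library class itself, and unlike the original it holds the mutex for the whole read.

```ruby
# Hypothetical stand-in condensed from this page's listings (not the Lich class itself).
class DemoSharedBuffer
  def initialize(args = {})
    @buffer = []
    @buffer_offset = 0   # number of lines already discarded from the front
    @buffer_index = {}   # thread object_id => absolute read position
    @buffer_mutex = Mutex.new
    @max_size = args[:max_size] || 500
  end

  def update(line)
    @buffer_mutex.synchronize do
      @buffer.push(line.dup.freeze)
      while @buffer.length > @max_size
        @buffer.shift
        @buffer_offset += 1
      end
    end
    self
  end

  # Non-blocking read for the calling thread; nil when nothing is unread.
  def gets?
    id = Thread.current.object_id
    @buffer_mutex.synchronize do
      @buffer_index[id] ||= @buffer_offset + @buffer.length
      @buffer_index[id] = @buffer_offset if @buffer_index[id] < @buffer_offset
      pos = @buffer_index[id] - @buffer_offset
      if pos >= @buffer.length
        nil
      else
        @buffer_index[id] += 1
        @buffer[pos]
      end
    end
  end
end

buf = DemoSharedBuffer.new(max_size: 3)
buf.gets?                                # first call registers this thread at the current end
buf.update("one").update("two")
first = buf.gets?                        # this thread sees lines written after it registered
late  = Thread.new { buf.gets? }.value   # a new thread starts at the current end
puts first.inspect                       # prints "one"
puts late.inspect                        # prints nil
```

A thread only sees lines written after its first read, unless it calls #rewind first.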
Instance Attribute Summary
- #max_size ⇒ Object
  Returns the maximum number of lines the buffer retains (500 unless overridden in the constructor).
Instance Method Summary
- #cleanup_threads ⇒ SharedBuffer
  Cleans up the buffer index for threads that are no longer active.
- #clear ⇒ Array<String>
  Returns every line the current thread has not yet read and marks them as read.
- #gets ⇒ String?
  Retrieves the next line from the buffer, blocking if necessary.
- #gets? ⇒ String?
  Retrieves the next line from the buffer without blocking.
- #initialize(args = {}) ⇒ SharedBuffer (constructor)
  Initializes a new SharedBuffer instance.
- #rewind ⇒ SharedBuffer
  Resets the buffer index for the current thread to the beginning of the buffer.
- #update(line) ⇒ SharedBuffer
  Adds a new line to the buffer, managing the size of the buffer.
Constructor Details
#initialize(args = {}) ⇒ SharedBuffer
Initializes a new SharedBuffer instance. The optional args[:max_size] sets the maximum number of retained lines and defaults to 500.
    # File 'documented/common/sharedbuffer.rb', line 18

    def initialize(args = {})
      @buffer = Array.new
      @buffer_offset = 0
      @buffer_index = Hash.new
      @buffer_mutex = Mutex.new
      @max_size = args[:max_size] || 500
      # return self # rubocop does not like this - Lint/ReturnInVoidContext
    end
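The max_size default can be sketched in isolation. `DemoBuffer` below is a hypothetical stand-in that copies only the `@max_size` assignment from the constructor; it is meant to show that, because the fallback uses `||`, an explicit `nil` also yields 500.

```ruby
# Hypothetical stand-in: only the max_size handling from the constructor above.
class DemoBuffer
  attr_reader :max_size

  def initialize(args = {})
    @max_size = args[:max_size] || 500
  end
end

puts DemoBuffer.new.max_size                  # prints 500
puts DemoBuffer.new(max_size: 50).max_size    # prints 50
puts DemoBuffer.new(max_size: nil).max_size   # prints 500
```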
Instance Attribute Details
#max_size ⇒ Object
Returns the maximum number of lines the buffer retains (500 unless overridden in the constructor).
    # File 'documented/common/sharedbuffer.rb', line 12

    def max_size
      @max_size
    end
Instance Method Details
#cleanup_threads ⇒ SharedBuffer
Cleans up the buffer index for threads that are no longer active.
    # File 'documented/common/sharedbuffer.rb', line 121

    def cleanup_threads
      @buffer_index.delete_if { |k, _v| not Thread.list.any? { |t| t.object_id == k } }
      return self
    end
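The pruning step can be tried on a plain Hash. In this sketch `prune_dead_threads` is a hypothetical helper that mirrors the `delete_if` call above; it relies on `Thread.list` returning only runnable or stopped threads, so a finished thread is absent from it.

```ruby
# Hypothetical helper mirroring the delete_if in cleanup_threads, applied to a plain Hash.
def prune_dead_threads(index)
  live_ids = Thread.list.map(&:object_id)
  index.delete_if { |thread_id, _position| !live_ids.include?(thread_id) }
end

worker = Thread.new { :finished }
worker.join   # a finished thread no longer appears in Thread.list

index = { Thread.current.object_id => 0, worker.object_id => 5 }
prune_dead_threads(index)
puts index.key?(worker.object_id)           # prints false
puts index.key?(Thread.current.object_id)   # prints true
```

Collecting the live ids once, rather than scanning `Thread.list` for every key, is a choice made for this sketch only.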
#clear ⇒ Array<String>
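Returns every line the current thread has not yet read and advances that thread's read position to the end of the buffer. The buffer contents themselves are not removed. On a thread's first call, the thread is registered at the current end and an empty array is returned.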
    # File 'documented/common/sharedbuffer.rb', line 73

    def clear
      thread_id = Thread.current.object_id
      if @buffer_index[thread_id].nil?
        @buffer_mutex.synchronize { @buffer_index[thread_id] = (@buffer_offset + @buffer.length) }
        return Array.new
      end
      if (@buffer_index[thread_id] - @buffer_offset) >= @buffer.length
        return Array.new
      end

      lines = Array.new
      @buffer_mutex.synchronize {
        if @buffer_index[thread_id] < @buffer_offset
          @buffer_index[thread_id] = @buffer_offset
        end
        lines = @buffer[(@buffer_index[thread_id] - @buffer_offset)..-1]
        @buffer_index[thread_id] = (@buffer_offset + @buffer.length)
      }
      return lines
    end
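A short sketch of how clear behaves from the caller's side. `DemoSharedBuffer` is a hypothetical stand-in condensed from the listings on this page (with the whole of clear under the mutex), not the library class.

```ruby
# Hypothetical stand-in condensed from this page's listings (not the Lich class itself).
class DemoSharedBuffer
  def initialize(args = {})
    @buffer = []
    @buffer_offset = 0
    @buffer_index = {}
    @buffer_mutex = Mutex.new
    @max_size = args[:max_size] || 500
  end

  def update(line)
    @buffer_mutex.synchronize do
      @buffer.push(line.dup.freeze)
      while @buffer.length > @max_size
        @buffer.shift
        @buffer_offset += 1
      end
    end
    self
  end

  # Returns the calling thread's unread lines and moves its position to the end.
  def clear
    id = Thread.current.object_id
    @buffer_mutex.synchronize do
      if @buffer_index[id].nil?
        @buffer_index[id] = @buffer_offset + @buffer.length
        []
      else
        start = [@buffer_index[id], @buffer_offset].max - @buffer_offset
        @buffer_index[id] = @buffer_offset + @buffer.length
        @buffer[start..-1] || []
      end
    end
  end
end

buf = DemoSharedBuffer.new(max_size: 2)
puts buf.clear.inspect   # prints [] (first call only registers this thread)
buf.update("a").update("b").update("c")
puts buf.clear.inspect   # prints ["b", "c"] ("a" was discarded by the size limit)
puts buf.clear.inspect   # prints [] (nothing unread remains)
```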
#gets ⇒ String?
Retrieves the next line from the buffer for the current thread. If no unread line is available, the method blocks, polling every 0.05 seconds until one arrives.
    # File 'documented/common/sharedbuffer.rb', line 30

    def gets
      thread_id = Thread.current.object_id
      if @buffer_index[thread_id].nil?
        @buffer_mutex.synchronize { @buffer_index[thread_id] = (@buffer_offset + @buffer.length) }
      end
      if (@buffer_index[thread_id] - @buffer_offset) >= @buffer.length
        sleep 0.05 while ((@buffer_index[thread_id] - @buffer_offset) >= @buffer.length)
      end
      line = nil
      @buffer_mutex.synchronize {
        if @buffer_index[thread_id] < @buffer_offset
          @buffer_index[thread_id] = @buffer_offset
        end
        line = @buffer[@buffer_index[thread_id] - @buffer_offset]
      }
      @buffer_index[thread_id] += 1
      return line
    end
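A producer/consumer sketch of the blocking read. `DemoSharedBuffer` is a hypothetical stand-in; its gets is a condensed variant of the listing above that checks and reads under the mutex and sleeps 0.05 seconds between attempts, so treat it as an illustration of the polling approach rather than the library code.

```ruby
# Hypothetical stand-in condensed from this page's listings (not the Lich class itself).
class DemoSharedBuffer
  def initialize(args = {})
    @buffer = []
    @buffer_offset = 0
    @buffer_index = {}
    @buffer_mutex = Mutex.new
    @max_size = args[:max_size] || 500
  end

  def update(line)
    @buffer_mutex.synchronize do
      @buffer.push(line.dup.freeze)
      while @buffer.length > @max_size
        @buffer.shift
        @buffer_offset += 1
      end
    end
    self
  end

  def rewind
    @buffer_index[Thread.current.object_id] = @buffer_offset
    self
  end

  # Blocking read: polls until the calling thread has an unread line.
  def gets
    id = Thread.current.object_id
    @buffer_mutex.synchronize { @buffer_index[id] ||= @buffer_offset + @buffer.length }
    loop do
      line = @buffer_mutex.synchronize do
        @buffer_index[id] = @buffer_offset if @buffer_index[id] < @buffer_offset
        pos = @buffer_index[id] - @buffer_offset
        if pos < @buffer.length
          @buffer_index[id] += 1
          @buffer[pos]
        end
      end
      return line if line
      sleep 0.05
    end
  end
end

buf = DemoSharedBuffer.new
producer = Thread.new { sleep 0.1; buf.update("hello") }
buf.rewind      # register at the oldest retained line so the write cannot be missed
puts buf.gets   # blocks until the producer writes, then prints hello
producer.join
```

The rewind call is there only to make the sketch deterministic: without it, a thread that registers after the producer has already written would wait for the next line instead.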
#gets? ⇒ String?
Retrieves the next line from the buffer without blocking. Returns nil when the current thread has no unread lines.
    # File 'documented/common/sharedbuffer.rb', line 51

    def gets?
      thread_id = Thread.current.object_id
      if @buffer_index[thread_id].nil?
        @buffer_mutex.synchronize { @buffer_index[thread_id] = (@buffer_offset + @buffer.length) }
      end
      if (@buffer_index[thread_id] - @buffer_offset) >= @buffer.length
        return nil
      end

      line = nil
      @buffer_mutex.synchronize {
        if @buffer_index[thread_id] < @buffer_offset
          @buffer_index[thread_id] = @buffer_offset
        end
        line = @buffer[@buffer_index[thread_id] - @buffer_offset]
      }
      @buffer_index[thread_id] += 1
      return line
    end
#rewind ⇒ SharedBuffer
Resets the buffer index for the current thread to the beginning of the buffer, that is, to the oldest line still retained.
    # File 'documented/common/sharedbuffer.rb', line 97

    def rewind
      @buffer_index[Thread.current.object_id] = @buffer_offset
      return self
    end
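A sketch of re-reading after rewind, including the case where older lines have already been discarded. `DemoSharedBuffer` is again a hypothetical stand-in condensed from this page's listings, not the library class.

```ruby
# Hypothetical stand-in condensed from this page's listings (not the Lich class itself).
class DemoSharedBuffer
  def initialize(args = {})
    @buffer = []
    @buffer_offset = 0
    @buffer_index = {}
    @buffer_mutex = Mutex.new
    @max_size = args[:max_size] || 500
  end

  def update(line)
    @buffer_mutex.synchronize do
      @buffer.push(line.dup.freeze)
      while @buffer.length > @max_size
        @buffer.shift
        @buffer_offset += 1
      end
    end
    self
  end

  def rewind
    @buffer_index[Thread.current.object_id] = @buffer_offset
    self
  end

  def gets?
    id = Thread.current.object_id
    @buffer_mutex.synchronize do
      @buffer_index[id] ||= @buffer_offset + @buffer.length
      @buffer_index[id] = @buffer_offset if @buffer_index[id] < @buffer_offset
      pos = @buffer_index[id] - @buffer_offset
      if pos < @buffer.length
        @buffer_index[id] += 1
        @buffer[pos]
      end
    end
  end
end

buf = DemoSharedBuffer.new(max_size: 2)
buf.update("a").update("b").update("c")   # "a" is discarded; "b" and "c" remain
puts buf.gets?.inspect                    # prints nil (this thread registers at the end)
buf.rewind
puts buf.gets?.inspect                    # prints "b" (oldest retained line, not "a")
puts buf.gets?.inspect                    # prints "c"
```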
#update(line) ⇒ SharedBuffer
Appends a frozen copy of line to the buffer. While the buffer holds more than max_size lines, the oldest line is discarded and the buffer offset is advanced by one.
    # File 'documented/common/sharedbuffer.rb', line 106

    def update(line)
      @buffer_mutex.synchronize {
        fline = line.dup
        fline.freeze
        @buffer.push(fline)
        while (@buffer.length > @max_size)
          @buffer.shift
          @buffer_offset += 1
        end
      }
      return self
    end
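The relationship between the offset and a reader's absolute position can be worked through on a plain Array. `push_bounded` is a hypothetical helper that reproduces only the trimming loop from update; the mutex is omitted because the sketch is single-threaded.

```ruby
# Hypothetical helper: the trimming loop from update, applied to a plain Array.
# Returns the new offset (total number of lines discarded so far).
def push_bounded(buffer, offset, max_size, line)
  buffer.push(line.dup.freeze)
  while buffer.length > max_size
    buffer.shift
    offset += 1
  end
  offset
end

buffer = []
offset = 0
%w[a b c d e].each { |line| offset = push_bounded(buffer, offset, 3, line) }

puts buffer.inspect       # prints ["c", "d", "e"]
puts offset               # prints 2
# A reader at absolute position 3 maps to array slot 3 - offset = 1.
puts buffer[3 - offset]   # prints d
```

Because readers store absolute positions, discarding from the front never requires touching the per-thread index; a reader that has fallen behind the offset is simply moved up to it on its next read.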