Class: Lich::Common::SharedBuffer
- Inherits:
-
Object
- Object
- Lich::Common::SharedBuffer
- Defined in:
- lib/common/sharedbuffer.rb
Overview
A thread-safe buffer that allows multiple threads to read and write data.
Instance Attribute Summary collapse
-
#max_size ⇒ Integer
The maximum size of the buffer.
Instance Method Summary collapse
-
#cleanup_threads ⇒ SharedBuffer
Cleans up the buffer by removing entries for threads that no longer exist.
-
#clear ⇒ Array<String>
Returns the lines the current thread has not read yet and marks them as read.
-
#gets ⇒ String?
Retrieves the next line from the buffer, blocking if necessary.
-
#gets? ⇒ String?
Retrieves the next line from the buffer without blocking.
-
#initialize(args = {}) ⇒ SharedBuffer
constructor
Initializes a new SharedBuffer instance.
-
#rewind ⇒ SharedBuffer
Resets the current thread’s read index to the beginning of the buffer.
-
#update(line) ⇒ SharedBuffer
Updates the buffer with a new line, ensuring the buffer does not exceed max_size.
Constructor Details
#initialize(args = {}) ⇒ SharedBuffer
Initializes a new SharedBuffer instance.
21 22 23 24 25 26 27 28 |
# File 'lib/common/sharedbuffer.rb', line 21
# Builds an empty buffer with no registered readers.
#
# @param args [Hash] options for initialization.
# @option args [Integer] :max_size (500) the maximum number of lines kept;
#   a missing, nil or false value falls back to 500.
def initialize(args = {})
  @buffer_mutex = Mutex.new # guards @buffer, @buffer_offset and reader registration
  @buffer_index = {}        # Thread#object_id => absolute position of that thread's next line
  @buffer_offset = 0        # count of lines already dropped from the front of @buffer
  @buffer = []
  @max_size = args[:max_size] || 500
end
Instance Attribute Details
#max_size ⇒ Integer
Returns the maximum size of the buffer.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/common/sharedbuffer.rb', line 11
# A bounded buffer of lines shared between threads. Every reading thread has
# its own cursor (keyed by Thread#object_id), so each thread sees every line
# independently of the others. Positions are absolute: @buffer_offset counts
# the lines already dropped from the front, so a cursor stays meaningful
# after old lines are discarded.
class SharedBuffer
  # @return [Integer] the maximum number of lines retained in the buffer.
  attr_accessor :max_size

  # Initializes a new SharedBuffer instance.
  #
  # @param args [Hash] options for initialization.
  # @option args [Integer] :max_size (500) the maximum size of the buffer.
  # @example
  #   buffer = Lich::Common::SharedBuffer.new(max_size: 1000)
  def initialize(args = {})
    @buffer = []
    @buffer_offset = 0
    @buffer_index = {}
    @buffer_mutex = Mutex.new
    @max_size = args[:max_size] || 500
  end

  # Retrieves the next unread line for the calling thread, blocking if necessary.
  #
  # @return [String, nil] the next line from the buffer.
  # @note This method polls every 0.05 seconds until a line is available. A
  #   thread calling it for the first time only sees lines added afterwards.
  # @example
  #   line = buffer.gets
  def gets
    thread_id = Thread.current.object_id
    register_reader(thread_id)
    sleep 0.05 until line_pending?(thread_id)
    next_line(thread_id)
  end

  # Retrieves the next unread line for the calling thread without blocking.
  #
  # @return [String, nil] the next line from the buffer or nil if no line is available.
  # @example
  #   line = buffer.gets?
  def gets?
    thread_id = Thread.current.object_id
    register_reader(thread_id)
    return nil unless line_pending?(thread_id)

    next_line(thread_id)
  end

  # Returns every line the calling thread has not read yet and marks them all
  # as read. The buffer contents themselves are not modified.
  #
  # @return [Array<String>] the unread lines (empty when there are none, or
  #   when the calling thread has never read from this buffer before).
  # @example
  #   unread_lines = buffer.clear
  def clear
    thread_id = Thread.current.object_id
    if @buffer_index[thread_id].nil?
      register_reader(thread_id)
      return []
    end
    return [] unless line_pending?(thread_id)

    @buffer_mutex.synchronize do
      # A cursor that points at dropped lines restarts at the oldest kept line.
      first = [@buffer_index[thread_id], @buffer_offset].max
      @buffer_index[thread_id] = @buffer_offset + @buffer.length
      @buffer[(first - @buffer_offset)..-1]
    end
  end

  # Resets the current thread's read index to the beginning of the buffer.
  #
  # @return [SharedBuffer] the instance of SharedBuffer.
  # @example
  #   buffer.rewind
  #   line = buffer.gets
  def rewind
    thread_id = Thread.current.object_id
    # May insert a new key, so take the lock like every other registration;
    # inserting while cleanup_threads iterates the hash would raise.
    @buffer_mutex.synchronize { @buffer_index[thread_id] = @buffer_offset }
    self
  end

  # Appends a frozen copy of +line+ and drops the oldest lines beyond max_size.
  #
  # @param line [String] the line to be added to the buffer.
  # @return [SharedBuffer] the instance of SharedBuffer.
  # @example
  #   buffer.update("New line of text")
  def update(line)
    @buffer_mutex.synchronize do
      @buffer.push(line.dup.freeze)
      # The emptiness guard keeps a negative max_size from spinning forever
      # while holding the lock.
      while @buffer.length > @max_size && !@buffer.empty?
        @buffer.shift
        @buffer_offset += 1
      end
    end
    self
  end

  # Removes the read-index entries of threads that are no longer in Thread.list.
  #
  # @return [SharedBuffer] the instance of SharedBuffer.
  # @example
  #   buffer.cleanup_threads
  def cleanup_threads
    @buffer_mutex.synchronize do
      # Snapshot once, under the lock: readers register under the same lock,
      # so no key can be inserted while the hash is being iterated.
      live_ids = Thread.list.map(&:object_id)
      @buffer_index.delete_if { |thread_id, _position| !live_ids.include?(thread_id) }
    end
    self
  end

  private

  # Gives a first-time reader a cursor at the current end of the buffer.
  def register_reader(thread_id)
    return unless @buffer_index[thread_id].nil?

    @buffer_mutex.synchronize { @buffer_index[thread_id] = @buffer_offset + @buffer.length }
  end

  # True when the reader's cursor is before the end of the buffer.
  def line_pending?(thread_id)
    (@buffer_index[thread_id] - @buffer_offset) < @buffer.length
  end

  # Returns the line under the reader's cursor and advances the cursor by one.
  def next_line(thread_id)
    @buffer_mutex.synchronize do
      # A cursor that points at dropped lines restarts at the oldest kept line.
      position = [@buffer_index[thread_id], @buffer_offset].max
      @buffer_index[thread_id] = position + 1
      @buffer[position - @buffer_offset]
    end
  end
end
Instance Method Details
#cleanup_threads ⇒ SharedBuffer
Cleans up the buffer by removing entries for threads that no longer exist.
141 142 143 144 |
# File 'lib/common/sharedbuffer.rb', line 141
# Removes the read-index entries of threads that are no longer in Thread.list.
#
# @return [SharedBuffer] the instance of SharedBuffer.
def cleanup_threads
  @buffer_mutex.synchronize do
    # Snapshot the live ids once instead of rebuilding Thread.list per entry.
    # Holding the lock keeps readers that register under the same lock from
    # inserting a key while the hash is being iterated, which would raise.
    live_ids = Thread.list.map(&:object_id)
    @buffer_index.delete_if { |thread_id, _position| !live_ids.include?(thread_id) }
  end
  self
end
#clear ⇒ Array<String>
Returns the lines the current thread has not read yet and marks them as read.
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/common/sharedbuffer.rb', line 85
# Returns every line the calling thread has not read yet and moves that
# thread's read index to the end of the buffer. The buffer itself is untouched.
#
# @return [Array<String>] the unread lines; empty when there are none or when
#   the calling thread had no read index yet (it is then registered at the end).
def clear
  reader = Thread.current.object_id

  # Unknown reader: start it at the end of the buffer, nothing to hand back.
  if @buffer_index[reader].nil?
    @buffer_mutex.synchronize { @buffer_index[reader] = (@buffer_offset + @buffer.length) }
    return []
  end

  return [] if (@buffer_index[reader] - @buffer_offset) >= @buffer.length

  pending = []
  @buffer_mutex.synchronize do
    # An index pointing at already-dropped lines restarts at the oldest kept line.
    @buffer_index[reader] = @buffer_offset if @buffer_index[reader] < @buffer_offset
    pending = @buffer[(@buffer_index[reader] - @buffer_offset)..-1]
    @buffer_index[reader] = (@buffer_offset + @buffer.length)
  end
  pending
end
#gets ⇒ String?
This method will block until a line is available.
Retrieves the next line from the buffer, blocking if necessary.
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/common/sharedbuffer.rb', line 36
# Returns the calling thread's next unread line, polling every 0.05 seconds
# until one exists. A thread without a read index is first registered at the
# end of the buffer, so it only receives lines added afterwards.
#
# @return [String, nil] the next line from the buffer.
def gets
  reader = Thread.current.object_id
  if @buffer_index[reader].nil?
    @buffer_mutex.synchronize { @buffer_index[reader] = (@buffer_offset + @buffer.length) }
  end

  # Busy-wait until the read index is before the end of the buffer.
  sleep 0.05 until (@buffer_index[reader] - @buffer_offset) < @buffer.length

  fetched = @buffer_mutex.synchronize do
    # An index pointing at already-dropped lines restarts at the oldest kept line.
    position = [@buffer_index[reader], @buffer_offset].max
    @buffer_index[reader] = position
    @buffer[position - @buffer_offset]
  end
  @buffer_index[reader] += 1
  fetched
end
#gets? ⇒ String?
Retrieves the next line from the buffer without blocking.
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/common/sharedbuffer.rb', line 60
# Returns the calling thread's next unread line, or nil right away when there
# is none. A thread without a read index is first registered at the end of
# the buffer, so its first call returns nil.
#
# @return [String, nil] the next line from the buffer or nil if no line is available.
def gets?
  reader = Thread.current.object_id
  if @buffer_index[reader].nil?
    @buffer_mutex.synchronize { @buffer_index[reader] = (@buffer_offset + @buffer.length) }
  end

  return nil unless (@buffer_index[reader] - @buffer_offset) < @buffer.length

  fetched = @buffer_mutex.synchronize do
    # An index pointing at already-dropped lines restarts at the oldest kept line.
    position = [@buffer_index[reader], @buffer_offset].max
    @buffer_index[reader] = position
    @buffer[position - @buffer_offset]
  end
  @buffer_index[reader] += 1
  fetched
end
#rewind ⇒ SharedBuffer
Resets the current thread’s read index to the beginning of the buffer.
112 113 114 115 |
# File 'lib/common/sharedbuffer.rb', line 112
# Points the calling thread's read index at the oldest line still in the buffer.
#
# @return [SharedBuffer] the instance of SharedBuffer.
def rewind
  reader = Thread.current.object_id
  # This may insert a new key, so take the lock as gets/gets?/clear do when
  # they register a reader; it also reads @buffer_offset consistently.
  @buffer_mutex.synchronize { @buffer_index[reader] = @buffer_offset }
  self
end
#update(line) ⇒ SharedBuffer
Updates the buffer with a new line, ensuring the buffer does not exceed max_size.
123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/common/sharedbuffer.rb', line 123
# Appends a frozen copy of +line+ to the buffer, then drops the oldest lines
# until at most max_size remain, counting each dropped line in @buffer_offset.
#
# @param line [String] the line to be added to the buffer.
# @return [SharedBuffer] the instance of SharedBuffer.
def update(line)
  @buffer_mutex.synchronize do
    # Store a frozen duplicate so the caller's object is left unfrozen.
    @buffer.push(line.dup.freeze)
    # The emptiness guard keeps a negative max_size from looping forever
    # (shifting an empty array) while holding the lock.
    while @buffer.length > @max_size && !@buffer.empty?
      @buffer.shift
      @buffer_offset += 1
    end
  end
  self
end