Class: Lich::Common::SharedBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/common/sharedbuffer.rb

Overview

A thread-safe buffer that allows multiple threads to read and write data.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args = {}) ⇒ SharedBuffer

Initializes a new SharedBuffer instance.

Examples:

buffer = Lich::Common::SharedBuffer.new(max_size: 1000)

Parameters:

  • args (Hash) (defaults to: {})

    options for initialization.

Options Hash (args):

  • :max_size (Integer) — default: 500

    the maximum size of the buffer.



21
22
23
24
25
26
27
28
# File 'lib/common/sharedbuffer.rb', line 21

def initialize(args = {})
  @buffer = Array.new
  @buffer_offset = 0
  @buffer_index = Hash.new
  @buffer_mutex = Mutex.new
  @max_size = args[:max_size] || 500
  # return self # rubocop does not like this - Lint/ReturnInVoidContext
end

Instance Attribute Details

#max_sizeInteger

Returns the maximum size of the buffer.

Returns:

  • (Integer)

    the maximum size of the buffer.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/common/sharedbuffer.rb', line 11

class SharedBuffer
  attr_accessor :max_size

  # Initializes a new SharedBuffer instance.
  #
  # @param args [Hash] options for initialization.
  # @option args [Integer] :max_size (500) the maximum size of the buffer.
  # @return [SharedBuffer] the instance of SharedBuffer.
  # @example
  #   buffer = Lich::Common::SharedBuffer.new(max_size: 1000)
  def initialize(args = {})
    @buffer = Array.new
    @buffer_offset = 0
    @buffer_index = Hash.new
    @buffer_mutex = Mutex.new
    @max_size = args[:max_size] || 500
    # return self # rubocop does not like this - Lint/ReturnInVoidContext
  end

  # Retrieves the next line from the buffer, blocking if necessary.
  #
  # @return [String, nil] the next line from the buffer or nil if no line is available.
  # @note This method will block until a line is available.
  # @example
  #   line = buffer.gets
  def gets
    thread_id = Thread.current.object_id
    if @buffer_index[thread_id].nil?
      @buffer_mutex.synchronize { @buffer_index[thread_id] = (@buffer_offset + @buffer.length) }
    end
    if (@buffer_index[thread_id] - @buffer_offset) >= @buffer.length
      sleep 0.05 while ((@buffer_index[thread_id] - @buffer_offset) >= @buffer.length)
    end
    line = nil
    @buffer_mutex.synchronize {
      if @buffer_index[thread_id] < @buffer_offset
        @buffer_index[thread_id] = @buffer_offset
      end
      line = @buffer[@buffer_index[thread_id] - @buffer_offset]
    }
    @buffer_index[thread_id] += 1
    return line
  end

  # Retrieves the next line from the buffer without blocking.
  #
  # @return [String, nil] the next line from the buffer or nil if no line is available.
  # @example
  #   line = buffer.gets?
  def gets?
    thread_id = Thread.current.object_id
    if @buffer_index[thread_id].nil?
      @buffer_mutex.synchronize { @buffer_index[thread_id] = (@buffer_offset + @buffer.length) }
    end
    if (@buffer_index[thread_id] - @buffer_offset) >= @buffer.length
      return nil
    end

    line = nil
    @buffer_mutex.synchronize {
      if @buffer_index[thread_id] < @buffer_offset
        @buffer_index[thread_id] = @buffer_offset
      end
      line = @buffer[@buffer_index[thread_id] - @buffer_offset]
    }
    @buffer_index[thread_id] += 1
    return line
  end

  # Clears the lines that have been read by the current thread.
  #
  # @return [Array<String>] an array of lines that were cleared.
  # @example
  #   cleared_lines = buffer.clear
  def clear
    thread_id = Thread.current.object_id
    if @buffer_index[thread_id].nil?
      @buffer_mutex.synchronize { @buffer_index[thread_id] = (@buffer_offset + @buffer.length) }
      return Array.new
    end
    if (@buffer_index[thread_id] - @buffer_offset) >= @buffer.length
      return Array.new
    end

    lines = Array.new
    @buffer_mutex.synchronize {
      if @buffer_index[thread_id] < @buffer_offset
        @buffer_index[thread_id] = @buffer_offset
      end
      lines = @buffer[(@buffer_index[thread_id] - @buffer_offset)..-1]
      @buffer_index[thread_id] = (@buffer_offset + @buffer.length)
    }
    return lines
  end

  # Resets the current thread's read index to the beginning of the buffer.
  #
  # @return [SharedBuffer] the instance of SharedBuffer.
  # @example
  #   buffer.rewind
  #   line = buffer.gets
  def rewind
    @buffer_index[Thread.current.object_id] = @buffer_offset
    return self
  end

  # Updates the buffer with a new line, ensuring the buffer does not exceed max_size.
  #
  # @param line [String] the line to be added to the buffer.
  # @return [SharedBuffer] the instance of SharedBuffer.
  # @example
  #   buffer.update("New line of text")
  def update(line)
    @buffer_mutex.synchronize {
      fline = line.dup
      fline.freeze
      @buffer.push(fline)
      while (@buffer.length > @max_size)
        @buffer.shift
        @buffer_offset += 1
      end
    }
    return self
  end

  # Cleans up the buffer by removing entries for threads that no longer exist.
  #
  # @return [SharedBuffer] the instance of SharedBuffer.
  # @example
  #   buffer.cleanup_threads
  def cleanup_threads
    @buffer_index.delete_if { |k, _v| not Thread.list.any? { |t| t.object_id == k } }
    return self
  end
end

Instance Method Details

#cleanup_threadsSharedBuffer

Cleans up the buffer by removing entries for threads that no longer exist.

Examples:

buffer.cleanup_threads

Returns:



141
142
143
144
# File 'lib/common/sharedbuffer.rb', line 141

def cleanup_threads
  @buffer_index.delete_if { |k, _v| not Thread.list.any? { |t| t.object_id == k } }
  return self
end

#clearArray<String>

Clears the lines that have been read by the current thread.

Examples:

cleared_lines = buffer.clear

Returns:

  • (Array<String>)

    an array of lines that were cleared.



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/common/sharedbuffer.rb', line 85

def clear
  thread_id = Thread.current.object_id
  if @buffer_index[thread_id].nil?
    @buffer_mutex.synchronize { @buffer_index[thread_id] = (@buffer_offset + @buffer.length) }
    return Array.new
  end
  if (@buffer_index[thread_id] - @buffer_offset) >= @buffer.length
    return Array.new
  end

  lines = Array.new
  @buffer_mutex.synchronize {
    if @buffer_index[thread_id] < @buffer_offset
      @buffer_index[thread_id] = @buffer_offset
    end
    lines = @buffer[(@buffer_index[thread_id] - @buffer_offset)..-1]
    @buffer_index[thread_id] = (@buffer_offset + @buffer.length)
  }
  return lines
end

#getsString?

Note:

This method will block until a line is available.

Retrieves the next line from the buffer, blocking if necessary.

Examples:

line = buffer.gets

Returns:

  • (String, nil)

    the next line from the buffer or nil if no line is available.



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/common/sharedbuffer.rb', line 36

def gets
  thread_id = Thread.current.object_id
  if @buffer_index[thread_id].nil?
    @buffer_mutex.synchronize { @buffer_index[thread_id] = (@buffer_offset + @buffer.length) }
  end
  if (@buffer_index[thread_id] - @buffer_offset) >= @buffer.length
    sleep 0.05 while ((@buffer_index[thread_id] - @buffer_offset) >= @buffer.length)
  end
  line = nil
  @buffer_mutex.synchronize {
    if @buffer_index[thread_id] < @buffer_offset
      @buffer_index[thread_id] = @buffer_offset
    end
    line = @buffer[@buffer_index[thread_id] - @buffer_offset]
  }
  @buffer_index[thread_id] += 1
  return line
end

#gets?String?

Retrieves the next line from the buffer without blocking.

Examples:

line = buffer.gets?

Returns:

  • (String, nil)

    the next line from the buffer or nil if no line is available.



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/common/sharedbuffer.rb', line 60

def gets?
  thread_id = Thread.current.object_id
  if @buffer_index[thread_id].nil?
    @buffer_mutex.synchronize { @buffer_index[thread_id] = (@buffer_offset + @buffer.length) }
  end
  if (@buffer_index[thread_id] - @buffer_offset) >= @buffer.length
    return nil
  end

  line = nil
  @buffer_mutex.synchronize {
    if @buffer_index[thread_id] < @buffer_offset
      @buffer_index[thread_id] = @buffer_offset
    end
    line = @buffer[@buffer_index[thread_id] - @buffer_offset]
  }
  @buffer_index[thread_id] += 1
  return line
end

#rewindSharedBuffer

Resets the current thread’s read index to the beginning of the buffer.

Examples:

buffer.rewind
line = buffer.gets

Returns:



112
113
114
115
# File 'lib/common/sharedbuffer.rb', line 112

def rewind
  @buffer_index[Thread.current.object_id] = @buffer_offset
  return self
end

#update(line) ⇒ SharedBuffer

Updates the buffer with a new line, ensuring the buffer does not exceed max_size.

Examples:

buffer.update("New line of text")

Parameters:

  • line (String)

    the line to be added to the buffer.

Returns:



123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/common/sharedbuffer.rb', line 123

def update(line)
  @buffer_mutex.synchronize {
    fline = line.dup
    fline.freeze
    @buffer.push(fline)
    while (@buffer.length > @max_size)
      @buffer.shift
      @buffer_offset += 1
    end
  }
  return self
end