Class: Lich::Common::SharedBuffer

Inherits:
Object
Defined in:
documented/common/sharedbuffer.rb

Overview

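A mutex-guarded line buffer that multiple threads can write to and read from. Each reading thread keeps its own read position, so every reader receives the lines written after its first read. The buffer holds at most max_size lines; once that limit is exceeded, the oldest lines are dropped.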

Examples:

Creating a shared buffer

buffer = Lich::Common::SharedBuffer.new(max_size: 1000)
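
A slightly longer sketch of writing and reading from a single thread may help. So that it runs outside Lich, it defines a stand-in whenever the real class is not loaded. The stand-in is an assumption for illustration, condensed from the initialize, update and gets? listings on this page, and the sample lines are made up.

```ruby
# Stand-in used only when Lich is not loaded. Condensed from the listings
# on this page; it is not the full class.
unless defined?(Lich::Common::SharedBuffer)
  module Lich
    module Common
      class SharedBuffer
        attr_reader :max_size

        def initialize(args = {})
          @buffer = []
          @buffer_offset = 0
          @buffer_index = {}
          @buffer_mutex = Mutex.new
          @max_size = args[:max_size] || 500
        end

        def update(line)
          @buffer_mutex.synchronize do
            @buffer.push(line.dup.freeze)
            while @buffer.length > @max_size
              @buffer.shift
              @buffer_offset += 1
            end
          end
          self
        end

        def gets?
          id = Thread.current.object_id
          @buffer_mutex.synchronize do
            @buffer_index[id] ||= @buffer_offset + @buffer.length
            @buffer_index[id] = @buffer_offset if @buffer_index[id] < @buffer_offset
            return nil if @buffer_index[id] - @buffer_offset >= @buffer.length
            line = @buffer[@buffer_index[id] - @buffer_offset]
            @buffer_index[id] += 1
            line
          end
        end
      end
    end
  end
end

buffer = Lich::Common::SharedBuffer.new(max_size: 3)

buffer.gets?                        # first read registers this thread; nothing is pending yet
buffer.update("one").update("two")  # update returns self, so calls can be chained
first  = buffer.gets?               # "one"
second = buffer.gets?               # "two"
third  = buffer.gets?               # nil, this thread has caught up
```

Note the initial gets? call: per the listings, a thread's first read places it at the end of the buffer, so lines written before that first read are only reachable after calling rewind.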

Constructor Details

#initialize(args = {}) ⇒ SharedBuffer

Initializes a new SharedBuffer instance.

Parameters:

  • args (Hash) (defaults to: {})

    Options for initializing the buffer.

Options Hash (args):

  • :max_size (Integer)

    The maximum number of lines kept in the buffer (defaults to 500).



# File 'documented/common/sharedbuffer.rb', line 18

def initialize(args = {})
  @buffer = Array.new
  @buffer_offset = 0
  @buffer_index = Hash.new
  @buffer_mutex = Mutex.new
  @max_size = args[:max_size] || 500
  # return self # rubocop does not like this - Lint/ReturnInVoidContext
end

Instance Attribute Details

#max_size ⇒ Object

Returns the value of attribute max_size.



# File 'documented/common/sharedbuffer.rb', line 12

def max_size
  @max_size
end

Instance Method Details

#cleanup_threads ⇒ SharedBuffer

Cleans up the buffer index for threads that are no longer active.

Returns:

  • (SharedBuffer)

    The buffer itself (self).

# File 'documented/common/sharedbuffer.rb', line 121

def cleanup_threads
  @buffer_index.delete_if { |k, _v| not Thread.list.any? { |t| t.object_id == k } }
  return self
end
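
The pruning rule can be tried in isolation. The following is a minimal sketch, assuming a plain hash named index that stands in for @buffer_index and a hypothetical worker thread; it is not part of the class.

```ruby
# Hypothetical index keyed by Thread#object_id, mirroring @buffer_index.
index = {}

worker = Thread.new { :done }
index[worker.object_id] = 0          # pretend the worker registered a read position
index[Thread.current.object_id] = 0  # the current thread is still alive

worker.join                          # the worker has finished and is no longer listed

# Same rule as cleanup_threads: keep only ids that belong to a thread in Thread.list.
live_ids = Thread.list.map(&:object_id)
index.delete_if { |id, _pos| !live_ids.include?(id) }

remaining = index.keys               # only the current thread's id is left
```

Nothing in the listings on this page removes entries automatically, so a program that spawns many short-lived reader threads may want to call cleanup_threads from time to time.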

#clear ⇒ Array<String>

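Returns every line the current thread has not yet read and marks those lines as read. The buffer contents are left untouched, so other threads can still read the same lines.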

Returns:

  • (Array<String>)

    The lines not yet read by the current thread; empty if there are none or if this is the thread's first read.



# File 'documented/common/sharedbuffer.rb', line 73

def clear
  thread_id = Thread.current.object_id
  if @buffer_index[thread_id].nil?
    @buffer_mutex.synchronize { @buffer_index[thread_id] = (@buffer_offset + @buffer.length) }
    return Array.new
  end
  if (@buffer_index[thread_id] - @buffer_offset) >= @buffer.length
    return Array.new
  end

  lines = Array.new
  @buffer_mutex.synchronize {
    if @buffer_index[thread_id] < @buffer_offset
      @buffer_index[thread_id] = @buffer_offset
    end
    lines = @buffer[(@buffer_index[thread_id] - @buffer_offset)..-1]
    @buffer_index[thread_id] = (@buffer_offset + @buffer.length)
  }
  return lines
end

#gets ⇒ String?

Note:

This method blocks until a line is available.

Retrieves the next line from the buffer, blocking if necessary.

Returns:

  • (String, nil)

    The next unread line for the current thread. Because the method waits until a line is available, nil is not expected in normal use.



# File 'documented/common/sharedbuffer.rb', line 30

def gets
  thread_id = Thread.current.object_id
  if @buffer_index[thread_id].nil?
    @buffer_mutex.synchronize { @buffer_index[thread_id] = (@buffer_offset + @buffer.length) }
  end
  if (@buffer_index[thread_id] - @buffer_offset) >= @buffer.length
    sleep 0.05 while ((@buffer_index[thread_id] - @buffer_offset) >= @buffer.length)
  end
  line = nil
  @buffer_mutex.synchronize {
    if @buffer_index[thread_id] < @buffer_offset
      @buffer_index[thread_id] = @buffer_offset
    end
    line = @buffer[@buffer_index[thread_id] - @buffer_offset]
  }
  @buffer_index[thread_id] += 1
  return line
end
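
The waiting in gets is a polling loop rather than a condition variable: it re-checks every 0.05 seconds until a line shows up. A minimal sketch of that pattern follows, assuming a plain array, a local mutex and a hypothetical producer thread in place of the class internals. Unlike the listing, the sketch takes the lock for each check.

```ruby
lines    = []         # hypothetical shared storage
lock     = Mutex.new
read_pos = 0          # this reader's position

producer = Thread.new do
  sleep 0.1
  lock.synchronize { lines << "hello" }
end

# Poll until a line exists at read_pos, sleeping 0.05 s between checks.
sleep 0.05 while lock.synchronize { read_pos >= lines.length }

line = lock.synchronize { lines[read_pos] }
read_pos += 1
producer.join
```

With this approach a reader may notice a new line up to roughly 50 ms after it was written.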

#gets? ⇒ String?

Retrieves the next line from the buffer without blocking.

Returns:

  • (String, nil)

    The next line from the buffer or nil if no line is available.



# File 'documented/common/sharedbuffer.rb', line 51

def gets?
  thread_id = Thread.current.object_id
  if @buffer_index[thread_id].nil?
    @buffer_mutex.synchronize { @buffer_index[thread_id] = (@buffer_offset + @buffer.length) }
  end
  if (@buffer_index[thread_id] - @buffer_offset) >= @buffer.length
    return nil
  end

  line = nil
  @buffer_mutex.synchronize {
    if @buffer_index[thread_id] < @buffer_offset
      @buffer_index[thread_id] = @buffer_offset
    end
    line = @buffer[@buffer_index[thread_id] - @buffer_offset]
  }
  @buffer_index[thread_id] += 1
  return line
end

#rewind ⇒ SharedBuffer

Resets the current thread's read position to the oldest line still held in the buffer.

Returns:

  • (SharedBuffer)

    The buffer itself (self).

# File 'documented/common/sharedbuffer.rb', line 97

def rewind
  @buffer_index[Thread.current.object_id] = @buffer_offset
  return self
end
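
What rewind changes can be shown with plain variables. This is a sketch under assumptions: buffer, offset and reader_pos are local stand-ins for @buffer, @buffer_offset and the current thread's entry in @buffer_index, with made-up values.

```ruby
buffer     = %w[c d e]  # lines still retained
offset     = 2          # two older lines were already dropped
reader_pos = 5          # absolute position; this reader has caught up

# rewind: point the reader at the oldest retained line.
reader_pos = offset

# A following clear returns everything from that position onward
# and moves the reader back to the end.
replayed   = buffer[(reader_pos - offset)..-1]
reader_pos = offset + buffer.length
```

Lines that were already dropped from the front cannot be replayed; rewind only reaches back as far as the buffer still holds.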

#update(line) ⇒ SharedBuffer

Appends a frozen copy of the line to the buffer and drops the oldest lines while the buffer holds more than max_size lines.

Parameters:

  • line (String)

    The line to be added to the buffer.

Returns:

  • (SharedBuffer)

    The buffer itself (self).

# File 'documented/common/sharedbuffer.rb', line 106

def update(line)
  @buffer_mutex.synchronize {
    fline = line.dup
    fline.freeze
    @buffer.push(fline)
    while (@buffer.length > @max_size)
      @buffer.shift
      @buffer_offset += 1
    end
  }
  return self
end
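
The interaction between trimming and read positions is easier to follow with numbers. The sketch below is illustrative only: it uses local variables in place of the instance variables, a made-up max_size of 3 and a hypothetical reader that never read anything.

```ruby
max_size   = 3
buffer     = []
offset     = 0   # number of lines dropped from the front so far
reader_pos = 0   # absolute position of a hypothetical slow reader

%w[a b c d e].each do |line|
  buffer.push(line.dup.freeze)
  while buffer.length > max_size
    buffer.shift
    offset += 1
  end
end
# buffer is now ["c", "d", "e"] and offset is 2

# The reader's position points before the retained window, so it is
# clamped to offset, as gets, gets? and clear do before reading.
reader_pos = offset if reader_pos < offset
next_line  = buffer[reader_pos - offset]
```

Read positions are absolute line counts, and subtracting the offset converts them to array indexes. A reader that falls more than max_size lines behind silently skips the dropped lines.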