Module: Lich::Common::Buffer

Defined in:
documented/common/buffer.rb

Overview

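Manages a shared, bounded line buffer (at most 3000 lines) that multiple threads can read concurrently. Each reading thread keeps its own read index and stream mask, and writes to the buffer are serialized with a mutex.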

Examples:

Using the Buffer module

Lich::Common::Buffer.update(line)
Lich::Common::Buffer.gets

Constant Summary

DOWNSTREAM_STRIPPED = 1

Bit flag for the stripped downstream stream.

DOWNSTREAM_RAW = 2

Bit flag for the raw downstream stream.

DOWNSTREAM_MOD = 4

Bit flag for the modified downstream stream.

UPSTREAM = 8

Bit flag for the upstream stream.

UPSTREAM_MOD = 16

Bit flag for the modified upstream stream.

SCRIPT_OUTPUT = 32

Bit flag for the script output stream.

@@index = Hash.new
@@streams = Hash.new
@@mutex = Mutex.new
@@offset = 0
@@buffer = Array.new
@@max_size = 3000
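
The six stream constants are distinct powers of two, so they can be combined into a single mask and tested with a bitwise AND, which is what the reader methods do. The following is a minimal sketch of that idea, not code from Lich: the `StreamFlags` module and its `wanted?` helper are hypothetical stand-ins that copy the documented values so the example runs on its own.

```ruby
# Hypothetical stand-in that copies the documented constant values,
# so the sketch runs without Lich loaded.
module StreamFlags
  DOWNSTREAM_STRIPPED = 1
  DOWNSTREAM_RAW      = 2
  DOWNSTREAM_MOD      = 4
  UPSTREAM            = 8
  UPSTREAM_MOD        = 16
  SCRIPT_OUTPUT       = 32

  # A line is wanted when its stream flag overlaps the reader's mask,
  # mirroring the `(line.stream & @@streams[thread_id]) != 0` check.
  def self.wanted?(line_stream, mask)
    (line_stream & mask) != 0
  end
end

# Subscribe to two streams at once by OR-ing their flags.
mask = StreamFlags::DOWNSTREAM_STRIPPED | StreamFlags::UPSTREAM

puts mask                                                    # 9
puts StreamFlags.wanted?(StreamFlags::UPSTREAM, mask)        # true
puts StreamFlags.wanted?(StreamFlags::SCRIPT_OUTPUT, mask)   # false
```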

Class Method Details

.cleanup ⇒ Buffer

Cleans up the buffer's bookkeeping by removing the index and stream entries of threads that no longer exist.

Returns:

  • (Buffer)

    The Buffer module itself, for method chaining.



# File 'documented/common/buffer.rb', line 168

def Buffer.cleanup
  @@index.delete_if { |k, _v| not Thread.list.any? { |t| t.object_id == k } }
  @@streams.delete_if { |k, _v| not Thread.list.any? { |t| t.object_id == k } }
  return self
end
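
The pruning rule can be tried in isolation. This is a small sketch under stated assumptions: `prune_dead_threads` is a hypothetical helper, and the plain hash stands in for `@@index`. It relies on `Thread.list` returning only threads that are runnable or stopped, so a thread that has finished is no longer listed.

```ruby
# Hypothetical helper: drop entries whose key is not the object_id
# of a live thread, the same rule Buffer.cleanup applies.
def prune_dead_threads(table)
  live_ids = Thread.list.map(&:object_id)
  table.delete_if { |id, _value| !live_ids.include?(id) }
end

index = {}                                   # stand-in for @@index
worker = Thread.new { index[Thread.current.object_id] = 0 }
worker.join                                  # the worker has now terminated
index[Thread.current.object_id] = 0          # entry for the calling thread

prune_dead_threads(index)

puts index.key?(worker.object_id)            # false
puts index.key?(Thread.current.object_id)    # true
```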

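.clear ⇒ Array<Line>

Drains the buffer for the current thread: returns every unread line that matches the thread's stream mask and advances the thread's read index to the end of the buffer. The shared buffer itself is not emptied.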

Returns:

  • (Array<Line>)

    An array of lines that were in the buffer.



# File 'documented/common/buffer.rb', line 100

def Buffer.clear
  thread_id = Thread.current.object_id
  if @@index[thread_id].nil?
    @@mutex.synchronize {
      @@index[thread_id] = (@@offset + @@buffer.length)
      @@streams[thread_id] ||= DOWNSTREAM_STRIPPED
    }
  end
  lines = Array.new
  loop {
    if (@@index[thread_id] - @@offset) >= @@buffer.length
      return lines
    end

    line = nil
    @@mutex.synchronize {
      if @@index[thread_id] < @@offset
        @@index[thread_id] = @@offset
      end
      line = @@buffer[@@index[thread_id] - @@offset]
    }
    @@index[thread_id] += 1
    lines.push(line) if ((line.stream & @@streams[thread_id]) != 0)
  }
  return lines
end

.gets ⇒ Line

Note:

This method blocks, polling every 0.05 seconds, until a matching line is available.

Retrieves the next unread line that matches the calling thread's stream mask.

Returns:

  • (Line)

    The next line from the buffer.



# File 'documented/common/buffer.rb', line 36

def Buffer.gets
  thread_id = Thread.current.object_id
  if @@index[thread_id].nil?
    @@mutex.synchronize {
      @@index[thread_id] = (@@offset + @@buffer.length)
      @@streams[thread_id] ||= DOWNSTREAM_STRIPPED
    }
  end
  line = nil
  loop {
    if (@@index[thread_id] - @@offset) >= @@buffer.length
      sleep 0.05 while ((@@index[thread_id] - @@offset) >= @@buffer.length)
    end
    @@mutex.synchronize {
      if @@index[thread_id] < @@offset
        @@index[thread_id] = @@offset
      end
      line = @@buffer[@@index[thread_id] - @@offset]
    }
    @@index[thread_id] += 1
    break if ((line.stream & @@streams[thread_id]) != 0)
  }
  return line
end

.gets? ⇒ Line?

Non-blocking counterpart of gets: retrieves the next unread line that matches the calling thread's stream mask, skipping lines from other streams.

Returns:

  • (Line, nil)

    The next line from the buffer or nil if none is available.



# File 'documented/common/buffer.rb', line 63

def Buffer.gets?
  thread_id = Thread.current.object_id
  if @@index[thread_id].nil?
    @@mutex.synchronize {
      @@index[thread_id] = (@@offset + @@buffer.length)
      @@streams[thread_id] ||= DOWNSTREAM_STRIPPED
    }
  end
  line = nil
  loop {
    if (@@index[thread_id] - @@offset) >= @@buffer.length
      return nil
    end

    @@mutex.synchronize {
      if @@index[thread_id] < @@offset
        @@index[thread_id] = @@offset
      end
      line = @@buffer[@@index[thread_id] - @@offset]
    }
    @@index[thread_id] += 1
    break if ((line.stream & @@streams[thread_id]) != 0)
  }
  return line
end
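
The skip-and-filter loop shared by gets, gets?, and clear can be modeled without the shared state. The sketch below is illustrative only: `SketchLine` and `next_matching` are hypothetical names, and locking and eviction are left out so that only the filtering step is shown.

```ruby
# Hypothetical stand-in for a buffered line: some text plus a stream flag.
SketchLine = Struct.new(:text, :stream)

# Scan forward from `index`, skipping lines whose stream does not overlap
# `mask`. Returns the matching line (or nil) together with the new index.
def next_matching(buffer, index, mask)
  while index < buffer.length
    line = buffer[index]
    index += 1
    return [line, index] if (line.stream & mask) != 0
  end
  [nil, index]
end

buffer = [
  SketchLine.new("raw xml", 2),   # DOWNSTREAM_RAW
  SketchLine.new("hello", 1),     # DOWNSTREAM_STRIPPED
  SketchLine.new("look", 8)       # UPSTREAM
]

line, index = next_matching(buffer, 0, 1)
puts line.text      # hello
puts index          # 2

line, index = next_matching(buffer, index, 1)
puts line.inspect   # nil
puts index          # 3
```

As in gets?, running out of lines yields nil, but the index has still moved past the lines that were skipped.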

.rewind ⇒ Buffer

Resets the calling thread's read index to the oldest line still held in the buffer, so that subsequent reads replay the retained lines.

Returns:

  • (Buffer)

    The Buffer module itself, for method chaining.



# File 'documented/common/buffer.rb', line 91

def Buffer.rewind
  thread_id = Thread.current.object_id
  @@index[thread_id] = @@offset
  @@streams[thread_id] ||= DOWNSTREAM_STRIPPED
  return self
end

.streams ⇒ Integer

Retrieves the current stream mask for the calling thread.

Returns:

  • (Integer)

    The current stream bitmask, or nil if none has been set for this thread.



# File 'documented/common/buffer.rb', line 150

def Buffer.streams
  @@streams[Thread.current.object_id]
end

.streams=(val) ⇒ nil

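Sets the stream mask for the calling thread.

Parameters:

  • val (Integer)

    A bitmask combining one or more of the stream constants. At least one of the six defined flags must be set.

Returns:

  • (nil)

    Returns nil, after reporting an error, if val is not an Integer or sets none of the defined flags.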



# File 'documented/common/buffer.rb', line 157

def Buffer.streams=(val)
  if (!val.is_a?(Integer)) or ((val & 63) == 0)
    respond "--- Lich: error: invalid streams value\n\t#{$!.caller[0..2].join("\n\t")}"
    return nil
  end
  @@streams[Thread.current.object_id] = val
end
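
The guard in this method accepts a value only if it is an Integer that sets at least one of the six defined flags (63 is 1 | 2 | 4 | 8 | 16 | 32). A minimal sketch of the same test follows; `ALL_STREAM_FLAGS` and `valid_streams_value?` are hypothetical names introduced for illustration.

```ruby
# Hypothetical constant: the OR of the six documented stream flags.
ALL_STREAM_FLAGS = 1 | 2 | 4 | 8 | 16 | 32   # 63

# Hypothetical helper mirroring the guard in Buffer.streams=.
def valid_streams_value?(val)
  val.is_a?(Integer) && (val & ALL_STREAM_FLAGS) != 0
end

puts valid_streams_value?(1 | 8)   # true
puts valid_streams_value?(0)       # false
puts valid_streams_value?(64)      # false
puts valid_streams_value?("1")     # false
```

The check does not reject extra bits: a value such as 65 passes because it still sets one defined flag.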

.update(line, stream = nil) ⇒ Buffer

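Appends a frozen copy of the line to the buffer. When the buffer grows beyond its maximum size (3000 lines), the oldest lines are discarded and the offset is advanced to match.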

Parameters:

  • line (Line)

    The line to add to the buffer.

  • stream (Integer, nil) (defaults to: nil)

    Optional stream flag. When given, it replaces the stream of the stored copy.

Returns:

  • (Buffer)

    The Buffer module itself, for method chaining.



# File 'documented/common/buffer.rb', line 131

def Buffer.update(line, stream = nil)
  @@mutex.synchronize {
    frozen_line = line.dup
    unless stream.nil?
      frozen_line.stream = stream
    end
    frozen_line.freeze
    @@buffer.push(frozen_line)
    while (@@buffer.length > @@max_size)
      @@buffer.shift
      @@offset += 1
    end
  }
  return self
end
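
The role of the offset is easier to see in a miniature model. The class below is a sketch under stated assumptions, not part of Lich: `BoundedLog` is a hypothetical single-threaded stand-in that keeps the same eviction rule and the same clamping the readers apply when their index has fallen behind the offset.

```ruby
# Hypothetical miniature of the eviction logic in Buffer.update.
class BoundedLog
  attr_reader :offset

  def initialize(max_size)
    @max_size = max_size
    @lines = []
    @offset = 0   # absolute position of @lines[0]
  end

  # Store a frozen copy and evict the oldest entries past max_size.
  def push(line)
    @lines.push(line.dup.freeze)
    while @lines.length > @max_size
      @lines.shift
      @offset += 1
    end
    self
  end

  # Fetch by absolute position. Positions that were already evicted are
  # clamped forward to the oldest retained line, as the readers do.
  def at(position)
    position = @offset if position < @offset
    @lines[position - @offset]
  end
end

log = BoundedLog.new(3)
%w[a b c d e].each { |s| log.push(s) }

puts log.offset   # 2
puts log.at(0)    # c
puts log.at(4)    # e
```

Because readers store absolute positions, evicting old lines never shifts the meaning of a reader's index; a reader that fell too far behind simply resumes at the oldest line still held.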