1. Channel Buffers
Channel buffers let a channel hold messages until a receiver is ready to take them.
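An unbuffered channel is instantiated with:
@channel = Channel(String).new
which is the equivalent of:
@channel = Channel(String).new(0)
An unbuffered channel has a capacity of 0: it holds no messages, so each send blocks until a receiver takes the message. By contrast, Channel(String).new(1) creates a buffered channel that holds at most 1 message at a time.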
To create a Channel with a buffer of 3 messages use:
@channel = Channel(String).new(3)
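As a rough illustration of the difference a buffer makes, the sketch below sends three messages into a channel with a buffer of 3 before anything receives them. On a channel created without a capacity, the send is spawned so that the main fiber can act as the receiver. The variable names are illustrative and not part of the User example that follows.

```crystal
# Buffered: up to 3 sends complete even though no receiver is running yet.
buffered = Channel(String).new(3)
buffered.send("first")
buffered.send("second")
buffered.send("third")

# Drain the buffer; messages come out in the order they were sent.
received = [] of String
3.times { received << buffered.receive }

# No capacity given: the send runs in its own fiber and is paired
# with the receive call in the main fiber.
unbuffered = Channel(String).new
spawn do
  unbuffered.send("handed over")
end
direct = unbuffered.receive

puts received.inspect
puts direct
```

A fourth send on the buffered channel, with nothing receiving, would block the sending fiber until space became available.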
To demonstrate this, let's overwhelm our message bus with the code below. To trigger a problem, do the following, in this order, with an unbuffered channel:
* send many synchronous messages
* send many asynchronous messages
* then close the channels
class User
  getter channel : Channel(String)
  private getter name : String, email : String
  def initialize(@name, @email)
    # @channel = Channel(String).new(3) # buffered - holds up to 3 messages
    # @channel = Channel(String).new(1) # buffered - holds at most 1 message
    @channel = Channel(String).new # unbuffered - send blocks until a receiver takes the message
    listen_for_messages
  end
  def to_s
    "#{name} <#{email}>"
  end
  private def listen_for_messages
    spawn do
      loop do
        message = channel.receive?
        break if message.nil?
        puts "To: #{to_s} -- #{message}"
      end
    end
  end
end
module MainBuffers
  # make a large number of users
  users = [] of User
  1000.times do |i|
    user = User.new(name: "user_#{i}", email: "user_#{i}@example.ch")
    users << user
  end
  # sending many sync and async messages ensures more messages than the listener fibers can handle without a buffer
  users.each do |receiver|
    # synchronous messaging
    receiver.channel.send("SYNC 0 -- From: #{receiver.to_s} - with channel")
    # # synchronous messaging
    # spawn receiver.channel.send("SYNC 2 -- From: #{receiver.to_s} - with channel")
  end
  # sending many sync and async messages ensures more messages than the listener fibers can handle without a buffer
  users.each do |receiver|
    # async messaging
    spawn receiver.channel.send("ASYNC 3 -- From: #{receiver.to_s} - with channel")
  end
  # close user channels
  users.each do |receiver|
    # close asynchronously to allow messages to be delivered
    spawn receiver.channel.close
  end
  # wait for all channels to close before allowing main to terminate
  loop do
    break if users.all? { |u| u.channel.closed? } # are all channels closed?
    Fiber.yield
  end
end
Run with:
$ crystal src/channel_buffers/main_buffers.cr
Running this you will see many errors:
Unhandled exception in spawn: Channel is closed (Channel::ClosedError)
from /usr/local/Cellar/crystal/0.32.1/src/channel.cr:224:9 in 'send'
from src/channel_buffers/main_buffers.cr:50:11 in '->'
from /usr/local/Cellar/crystal/0.32.1/src/fiber.cr:255:3 in 'run'
from /usr/local/Cellar/crystal/0.32.1/src/fiber.cr:47:34 in '->'
We start closing the channels before every spawned send has completed, so the remaining sends hit a closed channel and raise Channel::ClosedError.
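To make the failure concrete, here is a minimal sketch, separate from the User example and with illustrative variable names, of how a closed channel behaves: receive? returns nil (which is what lets the listener loop exit), while send raises Channel::ClosedError.

```crystal
channel = Channel(String).new
channel.close

# receive? returns nil instead of raising once the channel is closed.
result = channel.receive?

# send on a closed channel raises Channel::ClosedError.
error_raised = false
begin
  channel.send("too late")
rescue Channel::ClosedError
  error_raised = true
end

puts "receive? returned nil: #{result.nil?}"
puts "send raised ClosedError: #{error_raised}"
```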
To fix this, change the User initializer to use a buffered channel:
def initialize(@name, @email)
  @channel = Channel(String).new(3)
  listen_for_messages
end
Now rerun the code.
The errors should now be gone, and we can successfully message many users both synchronously and asynchronously.
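The buffer is one way to avoid sending on a closed channel. Another option, shown here only as a hedged sketch and not as part of the original example, is to close the channel only after every sender has reported that it finished. The done and listener_done channels and the sender count are hypothetical helpers introduced for this illustration.

```crystal
channel = Channel(String).new        # no buffer
done = Channel(Nil).new              # hypothetical: each sender signals completion here
listener_done = Channel(Nil).new     # hypothetical: listener signals that it has exited
received = [] of String

# Listener: collect messages until the channel is closed.
spawn do
  while message = channel.receive?
    received << message
  end
  listener_done.send(nil)
end

# Senders: each one sends a message, then reports that it finished.
sender_count = 5
sender_count.times do |i|
  spawn do
    channel.send("message #{i}")
    done.send(nil)
  end
end

# Close only after every sender has reported in, then wait for the listener.
sender_count.times { done.receive }
channel.close
listener_done.receive

puts "received #{received.size} messages"
```

Because the close happens only after all sends have returned, no send can hit a closed channel, whatever the buffer size.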