1. Channel Buffers

Channel Buffers allow us to collect Channel Messages until they can be scheduled.

An unbuffered channel is instantiated with:

@channel = Channel(String).new

which is the equivalent of:

@channel = Channel(String).new(1)

A Message buffer of 1 (the channel holds at the MOST 1 message at a time.

To create a Channel with a buffer of 3 messages use:

@channel = Channel(String).new(3)

To demonstrate this lets overwhelm our message bus with the below code. To trigger a problem do the following in this order with an Unbuffered Channel. Send many: * many synchronous messages * many asynchronous messages * then close the channels

code/crystal/src/channel_buffers/main_buffers.cr
class User
  getter  channel : Channel(String)
  private getter name : String, email : String

  def initialize(@name, @email)
    # @channel = Channel(String).new(3)  # buffered - holds 3 messages
    # @channel = Channel(String).new(1)  # unbuffered - holds 1 message
    @channel = Channel(String).new       # unbuffered - holds 1 message
    listen_for_messages
  end

  def to_s
    "#{name} <#{email}>"
  end

  private def listen_for_messages
    spawn do
      loop do
        message = channel.receive?
        break     if message.nil?

        puts "To: #{to_s} -- #{message}"
      end
    end
  end
end

module MainBuffers
  # make a large number of users
  users  = [] of User
  1000.times do |i|
    user = User.new(name: "user_#{i}",  email: "user_#{i}@example.ch")
    users << user
  end

  # sending many sync and async ensures more messages than secondary fibers can handle without a buffer
  users.each do |receiver|
    # synchronous messaging
    receiver.channel.send("SYNC 0 -- From: #{receiver.to_s} - with channel")
    # # synchronous messaging
    # spawn receiver.channel.send("SYNC 2 -- From: #{receiver.to_s} - with channel")
  end

  # sending many sync and async ensures more messages than secondary fibers can handle without a buffer
  users.each do |receiver|
    # async messaging
    spawn receiver.channel.send("ASYNC 3 -- From: #{receiver.to_s} - with channel")
  end

  # close user channels
  users.each do |receiver|
    # close asynchronously to allow messages to be delivered
    spawn receiver.channel.close
  end

  # wait for all channels to close before allowing main to terminate
  loop do
    break if users.all?{ |u| u.channel.closed? } # are all channels are closed?
    Fiber.yield
  end
end

Run with:

$ crystal src/channel_buffers/main_buffers.cr

Running this you will see many errors:

Unhandled exception in spawn: Channel is closed (Channel::ClosedError)
  from /usr/local/Cellar/crystal/0.32.1/src/channel.cr:224:9 in 'send'
  from src/channel_buffers/main_buffers.cr:50:11 in '->'
  from /usr/local/Cellar/crystal/0.32.1/src/fiber.cr:255:3 in 'run'
  from /usr/local/Cellar/crystal/0.32.1/src/fiber.cr:47:34 in '->'

We start to close the channels before the message bus is empty.

To easily fix this simply change the user code initializer to:

def initialize(@name, @email)
  @channel = Channel(String).new(3)
  listen_for_messages
end

Now rerun the code.

The errors should now be gone & we can successfully message many users synchronously and asynchronously.