1. Channels

Channels are how Crystal fibers communicate with each other. Crystal channels are based on the CSP model (Communicating Sequential Processes): messages are handled strictly sequentially, and the receiver processes them in the same order they entered the channel queue (FIFO, First-In First-Out).
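The FIFO ordering described above can be checked with a small standalone sketch. It is not part of the User code that follows; the variable names and the buffer size of 3 are assumptions chosen for illustration.

```crystal
# A buffered channel (capacity 3) so the producer can queue values without blocking.
channel = Channel(Int32).new(3)

# Producer fiber: sends 1, 2, 3 in that order.
spawn do
  3.times { |i| channel.send(i + 1) }
end

# The main fiber blocks on receive, which lets the producer run.
# Values come out in the same order they went in.
received = [] of Int32
3.times { received << channel.receive }

puts received.inspect # => [1, 2, 3]
```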

We will use channels to communicate between Users listening concurrently for message arrivals.

1.1. Simple Channels (between Objects)

In the following code, each User spawns a fiber that listens on its channel so it can receive messages asynchronously. We call channel.receive? (listen) inside a spawn block, which creates a fiber that loops, waiting to receive and process new messages (basically our own event loop listening for messages over a channel).

code/crystal/src/channel_basics/user.cr
class User
  getter channel : Channel(String)
  private getter name : String, email : String

  def initialize(@name, @email)
    @channel = Channel(String).new # create a message channel that expects to receive a string
    listen_for_messages            # start listening for new incoming messages
  end

  def to_s
    "#{name} <#{email}>"
  end

  # Creates a Fiber 'loop' waiting for messages to arrive.
  # The fiber blocks on receive? until a message arrives, letting other fibers run.
  private def listen_for_messages
    spawn do
      loop do
        message = channel.receive? # wait for the next message (nil once the channel is closed)
        break if message.nil?      # channel closed: stop listening (receive? avoids an exception)

        puts "To: #{to_s} -- #{message}"
      end
    end
  end
end

To use the above code, we make two users and send each of them messages from Crystal’s main fiber directly to their respective channels.

code/crystal/src/channel_basics/main.cr
require "./user"

# USAGE
module Main
  # create users
  user_1 = User.new(name: "first",  email: "[email protected]")
  user_2 = User.new(name: "second", email: "[email protected]")

  # send messages
  puts "REAL-TIME - START"

  # send an async message
  spawn user_1.channel.send("ASYNC sent 1st")

  # send a synchronous message
  user_1.channel.send("REAL-TIME sent 2nd")
  user_1.channel.send("REAL-TIME sent 3rd")

  puts "SWITCH to user_2"
  spawn user_2.channel.send("ASYNC sent 4th")
  user_2.channel.send("REAL-TIME sent 5th")

  puts "SWITCH back to user_1"
  user_1.channel.send("REAL-TIME sent 6th")
  spawn user_1.channel.send("ASYNC sent 7th")
  puts "REAL-TIME - DONE"

  # Allow Fibers (async messages) to execute
  Fiber.yield
end

Run with:

$ crystal src/channel_basics/main.cr

1.2. Closing Channels

It would be nice to be able to wait until all the channels are closed before ending the main fiber, to ensure all messages get delivered. Closing channels can also matter for the garbage collector when using lots of objects: objects that are no longer needed won’t be garbage collected if they are still monitoring a channel for messages.

To close a channel we can use object.channel.close; to check whether a channel is closed we can use object.channel.closed?
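As a minimal sketch of these two calls on a bare channel (no User object involved; the variable names are illustrative assumptions), the following also shows why the listener loop uses receive?: on a closed, empty channel it returns nil rather than raising.

```crystal
channel = Channel(String).new

puts channel.closed? # => false
channel.close
puts channel.closed? # => true

# receive? returns nil on a closed, empty channel instead of raising,
# which is what lets a listener loop break out cleanly.
result = channel.receive?
puts result.inspect # => nil
```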

code/crystal/src/channel_basics/main_close.cr
require "./user"

# USAGE
module MainClose
  # create users
  user_1 = User.new(name: "first",  email: "[email protected]")
  user_2 = User.new(name: "second", email: "[email protected]")

  # send messages
  puts "REAL-TIME - START"

  # send an async message
  spawn user_1.channel.send("ASYNC sent 1st")

  # send a synchronous message
  user_1.channel.send("REAL-TIME sent 2nd")
  user_1.channel.send("REAL-TIME sent 3rd")

  puts "SWITCH to user_2"
  spawn user_2.channel.send("ASYNC sent 4th")
  user_2.channel.send("REAL-TIME sent 5th")

  puts "SWITCH back to user_1"
  user_1.channel.send("REAL-TIME sent 6th")
  spawn user_1.channel.send("ASYNC sent 7th")
  puts "REAL-TIME - DONE"

  # immediate close / cleanup Channels
  user_1.channel.close
  user_2.channel.close

  # async close / cleanup Channels
  # spawn user_1.channel.close
  # spawn user_2.channel.close

  Fiber.yield
end

Run with:

$ crystal src/channel_basics/main_close.cr

Running this with immediate channel closing produces errors: the fibers still queued to deliver a message suddenly lose their delivery channel, and sending on a closed channel raises Channel::ClosedError.
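The failure can be reproduced in isolation. In this sketch the `outcome` channel is a hypothetical helper (not part of the original code) used only to report back what happened inside the spawned sender; the message strings are likewise illustrative.

```crystal
channel = Channel(String).new
outcome = Channel(String).new # hypothetical helper: reports the sender's result

# Queue a sender fiber, as `spawn user.channel.send(...)` does in main_close.cr.
spawn do
  begin
    channel.send("too late")
    outcome.send("delivered")
  rescue Channel::ClosedError
    # The channel was closed before (or while) the send was pending.
    outcome.send("send failed: channel closed")
  end
end

# Close immediately, before the queued sender has delivered anything.
channel.close

result = outcome.receive
puts result # => send failed: channel closed
```

In main_close.cr nothing rescues the exception, so it surfaces as an unhandled error in the spawned fiber.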

A simple solution is to close the channels asynchronously (spawn the close), so that each close is scheduled after the sends that were spawned before it.

1.3. Waiting for Channels to close

How well does our code work when messaging lots of objects?

code/crystal/src/channel_basics/main_stress.cr
# src/channel_basics/main_stress.cr
require "./user"

module MainCloseWait

  # make a large number of users
  users = [] of User
  1000.times do |i|
    user = User.new(name: "user_#{i}", email: "user_#{i}@example.ch")
    users << user
  end

  # send lots of messages - async (for some reason async needs to be first)
  users.each do |receiver|
    spawn receiver.channel.send("ASYNC -- From: #{receiver.to_s} - with channel")
  end

  # close user channels
  users.each do |receiver|
    # synchronous channel closing
    receiver.channel.close

    # close asynchronously to allow messages to be delivered
    # spawn receiver.channel.close
  end

  # wait for all channels to close before allowing main to terminate
  Fiber.yield

  # loop do
  #   break if users.all? { |u| u.channel.closed? } # are all channels closed?
  #   Fiber.yield
  # end
end

Run with:

$ crystal src/channel_basics/main_stress.cr

With the simple Fiber.yield and a synchronous close, we won’t have a chance to receive any messages before we exit.

Because the close is synchronous and the message sending is async, we close our channels and end without delivering any messages. To fix this, change our close loop to also be async:

  # close user channels
  users.each do |receiver|
    # synchronous channel closing
    # receiver.channel.close

    # close asynchronously to allow messages to be delivered
    spawn receiver.channel.close
  end

This is a big improvement: now we see messages! BUT if we look closely, we are not getting all our messages. We actually need to wait for all our channels to close before ending. We can do this by also changing our wait code to:

# wait for all channels to close before allowing main to terminate
# Fiber.yield

loop do
  break if users.all? { |u| u.channel.closed? } # are all channels closed?
  Fiber.yield
end

Now we have time to deliver ALL our messages before exiting!
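As a possible variation (a sketch only, not the approach used in the code above), the main fiber can block on a second channel instead of polling closed? in a loop. The names `inbox`, `done` and `delivered` are hypothetical, and the sender closes the channel itself after its last send so the close cannot overtake a pending message.

```crystal
inbox = Channel(String).new
done = Channel(Nil).new # hypothetical completion signal
delivered = [] of String

# Listener: drains the inbox until it is closed, then reports completion.
spawn do
  while message = inbox.receive?
    delivered << message
  end
  done.send(nil)
end

# Sender: sends every message, then closes the channel.
spawn do
  3.times { |i| inbox.send("message #{i + 1}") }
  inbox.close
end

# Block the main fiber until the listener has processed everything.
done.receive
puts delivered.inspect # => ["message 1", "message 2", "message 3"]
```

The trade-off is one extra channel per listener (or a shared one with a counter) in exchange for not spinning on Fiber.yield.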