1. Channels
Channels are how Crystal communicates between Fibers. Crystal Channels are based on the CSP model (Communicating with Sequential Processes), which basically, the messages arrive are are handled strickly sequentially. The order in which messages by the receiving object are handled in the same order they enter the channel queue (FIFO - First-In First-Out).
We will use channels to communicate between Users listening concurrently for message arrivals.
1.1. Simple Channels (between Objects)
In the following code the User
creates a fiber that listens to new messages on its channel
to receive messages asynchronously. We channel.receive
(listen) for new messages within a spawn block to create a fiber
that loops waiting to receive and process new messages (basically making our own even loop listening for messages over a channel).
class User
getter channel : Channel(String)
private getter name : String, email : String
def initialize(@name, @email)
@channel = Channel(String).new # create a message channel that expects to receive a string
listen_for_messages # start listening for new incoming messages
end
def to_s
"#{name} <#{email}>"
end
# Creates a Fiber 'loop' waiting for messages to arrive
# crystal switches fibers after each lap through the loop
private def listen_for_messages
spawn do
loop do
message = channel.receive? # check for new messages
break if message.nil? # skip processing without a message (avoid an exception)
puts "To: #{to_s} -- #{message}"
end
end
end
end
To use the above code we will make two users and send each messages from Crystal’s main thread directly to their respective channels.
require "./user"
# USAGE
module Main
# create users
user_1 = SimpleUser.new(name: "first", email: "[email protected]")
user_2 = SimpleUser.new(name: "second", email: "[email protected]")
# send messages
puts "REAL-TIME - START"
# send an async message
spawn user_1.channel.send("ASYNC sent 1st")
# send a synchronous message
user_1.channel.send("REAL-TIME sent 2nd")
user_1.channel.send("REAL-TIME sent 3rd")
puts "SWITCH to user_2"
spawn user_2.channel.send("ASYNC sent 4th")
user_2.channel.send("REAL-TIME sent 5th")
puts "SWITCH back to user_1"
user_1.channel.send("REAL-TIME sent 6th")
spawn user_1.channel.send("ASYNC sent 7th")
puts "REAL-TIME - DONE"
# Allow Fibers (async messages) to execute
Fiber.yield
end
Run with:
$ crystal src/channel_basics/main.cr
1.2. Closing Channels
It would be nice to be able to wait and end the main thread when all the channels to ensure all messages get delivered. It can also be important for the Garbarge collector when using lots of objects. Objects that are not longer needed - won’t be garbage collected if their are still monitoring a channel for messages.
In order to close channels we can use object.channel.close
- to check if a channel is closed we can use: object.channel.closed?
require "./user"
# USAGE
module MainClose
# create users
user_1 = User.new(name: "first", email: "[email protected]")
user_2 = User.new(name: "second", email: "[email protected]")
# send messages
puts "REAL-TIME - START"
# send an async message
spawn user_1.channel.send("ASYNC sent 1st")
# send a synchronous message
user_1.channel.send("REAL-TIME sent 2nd")
user_1.channel.send("REAL-TIME sent 3rd")
puts "SWITCH to user_2"
spawn user_2.channel.send("ASYNC sent 4th")
user_2.channel.send("REAL-TIME sent 5th")
puts "SWITCH back to user_1"
user_1.channel.send("REAL-TIME sent 6th")
spawn user_1.channel.send("ASYNC sent 7th")
puts "REAL-TIME - DONE"
# immediate close / cleanup Channels
user_1.channel.close
user_2.channel.close
# async close / cleanup Channels
# spawn user_1.channel.close
# spawn user_2.channel.close
Fiber.yield
end
Run with:
$ crystal src/channel_basics/main_close.cr
Running this with immediate channel closing creates errors since closing the channel immediately means that messages in queued Fibers - waiting to deliver will suddenly loose their delivery channel.
A simple solution for this is to send the close asynchronously.
1.3. Waiting for Channels to close
How well does our code work when messaging lots of objects?
# src/channel_basics/main_close_wait.cr
require "./user"
module MainCloseWait
# make a large number of users
users = [] of User
1000.times do |i|
user = User.new(name: "user_#{i}", email: "user_#{i}@example.ch")
users << user
end
# send lots of messages - async (for some reason async needs to be first)
users.each do |receiver|
spawn receiver.channel.send("ASYNC -- From: #{receiver.to_s} - with channel")
end
# close user channels
users.each do |receiver|
# synchronous channel closing
receiver.channel.close
# close asynchronously to allow messages to be delivered
# spawn receiver.channel.close
end
# wait for all channels to close before allowing main to terminate
Fiber.yield
# loop do
# break if users.all?{ |u| u.channel.closed? } # are all channels are closed?
# Fiber.yield
# end
end
Run with:
$ crystal src/channel_basics/main_stress.cr
With the simple Fiber.yield
and a synchronous close - will won’t have a chance to get any messages before we exit.
So obviously, the close being synchronous and the message sending is async means we close our channels and end without sending messages. To fix this change our close loop to also be async:
# close user channels
users.each do |receiver|
# synchronous channel closing
# receiver.channel.close
# close asynchronously to allow messages to be delivered
spawn receiver.channel.close
end
This is a big improvement - now we see messages! BUT if we look closely, we are not getting all our messages - we actually need to wait for all our channels to close before ending. We can do this by now also changing our wait code to:
# wait for all channels to close before allowing main to terminate
# Fiber.yield
loop do
break if users.all?{ |u| u.channel.closed? } # are all channels are closed?
Fiber.yield
end
Now we have time to deliver ALL our messages before exiting!