1. Callbacks (Async Responses)

So far we have focused on "Tell" don’t "Ask" messaging (East-Oriented). This is a good focus, but sometimes its helpful to know the state of other objects (Sharing is Caring ;).

For example in our User messaging examples - what if we were connected to a chat-room and not messaging other users directly. To avoid lots of complicated logic to avoid runtime errors if channels are closed (like we saw before in the Channel Buffering), we as a chat room need to know if users are in the chatroom or not.

We can do this with a notification channel (a sort-of callback) to the chatroom when we leave - and we can keep the code focused on Tell don’t Ask using channels with our callback.

code/crystal/src/channel_callbacks/user.cr
class User
  getter         message_channel : Channel(String)

  private getter name, email, departure_channel

  # notice 'Nil' as a class/constant not a value in the Definition
  def initialize(@name : String, @email : String,
                 @departure_channel : Channel(Nil))  # (1)
    @message_channel = Channel(String).new(3)
    listen_for_messages
  end

  def to_s
    "#{name} <#{email}>"
  end

  private def listen_for_messages
    spawn do
      loop do
        message = message_channel.receive?
        break     if message.nil?

        puts "To: #{to_s} -- #{message}"
      end
      puts "#{to_s} -- CLOSED"
      # notify when loop ends -- channel is closed
      # notice 'nil' as the value when sending
      departure_channel.send(nil)  # (2)
    end
  end
end
  1. Inject Main’s 'channel' into our class to callback on when User’s message_channel closes.

  2. AFTER User’s message_channel closes - notify Main of closing (main isn’t tracking users who are closed - just counting that all are closed — so send 'nil' is a good choice)

We can use our new channel callback in the following way:

code/crystal/src/channel_callbacks/main.cr
require "./user"

module Main
  # make callback channel for users
  # notice 'Nil' as a class/constant not a value
  user_closed = Channel(Nil).new

  # make a large number of users
  users  = [] of User

  100.times do |i|
    # all users get given the same channel to notify of closure
    user = User.new(name: "user_#{i}", email: "user_#{i}@example.ch",
                    departure_channel: user_closed)
    users << user
  end

  # send lots of messages - async (for some reason async needs to be first)
  users.each do |receiver|
    receiver.message_channel.send("SYNC 0 -- From: #{receiver.to_s}")
    spawn receiver.message_channel.send("ASYNC 1 -- From: #{receiver.to_s}")
  end

  # channels allow chaining too!
  users.each do |receiver|
    receiver.message_channel.send("SYNC 3 -- From: #{receiver.to_s}")
    spawn receiver.message_channel.send("ASYNC 4 -- From: #{receiver.to_s}")
                                  .send("ASYNC 5 -- close is next")
                                  .close
  end

  puts "LISTEN for CLOSED CHANNELS before closing"
  # listen and wait for all users to close
  (users.size).times { user_closed.receive }
  puts "ALL users CLOSED - safe to end"
end

Run with:

$ crystal src/channel_callbacks/main.cr

Now we can wait for all channels to close using our callback.

NOTE
  • Channels allow a fluid interface (chaining)!

  • Message arrive in the order they are RECEIVED - Async messages are sent when scheduled (later) - thus Sync messages arrive first