8/03/2010

Setting Up a Queue Consumer with RabbitMQ's Erlang Client

Ok, so in this post I will outline how to set up a consumer using the RabbitMQ Erlang client.

We'll start with a basic chat server in which clients can post and listen for messages.
Continuing from my last post: http://developingthedream.blogspot.com/2009/10/rabbitmq-erlang-client-yay.html
You now have a basic project skeleton to start building your server.  We'll call the project chat_server.

For a visual representation of the AMQP spec, I followed this blog post: http://mutlix.blogspot.com/2007/10/amqp-in-10-mins-part3-flexible-routing.html
Although it's outdated, the basics are the same.

So, the project would look like this:

chat_server/
chat_server/db
chat_server/deps
chat_server/ebin
chat_server/include
chat_server/src


Now inside the src/ directory, we'll create these files:
  
src/master.erl
src/chat_server.erl
src/test_client.erl



I made a mistake in my last blog post regarding how to properly symlink the rabbitmq dependencies.
Unlink any previously created symlinks:

unlink /usr/lib/erlang/lib/rabbitmq_common
unlink /usr/lib/erlang/lib/rabbitmq_erlang_client


Now create some symlinks in the deps folder:

cd deps/
ln -s rabbitmq-server/ rabbit_common
ln -s rabbitmq-erlang-client/ rabbitmq_erlang_client


Here's the content of the Makefile we'll be using.

.SUFFIXES: .erl .beam

.erl.beam:
    erlc -W $<

ARGS = -pa ebin \
       -pa deps/rabbit_common/ebin \
       -pa deps/rabbitmq_erlang_client/ebin \
       -boot start_sasl -s rabbit

ERL = erl ${ARGS}
ERLC = erlc -I . -I deps +debug_info -o ebin

MODS = src/master.erl src/chat_server.erl src/test_client.erl

all:
    ${ERLC} ${MODS}

run:
    ${ERL}

clean:
    rm -rf *.beam ebin/*.beam src/*.beam erl_crash.dump 

 
We'll start with master.erl, which bootstraps the server: it declares the exchange and the queue and binds them together.
Here is the content of master.erl:

-module(master).
-include_lib("deps/rabbitmq_erlang_client/include/amqp_client.hrl").

-export([start/0]).

start() ->
    RID = amqp_connection:start_direct(),
    Channel = amqp_connection:open_channel(RID),

    X = <<"global_chat_exchange">>,
    Q = <<"global_message_queue">>,
    Key = <<"global_message_queue_publish_key">>,       %%our routing key, all clients have this

    amqp_channel:call(Channel, #'exchange.declare'{exchange = X, type = <<"topic">>, nowait = true}),
    amqp_channel:call(Channel, #'queue.declare'{queue = Q}),
    amqp_channel:call(Channel, #'queue.bind'{queue = Q, exchange = X, routing_key = Key}),
  
    io:fwrite("bound queue: ~p to exchange: ~p using key: ~p~n", [Q, X, Key]),
  
    amqp_channel:close(Channel),
    amqp_connection:close(RID).

 
Here's the breakdown of the start() function.
I'm not doing any sort of error checking here; take a look at deps/rabbitmq_erlang_client/test/test_util.erl
for more complete code.

First, we connect to rabbitmq.

RID = amqp_connection:start_direct(),

We simply call start_direct in the amqp_connection module and receive the connection id (this is just the pid of an Erlang process).
Next, we open a channel, which is pretty straightforward.

Channel = amqp_connection:open_channel(RID),

Afterward, we declare the exchange and the queue, then bind the queue to the exchange using a routing key.

Here are the names we use:

X = <<"global_chat_exchange">>,
Q = <<"global_message_queue">>,
Key = <<"global_message_queue_publish_key">>,       %%our routing key, all clients have this

amqp_channel:call(Channel, #'exchange.declare'{exchange = X, type = <<"topic">>, nowait = true}),
amqp_channel:call(Channel, #'queue.declare'{queue = Q}),
amqp_channel:call(Channel, #'queue.bind'{queue = Q, exchange = X, routing_key = Key}),

 
Here we tell the RabbitMQ Erlang client to declare the exchange, declare the queue, and bind the two together with the routing key.
You can look at deps/rabbit_common/include/rabbit_framing.hrl for all the record definitions.
The records let you specify things such as the exchange type (here, topic) and other options.
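
Since the exchange is declared with type topic, it may help to see how a topic binding is compared against a routing key. The sketch below is only an illustration written in plain Erlang, not code from RabbitMQ: the module name topic_demo and the function matches/2 are made up for this post. It follows the AMQP rule that keys are words separated by dots, '*' stands for exactly one word and '#' for zero or more words.

```erlang
-module(topic_demo).
-export([matches/2]).

%% matches(BindingKey, RoutingKey) -> boolean()
%% Hypothetical helper: both arguments are binaries made of dot-separated words.
matches(Pattern, Key) ->
    match(binary:split(Pattern, <<".">>, [global]),
          binary:split(Key, <<".">>, [global])).

%% Both lists exhausted at the same time: the key matches.
match([], []) -> true;
%% '#' matches zero words (drop it) or one more word (keep it, drop a word).
match([<<"#">> | Ps], Ws) ->
    match(Ps, Ws) orelse (Ws =/= [] andalso match([<<"#">> | Ps], tl(Ws)));
%% '*' matches exactly one word, whatever it is.
match([<<"*">> | Ps], [_ | Ws]) -> match(Ps, Ws);
%% A literal word must be identical on both sides.
match([W | Ps], [W | Ws]) -> match(Ps, Ws);
match(_, _) -> false.
```

Our binding key contains no dots or wildcards, so it only matches a routing key that is exactly the same binary, which is why the publisher later uses the identical key.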

Now, we'll compile this module manually from the Erlang shell.
Run 'make run' to start the RabbitMQ server.  You'll be greeted with the shell.
Compile the master module by typing:

c('src/master').

You can now run the code like so:

master:start().

Now we're ready to start consuming messages.  So, the content of the first draft of chat_server.erl is:

-module(chat_server).
-include_lib("deps/rabbitmq_erlang_client/include/amqp_client.hrl").
-compile(export_all).

start() ->
    RID = amqp_connection:start_direct(),
    Channel = amqp_connection:open_channel(RID),
   
    Queue = <<"global_message_queue">>,
  
    spawn( fun() -> consume_loop(RID, Channel, Queue) end ),
    self().
  
consume_loop(RID, Channel, Q) ->
    amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q}, self()),
  
    receive
         #'basic.consume_ok'{} ->
            io:fwrite("subscribed to queue: ~p listening for messages...~n", [Q])
    end,
    receive
        {#'basic.deliver'{delivery_tag=Tag}, Content} ->
            amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
            handle_message(RID, Channel, Content),
            consume_loop(RID, Channel, Q);
        Unknown ->
            io:fwrite("unknown message: ~p tearing down~n", [Unknown]),
            teardown(RID, Channel)
    end.

handle_message(RID, Channel, Content) ->
    io:fwrite("got message: ~p from pid: ~p on channel: ~p ~n", [Content, RID, Channel]),
    todo.

teardown(RID, Channel) ->
    amqp_channel:close(Channel),
    amqp_connection:close(RID).

 
Here we open a channel and spawn a process to consume messages from the 'global_message_queue' queue.
The connection and channel setup is the same as what you've already seen in master.erl.

Queue = <<"global_message_queue">>,   %%Here we declare the queue name to be passed around
spawn( fun() -> consume_loop(RID, Channel, Queue) end ),    %%spawn a process and return immediately
self().  %%return our process id (unused)

   
The spawned process loops forever, consuming messages from the 'global_message_queue' queue:

consume_loop(RID, Channel, Q) ->
    amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q}, self()),
    receive
         #'basic.consume_ok'{} ->
            io:fwrite("subscribed to queue: ~p listening for messages...~n", [Q])
    end,

  
Here, we start the loop by calling amqp_channel:subscribe, which tells the server that we're ready to consume from the queue 'Q'.
We then wait for the 'basic.consume_ok' confirmation message before continuing.

    receive
        {#'basic.deliver'{delivery_tag=Tag}, Content} ->


We wait for an Erlang message that carries an AMQP delivery.
We bind the message content to the variable 'Content' and the delivery tag to 'Tag'.

            amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),

We immediately acknowledge that we've received the message.
You may wish to move this after you've processed the message, so that a message is only acknowledged once it has actually been handled.

            handle_message(RID, Channel, Content),
            consume_loop(RID, Channel, Q);
 
          
We pass the content to the handle_message() function and then loop to wait for the next message.

        Unknown ->
            io:fwrite("unknown message: ~p tearing down~n", [Unknown]),
            teardown(RID, Channel)
    end.


If we get an unknown message for any reason, we tear down the channel and connection and the process exits.
The handle_message() function simply prints the content of the message to the screen.
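
One thing to watch in this first draft: consume_loop/3 calls amqp_channel:subscribe at the top and then calls itself after every message, so the subscription is repeated on each pass. A possible second draft subscribes once and then loops only on deliveries, acknowledging after the message has been handled. Here's a rough sketch of that shape in plain Erlang so it can be run without a broker. Everything in it is made up for illustration: the module consume_demo, the messages subscribe_ok, {deliver, Tag, Payload}, {ack, Tag, Result} and stop are stand-ins for the real amqp_client records, and the calling process plays the role of the broker.

```erlang
-module(consume_demo).
-export([run/1]).

%% The calling process acts as a fake broker: it confirms the subscription
%% once, delivers each payload with an increasing tag, then collects the acks.
run(Payloads) ->
    Broker = self(),
    Consumer = spawn(fun() -> consumer(Broker) end),
    Consumer ! subscribe_ok,
    Tags = lists:seq(1, length(Payloads)),
    [Consumer ! {deliver, Tag, P} || {Tag, P} <- lists:zip(Tags, Payloads)],
    Consumer ! stop,
    collect([]).

%% Wait for the subscription confirmation exactly once, then enter the loop.
consumer(Broker) ->
    receive subscribe_ok -> ok end,
    loop(Broker).

%% The loop never subscribes again; it only handles deliveries.
loop(Broker) ->
    receive
        {deliver, Tag, Payload} ->
            Result = handle_message(Payload),
            Broker ! {ack, Tag, Result},   % ack only after handling
            loop(Broker);
        stop ->
            Broker ! done
    end.

handle_message(Payload) -> {seen, Payload}.

%% Gather acks in arrival order until the consumer reports it is done.
collect(Acc) ->
    receive
        {ack, Tag, Result} -> collect([{Tag, Result} | Acc]);
        done -> lists:reverse(Acc)
    after 1000 ->
        {timeout, lists:reverse(Acc)}
    end.
```

In the real module the same split would mean one function that calls amqp_channel:subscribe and waits for 'basic.consume_ok', and a second function that holds the receive for 'basic.deliver' and recurses on itself.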

Now you can compile the module like so:

c('src/chat_server').

Start the consumer process by typing:

chat_server:start().

Now we can start publishing messages.  This is the content of test_client.erl:

-module(test_client).
-include_lib("deps/rabbitmq_erlang_client/include/amqp_client.hrl").
-export([say/1]).
say(Msg) ->
    RID = amqp_connection:start_direct(),
    Channel = amqp_connection:open_channel(RID),
    X = <<"global_chat_exchange">>,
    Key = <<"global_message_queue_publish_key">>,
    
    Packet = list_to_binary(Msg),
    
    Publish = #'basic.publish'{exchange = X, routing_key = Key, mandatory=true, immediate=false},
    amqp_channel:call(Channel, Publish, #amqp_msg{payload = Packet}),
    
    amqp_channel:close(Channel),
    amqp_connection:close(RID).


First we turn the message into a binary for delivery:

    Packet = list_to_binary(Msg),
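
A small caveat: list_to_binary/1 raises badarg if the string contains a character above 255, so a message with, say, a euro sign would crash say/1. If you expect such input, one option is unicode:characters_to_binary/1, which encodes the string as UTF-8. The helper below is just a sketch; the module name payload_demo and the function to_payload/1 are my own and not part of the client. Note that for characters between 128 and 255 the two functions produce different bytes, so publisher and consumer need to agree on the encoding.

```erlang
-module(payload_demo).
-export([to_payload/1]).

%% Turn a message into a binary payload.
%% Binaries are passed through unchanged; strings (lists of code points)
%% are encoded as UTF-8, so code points above 255 are accepted.
to_payload(Msg) when is_binary(Msg) ->
    Msg;
to_payload(Msg) when is_list(Msg) ->
    unicode:characters_to_binary(Msg).
```

For plain ASCII text such as "hello world!" the result is the same binary that list_to_binary/1 would give.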

    Publish = #'basic.publish'{exchange = X, routing_key = Key, mandatory=true, immediate=false},
    amqp_channel:call(Channel, Publish, #amqp_msg{payload = Packet}),
 
  
First, we bind the variable 'Publish' to a record which sets up the publish options.
Then we use 'call' instead of 'cast' simply because we WANT to wait for the message to be published.
The reason we want to wait is that we close the channel immediately afterwards.
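
To make the difference concrete, here is a tiny stand-alone gen_server, assuming amqp_channel:call and amqp_channel:cast follow the usual OTP call/cast semantics. It is not the real channel: the module call_cast_demo and its functions are invented for this example, and "publishing" just means storing the message in the server state.

```erlang
-module(call_cast_demo).
-behaviour(gen_server).
-export([start/0, publish_sync/2, publish_async/2, published/1]).
-export([init/1, handle_call/3, handle_cast/2]).

start() ->
    {ok, Pid} = gen_server:start(?MODULE, [], []),
    Pid.

%% Synchronous: returns only after the server has handled the request.
publish_sync(Pid, Msg) -> gen_server:call(Pid, {publish, Msg}).

%% Asynchronous: returns ok right away, possibly before the server
%% has even looked at the message.
publish_async(Pid, Msg) -> gen_server:cast(Pid, {publish, Msg}).

%% Ask the server which messages it has handled so far, oldest first.
published(Pid) -> gen_server:call(Pid, published).

init([]) -> {ok, []}.

handle_call({publish, Msg}, _From, Msgs) -> {reply, ok, [Msg | Msgs]};
handle_call(published, _From, Msgs) -> {reply, lists:reverse(Msgs), Msgs}.

handle_cast({publish, Msg}, Msgs) -> {noreply, [Msg | Msgs]}.
```

With publish_sync/2 the caller knows the server has processed the message by the time the function returns. With publish_async/2 it only knows the message was sent, so shutting things down right afterwards may race with the work still sitting in the server's mailbox.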

Now you can compile the module by running:

c('src/test_client').

And test it by running:

test_client:say("hello world!").

You should now see the output from your consumer, which has read the message.

This concludes this introduction to using the rabbitmq-erlang client.
You can find the full test project available at: http://github.com/therevoltingx/chat_server

Any comments, questions, etc. are welcome!