Chapter 8. TorqueBox Messaging

8.1. Introduction
8.2. Deploying Destinations
8.2.1. Deployment Styles
8.2.2. Deployment Descriptors
8.3. TorqueBox Ruby Classes
8.4. Messaging Abstractions
8.4.1. Queues and Topics
8.4.2. Publishing Messages
8.4.3. Receiving Messages
8.4.4. Destination management
8.4.5. Listing and looking up destinations
8.4.6. Synchronous Messaging
8.4.7. Message Encodings
8.4.8. Message Processors
8.4.9. Remote Message Processors
8.4.10. Backgroundable Methods
8.4.11. Future Objects
HornetQ

TorqueBox integrates the JBoss HornetQ message broker technology. It is automatically available to you, with no additional configuration required to start the messaging service. HornetQ supports clustered messaging, to allow for load-balancing, failover, and other advanced deployments.

The term "messaging" encompasses a large area of functionality. Messaging solutions are used to achieve loosely-coupled, asynchronous systems. The primary actors in a messaging-based system are messages, destinations, consumers, and producers. From an implementation perspective, a broker mediates the relationships between the other actors.

Messages

The unit of communication within a messaging system is a message. A message may either be simply a blob of octets, or it might have some higher-order, application-defined semantics. All messages include a set of headers, similar to email.

Destinations

A destination represents a rendezvous where messages are exchanged. A message may be sent to a destination by one actor, and received from the destination by another.

There are two main types of destinations: queues and topics. All destinations allow multiple actors to place messages with them. The type of destination affects what happens to the message once given to the destination. A queue delivers the message to a single recipient (possibly one of many candidate recipients). A topic delivers the message to multiple interested recipients.

In the image below, the green lines represent the flow of a single message from a producer to one-or-more consumers through a topic and a queue.

Producers

Any component or client code that creates messages and gives them to the message broker for delivery is considered a producer. Generally speaking, the producer does not know the details of the destination.

Consumers

Any component that waits for messages to be delivered to it by the message broker is considered a consumer. A consumer is potentially unaware of the producer and of any other consumers.

Queues and topics (collectively known as destinations) may be deployed with your application, or separate from your application. Various parts of your application may also implicitly deploy and use some destinations.

Each method has advantages and disadvantages involving the expectations of your application and its interaction with resources outside the scope of the application.

You have several options when deploying queues and topics, based on the lifecycle that suits your systems best.

If your queues and topics have a lifecycle that extends beyond the deployment of your applications, you may want long-lived destinations, which are first-order components, and may be deployed on their own. In this way, many applications can come and go over time, publishing and consuming from the same queues.

When using long-lived destinations, place the *-knob.yml deployment descriptors directly into the deployments/ directory of TorqueBox AS. A corresponding *-knob.yml.dodeploy marker file is also required in that directory; create an empty file with that name (for example, with touch) as part of your deployment.

Queues

To deploy queues, a simple YAML file is required to name the queues and provide additional configuration parameters. The file should have the suffix of -knob.yml, such as these-queues-knob.yml or those-queues-knob.yml. The only configuration option available on queues is the durable option.

Durability is enabled by default, and involves writing each message to disk so as to be able to recover in the event of failure or server restart. Disabling durability on queues may result in better performance, but increases the risk of losing messages.
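As a rough illustration of the descriptor described above, the following Ruby sketch embeds a possible these-queues-knob.yml and parses it to show how the durable option reads. The queue names are made up, and the top-level queues key is an assumption about the descriptor layout rather than something stated in this section.

```ruby
require 'yaml'

# Hypothetical contents of a these-queues-knob.yml descriptor.
# The queue names are invented and the top-level "queues" key is an assumption.
descriptor = <<'YAML'
queues:
  /queues/orders:
  /queues/audit_log:
    durable: false
YAML

queues = YAML.load(descriptor).fetch('queues')

queues.each do |name, options|
  # A queue listed without options parses as nil, so durability keeps its default (enabled).
  durable = (options || {}).fetch('durable', true)
  puts "#{name}: durable=#{durable}"
end
```

Only /queues/audit_log opts out of durability here; /queues/orders relies on the default.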


Topics

To deploy topics, a simple YAML file is required to name the topics and provide additional configuration parameters. The file should have the suffix of -knob.yml, such as these-topics-knob.yml or those-topics-knob.yml. Currently, no additional configuration parameters are allowed; topic durability is controlled via options on the attached processors (see Section 8.4.8.3, “Connecting Consumers to Destinations”).


All classes in the TorqueBox::Messaging module reside in the Ruby gem, torquebox-messaging, so to use them in your Rails app, you'll need to configure your app to load the gem.

Rails 2.x

Add a config.gem 'torquebox-messaging' line to the initializer block in your config/environment.rb.


Rails 3.x

Add a gem 'torquebox-messaging' line to your Gemfile.


And to use them in any other JRuby script, it's even simpler. First, ensure that rubygems is loaded, then require the torquebox-messaging feature.
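The two steps for a standalone script can be sketched as follows. The rescue clause is an addition for illustration only: it lets the script report a missing gem instead of aborting, and the messaging_available variable is a hypothetical name.

```ruby
require 'rubygems'

begin
  require 'torquebox-messaging'
  messaging_available = true
rescue LoadError
  # The gem is normally installed alongside TorqueBox; report its absence instead of crashing.
  messaging_available = false
  warn 'torquebox-messaging is not installed; messaging classes are unavailable'
end
```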


It's trivial to publish a message to a JMS Queue or Topic with TorqueBox. And if all of your message consumers are Ruby clients, the contents of the messages can be any serializable Ruby or Java object. You just need to ensure that the type of content you produce resides in the runtime environments of both the producer and the consumer.

To send a message, you will need access to a Topic or Queue instance. The preferred method for accessing the destination instance is to use TorqueBox.fetch(...) (see Messaging Destinations for more details). If you need to pass options to the instance, or only have access to the destination name at runtime, construct either a Topic or a Queue instance with its name and options. Once you have a destination instance, simply call its publish method. The APIs of Topics and Queues are identical; each simply represents a destination for your messages.

By default, messages are encoded using Ruby's Marshal serialization mechanism, allowing you to include Ruby objects in your message. If you need to produce messages that will be consumed by non-Ruby or TorqueBox 1.x clients, you can override the encoding mechanism globally or on a per-publish basis. See Section 8.4.7, “Message Encodings” for more information.
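A minimal sketch of publishing, assuming only the publish(message, options) call described in this section. The helper names are hypothetical, and destination stands for whatever TorqueBox.fetch or a Queue/Topic constructor returns in a deployed application.

```ruby
# Publishes a plain-text notice so that non-Ruby consumers can read it.
# `destination` is any object offering publish(message, options), for example the
# result of TorqueBox.fetch('/queues/notices') (the queue name is hypothetical).
def publish_notice(destination, text)
  destination.publish(text.to_s, :encoding => :text)
end

# Publishes any serializable Ruby object using the default (:marshal) encoding.
def publish_event(destination, event)
  destination.publish(event)
end
```

Keeping the destination as a parameter means the same helpers work for queues and topics, since both expose the same API.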




The publish method takes an optional second argument containing a hash of options:

Table 8.2. Publish options

Option | Default | Description
:encoding | :marshal | Specifies the serialization encoding to use for the message. TorqueBox provides the following built-in encodings:
  • :marshal - The message is encoded/decoded via Marshal, and is transmitted as a binary message.
  • :marshal_base64 - The message is encoded/decoded via Marshal, and is transmitted as a base64 encoded text message. This was the encoding scheme used in TorqueBox 1.x.
  • :json - The message is encoded/decoded via JSON, and is transmitted as a text message. This encoding is limited, and should only be used for simple messages.
  • :edn - The message is encoded/decoded via the edn gem, and is transmitted as a text message. This encoding is most convenient for messages that can be represented using standard Clojure data structures.
  • :text - The message isn't encoded/decoded at all, and is passed straight through as a text message. The content of the message must be a string.
  See Section 8.4.7, “Message Encodings” for more information.
:priority | :normal | Higher priority messages will be delivered before lower priority messages within the context of a queue. You can specify the priority as an integer in the range 0..9, or as one of the following convenience symbols (with the corresponding integer priorities in parentheses):
  • :low (1)
  • :normal (4)
  • :high (7)
  • :critical (9)
  Higher priority messages will be processed before lower priority ones for a specific message processor.
:ttl | 0 | The maximum time the message will wait in a destination to be consumed, in milliseconds. If the message isn't consumed within this time it will be delivered to an expiry queue. By default, messages don't have a ttl (and therefore never expire), and expired messages end up on the /queue/ExpiryQueue queue. If you want to do something special with those messages, you'll need to add a processor for that queue.
:scheduled | nil | By default a message will be delivered to the queue or topic immediately. You can delay the delivery by using the :scheduled option. The specified value must be a Time object containing the time it should be delivered. Please note that scheduled messages will be delivered only to the consumers connected at the time of publishing the message.
:tx | true | By default, messages published within the scope of a transaction will not be delivered until that transaction commits. Set to false to override.
:persistent | true | By default, queued messages will survive across AS restarts. If you don't want a message to be persistent, set :persistent to false (see Section 8.2.2.2, “Long-lived queues and topics”, for controlling message durability globally for a queue).
:correlation_id | nil | The string value to set for the JMSCorrelationID message header.
:reply_to | nil | The javax.jms.Destination value to set for the JMSReplyTo message header.
:type | nil | The string value to set for the JMSType message header.
:properties | nil | A hash of string key/value pairs to set as message properties. This can be used as application-specific headers and matched against in the :selector option of the receive method.
:startup_timeout | 30000 | The maximum time to wait for the destination to become ready on initial app startup, in milliseconds. On a very slow machine this may need to be increased from the default.
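To make the :priority and :ttl rows concrete, here is a small sketch that maps the convenience symbols to their integer values and builds an options hash. It is not TorqueBox's own implementation; PRIORITY_LEVELS, priority_level, urgent_options, and the 'category' property are hypothetical names introduced for illustration.

```ruby
# Symbolic priorities and their integer values, as listed in Table 8.2.
PRIORITY_LEVELS = { :low => 1, :normal => 4, :high => 7, :critical => 9 }.freeze

# Returns the integer priority (0..9) for a symbol or integer, or raises ArgumentError.
def priority_level(priority)
  level = PRIORITY_LEVELS.fetch(priority, priority)
  unless level.is_a?(Integer) && level >= 0 && level <= 9
    raise ArgumentError, "unsupported priority: #{priority.inspect}"
  end
  level
end

# Builds a publish options hash for an urgent message that expires after ttl_seconds.
def urgent_options(ttl_seconds)
  {
    :priority => priority_level(:critical),
    :ttl => ttl_seconds * 1000,                # :ttl is expressed in milliseconds
    :properties => { 'category' => 'urgent' }  # hypothetical application-specific header
  }
end
```

The resulting hash would be passed as the second argument to publish, for example queue.publish(alert, urgent_options(30)).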

Receiving messages from a JMS Queue or Topic is very similar to publishing messages. To consume a message, simply construct either a Queue or Topic instance with its name, and then call its receive method. The APIs of Topics and Queues are identical.


The receive method takes an optional hash of options, described below. It can also take an optional block, to which the received message will be yielded. If a block is supplied, receive returns the value of the block. If the block raises an exception, the broker will consider the message undelivered and will automatically retry delivery up to some configurable limit.
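The block form can be sketched as below, assuming only the receive(options) { |message| ... } behaviour described here. The helper name is hypothetical, and destination stands for a Queue or Topic instance.

```ruby
# Receives one message and returns the number of words in it.
# Because a block is given, receive returns the block's value. An exception raised
# inside the block would leave the message undelivered so the broker can retry it.
def word_count_of_next_message(destination, options = {})
  destination.receive(options) { |message| message.to_s.split.size }
end
```

A selector can be passed through the options hash, for example word_count_of_next_message(queue, :selector => "category = 'urgent'"), where category is a hypothetical message property.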


TorqueBox offers a simple way to manage queues and topics.

The TorqueBox::Messaging::Queue class provides several methods for managing the current queue instance.


Table 8.4. Queue instance management options

Method | Params | Description
stop | None | Stops and destroys the queue.
paused? | None | Returns true if the queue is paused, false otherwise.
pause | None | Pauses the queue. Messages put into a paused queue will not be delivered, even if there are connected consumers.
resume | None | Resumes the queue.
remove_messages | filter | Removes messages from the queue. The optional filter parameter lets you remove only selected messages by filtering on their properties with a SQL92-like syntax. If no filter is set, all messages are removed. Returns the number of messages removed from the queue.
count_messages | filter, destination | Counts messages in the queue. The optional filter parameter lets you count only selected messages by filtering on their properties with a SQL92-like syntax. If no filter is set, all messages are counted.
move_messages | queue_name, filter, reject_duplicates | Moves messages from the queue to another queue. The optional filter parameter lets you move only selected messages by filtering on their properties with a SQL92-like syntax. If no filter is set, all messages are moved. If reject_duplicates is true, duplicates are rejected while moving to the other queue.
move_message | queue_name, id, reject_duplicates | Moves the selected message (you can get its id with message.jms_message.jms_message_id) from the queue to another queue. If reject_duplicates is true, duplicates are rejected while moving to the other queue.
expire_messages | filter | Expires messages from the queue and moves them to the expiry address (by default the jms.queue.ExpiryQueue queue). You can set a custom expiry address with the expiry_address= method. The optional filter parameter lets you expire only selected messages by filtering on their properties with a SQL92-like syntax. If no filter is set, all messages are expired.
expire_message | id | Expires the selected message (you can get its id with message.jms_message.jms_message_id) and moves it to the expiry address (by default the jms.queue.ExpiryQueue queue). You can set a custom expiry address with the expiry_address= method.
send_messages_to_dead_letter_address | filter | Sends messages from the queue to the dead letter address (by default the jms.queue.DLQ queue). You can set a custom dead letter address with the dead_letter_address= method. The optional filter parameter lets you send only selected messages by filtering on their properties with a SQL92-like syntax. If no filter is set, all messages are sent.
send_message_to_dead_letter_address | id | Sends the selected message (you can get its id with message.jms_message.jms_message_id) to the dead letter address (by default the jms.queue.DLQ queue). You can set a custom dead letter address with the dead_letter_address= method.
expiry_address | | Gets or sets the expiry address for the current queue. By default it is set to the jms.queue.ExpiryQueue queue. Make sure you prefix your destination name with jms.queue. or jms.topic., depending on the type.
dead_letter_address | | Gets or sets the dead letter address for the current queue. By default it is set to the jms.queue.DLQ queue. Make sure you prefix your destination name with jms.queue. or jms.topic., depending on the type.


TorqueBox provides several serialization encodings for messaging, and allows you to override the default encoding for all of your messages or the encoding used on a per-publish basis. Creating and registering your own encoding is straightforward if you need a scheme that is not provided out of the box.

The built-in encodings are the ones listed under the :encoding option in Table 8.2: :marshal, :marshal_base64, :json, :edn, and :text.

You can specify the encoding on a per-publish basis (see Section 8.4.2, “Publishing Messages”), or set the default encoding globally (see Section 8.4.7.2, “Overriding The Default Encoding”).

Message consumers may be implemented in Ruby and easily attached to destinations. A Ruby consumer may either interact at the lowest JMS-level, or take advantage of higher-level semantics.

Message consumers may extend TorqueBox::Messaging::MessageProcessor and implement an on_message(body) method which will receive the body of the JMS message.

Example 8.20. MessageProcessor subclass

class MyConsumer < TorqueBox::Messaging::MessageProcessor
  def on_message(body)
    # The body will be of whatever type was published by the Producer
    # the entire JMS message is available as a member variable called message()
  end
  def on_error(exception)
    # You may optionally override this to interrogate the exception. If you do, 
    # you're responsible for re-raising it to force a retry.
  end
end

There is an accessor for the actual JMS message that is set by TorqueBox prior to invoking on_message, so it's there if you need it.


Just like with process!, if on_message raises an exception, the message broker considers the message undelivered. You may trap the error by overriding on_error, at which point you decide whether to re-raise the exception to force a retry. That is the default behavior if you do not override the method.
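One way to use on_error is to swallow errors that a retry cannot fix and re-raise everything else. The sketch below assumes only the on_message/on_error contract described above; OrderProcessor and MalformedPayload are hypothetical names, and the stand-in base class exists solely so the file also loads outside TorqueBox.

```ruby
# Stand-in base class so the sketch loads outside TorqueBox, where the real
# TorqueBox::Messaging::MessageProcessor comes from the torquebox-messaging gem.
unless defined?(TorqueBox::Messaging::MessageProcessor)
  module TorqueBox
    module Messaging
      class MessageProcessor
      end
    end
  end
end

class MalformedPayload < StandardError; end  # hypothetical application error

class OrderProcessor < TorqueBox::Messaging::MessageProcessor
  def on_message(body)
    raise MalformedPayload, 'order id missing' unless body.is_a?(Hash) && body[:id]
    # ... handle the order ...
  end

  def on_error(exception)
    # Retrying a malformed message cannot succeed, so that error is swallowed.
    # Anything else is re-raised so the broker treats the message as undelivered.
    raise exception unless exception.is_a?(MalformedPayload)
  end
end
```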

To connect consumers within a TorqueBox-deployed application, you need to add a messaging: section to your torquebox.yml (or external *-knob.yml descriptor), or add a processor directive to the destination definition if you are using the DSL (in torquebox.rb).

If you are using a YAML descriptor, the messaging: section will contain the mappings from your destinations (topics and queues) to your consumers. The section is a YAML hash, the keys of which are your destination names, which should correspond to existing queues and topics. These destinations may be deployed through the same torquebox.yml or as long-lived destinations.

If you are using a DSL descriptor, the consumers are not defined in a separate section, but as part of the queue/topic definition. If the destination is a long-lived destination (managed by another application), then you will need to tell TorqueBox not to try to create the destination by setting the create option to false.


The above example shows the simplest possible configuration, but it's possible to alter the behavior of your message processor using the following options:

Table 8.6. Message processor options

Option | Default | Description
concurrency | 1 | May be used to throttle the throughput of your processor. Processors are single-threaded by default, but you can increase this value to match the number of concurrent messages you want to handle at a time per server. Note that this value determines the number of consumers connected to the destination, so you'll rarely want a concurrency greater than 1 for topics, since that means you'll process duplicate messages. For queues you'll often want this value higher than 1 so you can process messages in parallel.
singleton | false | By default, message processors run on all nodes in a cluster, enabling automatic load balancing. Setting this to true results in a single, HA processor, ensuring that only one node in a cluster receives all messages from its associated destination and, with concurrency set to 1, in the same order they were published.
selector | | May be used to filter the messages dispatched to your consumer.
durable | false | Turns the processor into a durable subscriber. Once a processor durably subscribes to a topic, any messages sent while it is disconnected will be saved and delivered once the processor reconnects. If true, you must also supply a client_id. This setting only affects processors attached to topics, and is ignored for queue processors.
client_id | | A string to uniquely identify the connecting client. Optional unless you are using the durable option (above) on a Topic.
xa | false | By default, message processors do not initiate a distributed transaction. Setting this to true will automatically enlist the message processor's receipt of the message and any messaging, cache, or database access inside that processor in a single transaction.
config | | Should contain a hash of data which will be passed to your consumer's constructor, initialize(Hash).
stopped | false | By default, message processors are started immediately with the application. Set this to true to keep the processor stopped at deployment, then use message processor management methods such as start or stop later.


The YAML and DSL syntaxes enable the configuration to get fairly sophisticated, allowing you to, for example, map a single destination to multiple processors or re-use configuration options in multiple processors. You may never have a need for this much flexibility, but it's available if you do.

Example 8.23. Advanced messaging configuration in a deployment descriptor

Using the YAML syntax:

application:
  ...

messaging:
  /topics/simple: SimpleHandler

  /topics/popular:
    - Handler:
        concurrency: 5
    - Observer: &defaults
        selector: "x > 18"
        config:
          x: ex
          y: why
    - Processor

  /queues/students:
    VerySimpleAnalyzer:
    YouthMonitor:
      selector: "y < 18"
      config:
        h: ache
        i: eye
    LookAndFeel:
      <<: *defaults

Here we have /topics/simple mapped to a single processor of type SimpleHandler using a YAML string, /topics/popular mapped to three processors (Handler, Observer, Processor) using a YAML list, and /queues/students mapped to three more processors (VerySimpleAnalyzer, YouthMonitor, LookAndFeel) using a YAML hash where each key in the hash corresponds to the processor type. This example also takes advantage of YAML's ability to merge hashes: the Observer and LookAndFeel processors are configured identically.
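The effect of the &defaults anchor and the <<: *defaults merge key can be checked with plain Ruby. This sketch parses a trimmed copy of the descriptor with the standard YAML library; it says nothing about how TorqueBox itself loads the file.

```ruby
require 'yaml'

# A trimmed copy of the descriptor, keeping only the anchored and merged entries.
descriptor = <<'YAML'
messaging:
  /topics/popular:
    - Observer: &defaults
        selector: "x > 18"
        config:
          x: ex
  /queues/students:
    LookAndFeel:
      <<: *defaults
YAML

# Newer Psych versions disable aliases in YAML.load, so prefer unsafe_load where it exists.
parsed = YAML.respond_to?(:unsafe_load) ? YAML.unsafe_load(descriptor) : YAML.load(descriptor)
messaging = parsed.fetch('messaging')

observer      = messaging['/topics/popular'].first['Observer']
look_and_feel = messaging['/queues/students']['LookAndFeel']

# Both processors end up with the same selector and config.
puts look_and_feel == observer
```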

And via the DSL:

TorqueBox.configure do
  ...
  topic '/topics/simple' do
    processor SimpleHandler
  end

  common_config = { :selector => "x > 18", :config => { :x => 'ex', :y => 'why' } }

  topic '/topics/popular' do |topic| 
    topic.processor Handler, :concurrency => 5
    topic.processor Observer, common_config
    topic.processor Processor
  end

  queue '/queues/students' do |queue|
    queue.processor VerySimpleAnalyzer
    queue.processor YouthMonitor do 
      selector "y < 18"
      config do
        h 'ache'
        i 'eye'
      end
    end
    queue.processor LookAndFeel, common_config
  end
end

Here we have the same configuration as the YAML example above, but expressed via the DSL. Note that we have to use the block argument form for our destinations that share common_config. This is due to the no-argument form using instance_eval, which does not allow you to access any variables defined outside of the block.


TorqueBox makes it easy to list and look up any particular message processor (including Backgroundables). There are two class methods available in the TorqueBox::Messaging::MessageProcessor class: list and lookup.

Both methods return instances of the TorqueBox::Messaging::MessageProcessorProxy class, which is a proxy for accessing the configuration of a particular message processor service. The class exposes methods such as concurrency, name, start, and stop. For more information, please refer to the RDocs for the messaging gem.

The list method lists all message processors deployed with the application. It returns an array of TorqueBox::Messaging::MessageProcessorProxy objects. If no message processors are available, an empty array is returned.


The lookup method looks up a message processor given its destination and the class name (as a String) of the implementation. If a matching message processor is available, it returns a TorqueBox::Messaging::MessageProcessorProxy object. If no message processor is found, nil is returned.
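The list and lookup calls might be wrapped as follows. The helper names and the registry parameter are hypothetical; registry defaults to TorqueBox::Messaging::MessageProcessor, and Ruby evaluates that default only when the argument is omitted, so a test double can be passed when running outside TorqueBox.

```ruby
# Returns the names of all message processors deployed with the application.
def processor_names(registry = TorqueBox::Messaging::MessageProcessor)
  registry.list.map { |proxy| proxy.name }
end

# Stops the processor of the given class attached to `destination`, if one is deployed.
# Returns true when a processor was found and stopped, false otherwise.
def stop_processor(destination, class_name, registry = TorqueBox::Messaging::MessageProcessor)
  proxy = registry.lookup(destination, class_name)
  return false if proxy.nil?  # lookup returns nil when nothing matches

  proxy.stop
  true
end
```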


It is possible to attach a message processor to a remote destination. Such a message processor behaves just like a regular one, except that the queue (or topic) is deployed on a remote host. No changes are required to the message processor itself, but you need to configure the destination, specifying where it is located and how to connect to it. Attaching message processors to remote destinations is otherwise done in the same way as with local destinations.


TorqueBox makes it easy to add the destination to the exported context. The only thing you need to do is to set the exported parameter for the selected destination, like this:


This simple configuration will make the selected queue visible for both remote and local JNDI lookups.

By default, the JBoss AS security configuration does not allow remote clients to connect to the host and look up objects in the JNDI tree. To allow this, create a user and set a password for it. The $JBOSS_HOME/bin/add-user.sh script makes this convenient; use it to add an Application User, like this:

$ add-user.sh

What type of user do you wish to add?
 a) Management User (mgmt-users.properties)
 b) Application User (application-users.properties)
(a): b

Enter the details of the new user to add.
Realm (ApplicationRealm) :
Username : remoteuser
Password :
Re-enter Password :
What roles do you want this user to belong to? (Please enter a comma separated list, or leave blank for none)[  ]: remote
About to add user 'remoteuser' for realm 'ApplicationRealm'
Is this correct yes/no? yes
Added user 'remoteuser' to file '/work/torquebox/jboss/standalone/configuration/application-users.properties'
Added user 'remoteuser' to file '/work/torquebox/jboss/domain/configuration/application-users.properties'
Added user 'remoteuser' with roles remote to file '/work/torquebox/jboss/standalone/configuration/application-roles.properties'
Added user 'remoteuser' with roles remote to file '/work/torquebox/jboss/domain/configuration/application-roles.properties'
Is this new user going to be used for one AS process to connect to another AS process?
e.g. for a slave host controller connecting to the master or for a Remoting connection for server to server EJB calls.
yes/no? yes
To represent the user add the following to the server-identities definition <secret value="OBNEMSIzIUA=" />

Now you can put the credentials into the deployment descriptor as shown above.

TorqueBox also provides Backgroundable methods. Backgroundable allows you to process any method on any class or object asynchronously. You can mark a method to always execute in the background, or send a method to the background on an ad hoc basis. Backgrounded methods return a Future object that can be used to monitor the status of the method invocation and retrieve the final return value. When transitioning from TorqueBox 1.x to 2.x, it is advisable to replace any Task implementation with usage of Backgroundable.

Backgroundable provides the always_background class method that allows you to flag a class or instance method to always be executed in the background:
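A minimal sketch of this usage follows. WelcomeMailer and deliver are hypothetical names, and the defined? guard is an addition so that the class also loads outside TorqueBox, where deliver simply runs synchronously and returns its plain value rather than a Future.

```ruby
class WelcomeMailer
  # Outside TorqueBox the module is not defined, and deliver runs synchronously.
  if defined?(TorqueBox::Messaging::Backgroundable)
    include TorqueBox::Messaging::Backgroundable
    always_background :deliver
  end

  # Returns the number of addresses handled; a hypothetical stand-in for real work.
  def deliver(addresses)
    addresses.each { |address| puts "welcoming #{address}" }
    addresses.size
  end
end
```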


You can also call always_background from outside of the class definition if you prefer:


All of the options available to message processors in a deployment descriptor are available to Backgroundable message processors, too, though possibly only the concurrency option is applicable. In addition to the processor options, you can also specify the durability of the underlying queue. This durable option is different than the durable option for topic processors, and instead controls the durability of the underlying queue. It is true by default.


By default, every application you deploy will have a queue for Backgroundable methods, even if you don't use it. To turn off the queue, set the concurrency to 0.

Methods backgrounded via Backgroundable return Future objects that allow you to monitor the progress of the asynchronous processing.

Table 8.7. Future instance methods

Method | Description
started? | Returns true if the task processing has started.
complete? | Returns true if the task processing has completed without error. If true, the result is available via the result method.
error? | Returns true if an error occurred during the task processing. If true, the actual error is available via the error method.
status | Returns the last status message returned from the task. This will only have meaning if you signal status information from within your task. See the status notifications section for more details.
status_changed? | Returns true if the status has changed since you last called status. This will only have meaning if you signal status information from within your task. See the status notifications section for more details.
all_statuses | Returns an array of all the statuses received by the future, which may not include all of the statuses sent if the task completes before they are all received. This will only have meaning if you signal status information from within your task. See the status notifications section for more details.
result | Returns the result of the remote processing. This method takes a timeout (in milliseconds), and will block for that amount of time if processing has started but not completed, or up to twice that time if processing has yet to start. If no result is available after timing out, a TorqueBox::Messaging::TimeoutException is raised. The timeout defaults to 30 seconds. The recommended pattern is to wait for complete? to return true before calling result.
method_missing | Delegates any missing methods to the result, using the default timeout.
error | Returns the remote error object if an error occurred during task processing.
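The recommended pattern of waiting for complete? before calling result could look like the sketch below. It relies only on the complete?, error?, error, and result methods from Table 8.7; await_result, FutureTimeout, and the polling parameters are hypothetical names chosen for illustration.

```ruby
class FutureTimeout < StandardError; end  # hypothetical error used only by this sketch

# Polls the future until it completes, then returns its result.
# Raises RuntimeError if the task reports an error, or FutureTimeout after max_wait seconds.
def await_result(future, max_wait = 30.0, poll_interval = 0.25)
  deadline = Time.now + max_wait
  until future.complete?
    raise "background task failed: #{future.error.inspect}" if future.error?
    raise FutureTimeout, "no result after #{max_wait} seconds" if Time.now >= deadline
    sleep poll_interval
  end
  future.result
end
```

Polling keeps the caller in control of how long it waits, instead of blocking inside result for its full timeout.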