JBoss.orgCommunity Documentation

Chapter 8. TorqueBox Messaging

8.1. Introduction
8.2. Deploying Destinations
8.2.1. Deployment Styles
8.2.2. Deployment Descriptors
8.3. TorqueBox Ruby Classes
8.4. Messaging Abstractions
8.4.1. Queues and Topics
8.4.2. Synchronous Messaging
8.4.3. Message Encodings
8.4.4. Message Processors
8.4.5. Backgroundable Methods
8.4.6. Future Objects
HornetQ

TorqueBox integrates the JBoss HornetQ message broker technology. It is automatically available to you, with no additional configuration required to start the messaging service. HornetQ supports clustered messaging, to allow for load-balancing, failover, and other advanced deployments.

The term "messaging" encompasses a large area of functionality. Messaging solutions are used to achieve loosely-coupled, asynchronous systems. The primary actors in a messaging-based system are messages, destinations, consumers, and producers. From an implementation perspective, a broker mediates the relationships between the other actors.

Messages

The unit of communication within a messaging system is a message. A message may either be simply a blob of octets, or it might have some higher-order, application-defined semantics. All messages include a set of headers, similar to email.

Destinations

A destination represents a rendezvous where messages are exchanged. A message may be sent to a destination by one actor, and received from the destination by another.

There are two main types of destinations: queues and topics. All destinations allow multiple actors to place messages with them. The type of destination affects what happens to the message once given to the destination. A queue delivers the message to a single recipient (possibly one of many candidate recipients). A topic delivers the message to multiple interested recipients.

In the image below, the green lines represent the flow of a single message from a producer to one-or-more consumers through a topic and a queue.

Producers

Any component or client code that creates messages and gives them to the message broker for delivery is considered a producer. Generally speaking, the producer does not know the details of the destination.

Consumers

Any component that waits for messages to be delivered to it by the message broker is consider a consumer. A consumer is unaware of the producer and any other consumers, potentially.

Queues and topics (collectively known as destinations) may be deployed with your application, or separate from your application. Various parts of your application may also implicitly deploy and use some destinations.

Each method has advantages and disadvantages involving the expectations of your application and its interaction with resources outside the scope of the application.

You have several options when deploying queues and topics, based on the lifecycle that suits your systems best.

If your queues and topics have a lifecycle that extends beyond the deployment of your applications, you may want long-lived destinations, which are first-order components, and may be deployed on their own. In this way, many applications can come and go over time, publishing and consuming from the same queues.

When using long-lived destinations, *-knob.yml deployment descriptors are placed directly into the deployments/ directory of TorqueBox AS.

Queues

To deploy queues, a simple YAML file is required to name the queues and provide additional configuration parameters. The file should have the suffix of -knob.yml, such as these-queues-knob.yml or those-queues-knob.yml. The only configuration option available on queues is the durable option.

Durability is enabled by default, and involves writing each message to disk so as to be able to recover in the event of failure or server restart. Disabling durability on queues may result in better performance, but increases the risk of losing messages.


Topics

To deploy topics, a simple YAML file is required to name the topics and provide additional configuration parameters. The file should have the suffix of -knob.yml, such as these-topics-knob.yml or those-topics-knob.yml. Currently, no additional configuration parameters are allowed - topic durability is controlled via options on the attached processors (See Section 8.4.4.3, “Connecting Consumers to Destinations”).


All classes in the TorqueBox::Messaging module reside in the Ruby gem, torquebox-messaging, so to use them in your Rails app, you'll need to configure your app to load the gem.

Rails 2.x

Add this to your config/environment.rb:


Rails 3.x

Add this to your Gemfile:


And to use them in any other JRuby script, it's even simpler. First, ensure that rubygems is loaded, then require the torquebox-messaging feature.


There are two main messaging destination abstractions: TorqueBox::Messaging::Queue and TorqueBox::Messaging::Topic. Each has a publish and a receive method, and each must be constructed with a name and an optional hash of options:


You can also set these options via the connect_options on the destination object.

Though sometimes convenient, these methods are fairly low-level and higher-level abstractions such as Message Processors, and Backgroundable are often better-suited to the task.

It's trivial to publish a message to a JMS Queue or Topic with TorqueBox. And if all of your message consumers are Ruby clients, the contents of the messages can be any serializable Ruby or Java object. You just need to ensure that the type of content you produce resides in the runtime environments of both the producer and the consumer.

To send a message, you will need access to a Topic or Queue instance. The preferred method for accessing the destination instance is to use inject(...) (see Messaging Destinations for more details). If you need to pass options to the instance, or only have access to the destination name at runtime, construct either a Topic or a Queue instance with its name and options. Once you have a destination instance, simply call its publish method. The API's of both Topics and Queues are identical; they each simply represent a destination for your messages.

By default, messages are encoded using Ruby's Marshal serialization mechanism, allowing you to include Ruby objects in your message. If you need to produce messages that will be consumed by non-Ruby or TorqueBox 1.x clients, you can override the encoding mechanism globally or on a per publish basis. See Section 8.4.3, “Message Encodings” for more information.




The publish method takes an optional second argument containing a hash of options:

Table 8.2. Publish options

OptionDefaultDescription
:encoding:marshalSpecifies the serialization encoding to use for the message. TorqueBox provides the following built-in encodings:
  • :marshal - The message is encoded/decoded via Marshal, and is transmitted as a binary message.

  • :marshal_base64 - The message is encoded/decoded via Marshal, and is transmitted as a base64 encoded text message. This was the encoding scheme used in TorqueBox 1.x.

  • :json - The message is encoded/decoded via JSON, and is transmitted as a text message. This encoding is limited, and should only be used for simple messages.

  • :clojure - The message is encoded/decoded via the clj gem, and is transmitted as a text message. This encoding is most convenient for messages that can be represented using standard Clojure data structures.

  • :text - The message isn't encoded/decoded at all, and is passed straight through as a text message. The content of the message must be a string.

See Section 8.4.3, “Message Encodings” for more information.
:priority:normalhigher priority messages will be delivered before lower priority messages within the context of a queue. You can specify the priority as an integer in the range 0..9, or as one of the following convenience symbols (with the corresponding integer priorities in parentheses):
  • :low (1)

  • :normal (4)

  • :high (7)

  • :critical (9)

Higher priority messages will be processed before lower priority ones for a specific message processor.
:ttl The maximum time the message will wait in a destination to be consumed, in milliseconds. If the message isn't consumed within this time it will be delivered to an expiry queue. By default, messages don't have a ttl (and therefore never expire). By default, expired messages end up on the /queue/ExpiryQueue queue. If you want to do something special with those messages, you'll need to add a processor for that queue.
:persistenttrueBy default, queued messages will survive across AS restarts. If you don't want a message to be persistent, set the persistence to false (see Section 8.2.2.2, “Long-lived queues and topics for controlling message durability globally for a queue).
:correlation_idnilThe string value to set for the JMSCorrelationID message header.
:reply_tonilThe javax.jms.Destination value to set for the JMSReplyTo message header.
:typenilThe string value to set for the JMSType message header.
:propertiesnilA hash of string key/value pairs to set as message properties. This can be used as application-specific headers and matched against in the :selector option of the receive method.
:startup_timeout30000The maximum time to wait for the destination to become ready on initial app startup, in milliseconds. On a very slow machine this may need to be increased from the default.

Receiving messages from a JMS Queue or Topic is very similar to publishing messages. To consume a message, simply construct either a Queue or Topic instance with its name, and then call its receive method. The API's of both Topics and Queues are identical.


The receive takes an optional argument containing a hash of options:


TorqueBox provides several different encoding serialization schemes for messaging, and allows you to override the default encoding for all of your messages, or override the encoding used on a per publish basis. Creating and registering your own encoding is trivial if you need an encoding scheme that is not provided out of the box.

TorqueBox provides the following built-in encodings:

You can specify the encoding on a per-publish basis (see Section 8.4.1.1, “Publishing Messages”), or set the default encoding globally (see Section 8.4.3.2, “Overriding The Default Encoding”).

Message consumers may be implemented in Ruby and easily attached to destinations. A Ruby consumer may either interact at the lowest JMS-level, or take advantage of higher-level semantics.

Message consumers may extend TorqueBox::Messaging::MessageProcessor and implement an on_message(body) method which will receive the body of the JMS message.

Example 8.16. MessageProcessor subclass

class MyConsumer < TorqueBox::Messaging::MessageProcessor
  def on_message(body)
    # The body will be of whatever type was published by the Producer
    # the entire JMS message is available as a member variable called message()
  end
  def on_error(exception)
    # You may optionally override this to interrogate the exception. If you do, 
    # you're responsible for re-raising it to force a retry.
  end
end

There is an accessor for the actual JMS message that is set by TorqueBox prior to invoking on_message, so it's there if you need it.


Just like with process!, if on_message raises an exception, the message broker considers the message undelivered. You may trap the error by overriding on_error, at which point you decide whether to re-raise the exception to force a retry. That is the default behavior if you do not override the method.

To connect consumers within a TorqueBox-deployed application, you need to add a messaging: section to your torquebox.yml (or external *-knob.yml descriptor), or add a processor directive to the destination definition if you are using the DSL (in torquebox.rb).

If you are using a YAML descriptor, the messaging: section will contain the mappings from your destinations (topics and queues) to your consumers. The section is a YAML hash, the keys of which are your destination names, which should correspond to existing queues and topics. These destinations may be deployed through the same torquebox.yml or as long-lived destinations.

If you are using a DSL descriptor, the consumers are not defined in a separate section, but as part of the queue/topic definition. If the destination is a long-lived destination (managed by another application), then you will need to tell TorqueBox not to try to create the destination by setting the create to false.


The above example shows the simplest possible configuration, but it's possible to alter the behavior of your message processor using the following options:



The YAML and DSL syntaxes enable the configuration to get fairly sophisticated, allowing you to, for example, map a single destination to multiple processors or re-use configuration options in multiple processors. You may never have a need for this much flexibility, but it's available if you do.

Example 8.19. Advanced messaging configuration in a deployment descriptor

Using the YAML syntax:

application:
  ...

messaging:
  /topics/simple: SimpleHandler

  /topics/popular:
    - Handler
        concurrency: 5
    - Observer: &defaults
        selector: "x > 18"
        config:
          x: ex
          y: why
    - Processor

/queues/students:
    VerySimpleAnalyzer:
    YouthMonitor:
      selector: "y < 18"
      config:
        h: ache
        i: eye
    LookAndFeel:
      <<: *defaults

Here we have /topics/simple mapped to a single processor of type SimpleHandler using a YAML string, /topics/popular mapped to three processors (Handler, Observer, Processor) using a YAML list, and /queues/students mapped to three more processors (VerySimpleAnalyzer, YouthMonitor, LookAndFeel) using a YAML hash where each key in the hash corresponds to the processor type. This example also takes advantage of YAML's ability to merge hash's: the Observer and LookAndFeel processors are configured identically.

And via the DSL:

TorqueBox.configure do
  ...
  topic '/topics/simple' do
    processor SimpleHandler
  end

  common_config = { :selector => "x > 18", :config => { :x => 'ex', :y => 'why' } }

  topic '/topics/popular' do |topic| 
    topic.processor Handler, :concurrency => 5
    topic.processor Observer, common_config
    topic.processor Processor
  end

  queue '/queues/students' do |queue|
    queue.processor VerySimpleAnalyzer
    queue.processor YouthMonitor do 
      selector "y < 18"
      config do
        h 'ache'
        i 'eye'
      end
    end
    queue.processor LookAndFeel, common_config
  end
end

Here we have the same configuration as the YAML example above, but expressed via the DSL. Note that we have to use the block argument form for our destinations that share common_config. This is due to the no-argument form using instance_eval, which does not allow you to access any variables defined outside of the block.


TorqueBox also provides Backgroundable methods. Backgroundable allows you to process any method on any object asynchronously. You can mark a method to always execute in the background, or send a method to the background on an ad hoc basis. Backgrounded methods return a Future object that can be used to monitor the status of the method invocation and retrieve the final return value. When transitioning from TorqueBox 1.x to 2.x, it is advisable to replace any Task implementation with usage of Backgroundable.

The priority, time-to-live (TTL), and persistence options that are available when publishing messages are available to Backgroundable methods as well:



Methods backgrounded via Backgroundable return Future objects that allow you to monitor the progress of the asynchronous processing.

Table 8.6. Future instance methods

MethodDescription
started?Returns true if the task processing has started.
complete?Returns true if the task processing has completed without error. If true, The result is available via the result method.
error?Returns true if an error occurred during the task processing. If true, The actual error is available via the error method.
statusReturns the last status message returned from the task. This will only have meaning if you signal status information from within your task. See the status notifications section for more details.
status_changed?Returns true if the status has changed since you last called status. This will only have meaning if you signal status information from within your task. See the status notifications section for more details.
all_statusesReturns an array of all the statuses received by the future, which may not include all of the statuses sent if the task completes before they are all received. This will only have meaning if you signal status information from within your task. See the status notifications section for more details.
resultReturns the result of the remote processing. This method takes a timeout (in milliseconds), and will block for that amount of time if processing has started but not completed, or up to twice that time if processing has yet to start. If no result is available after timing out, a TorqueBox::Messaging::TimeoutException is raised. The timeout defaults to 30 seconds. The recommended pattern is to wait for complete? to return true before calling result.
method_missingDelegates any missing methods to the result, using the default timeout.
errorReturns the remote error object if an error occurred during task processing.