Class: TorqueBox::Messaging::Session

Inherits:
Object
  • Object
show all
Defined in:
messaging/lib/torquebox/messaging/session.rb

Direct Known Subclasses

XaSession

Constant Summary

AUTO_ACK =
javax.jms::Session::AUTO_ACKNOWLEDGE
CLIENT_ACK =
javax.jms::Session::CLIENT_ACKNOWLEDGE
DUPS_OK_ACK =
javax.jms::Session::DUPS_OK_ACKNOWLEDGE
SESSION_TRANSACTED =
javax.jms::Session::SESSION_TRANSACTED

Instance Attribute Summary (collapse)

Class Method Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (Session) initialize(jms_session)

Returns a new instance of Session



30
31
32
# File 'messaging/lib/torquebox/messaging/session.rb', line 30

def initialize(jms_session)
  @jms_session = jms_session
end

Instance Attribute Details

- (Object) :jms_session

Returns the value of attribute :jms_session



28
29
30
# File 'messaging/lib/torquebox/messaging/session.rb', line 28

def :jms_session
  @:jms_session
end

Class Method Details

+ (Object) canonical_ack_mode(ack_mode)



158
159
160
161
162
163
164
165
166
167
168
169
# File 'messaging/lib/torquebox/messaging/session.rb', line 158

def self.canonical_ack_mode(ack_mode)
  case ( ack_mode )
    when Fixnum
      return ack_mode
    when :auto
      return AUTO_ACK
    when :client
      return CLIENT_ACK
    when :dups_ok
      return DUPS_OK_ACK
  end
end

Instance Method Details

- (Object) close



34
35
36
# File 'messaging/lib/torquebox/messaging/session.rb', line 34

def close
  @jms_session.close
end

- (Object) create_browser(*args)



148
149
150
# File 'messaging/lib/torquebox/messaging/session.rb', line 148

def create_browser(*args)
  jms_session.create_browser( *args )
end

- (Object) java_destination(destination)



152
153
154
155
156
# File 'messaging/lib/torquebox/messaging/session.rb', line 152

def java_destination(destination)
  return destination.java_destination unless destination.java_destination.nil?
  meth = destination.is_a?( Queue ) ? :create_queue : :create_topic
  @jms_session.send( meth, destination.name )
end

- (Object) publish(destination, payload, options = {})



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'messaging/lib/torquebox/messaging/session.rb', line 42

def publish(destination, payload, options={})
  producer    = @jms_session.create_producer( java_destination( destination ) )
  message     = Message.new( @jms_session, payload, options[:encoding] )

  options[:properties] ||= {}

  # This will let us create messages to be scheduled later like this:
  #
  # queue.publish(:ham => :biscuit, :scheduled => Time.now + 10)
  # queue.publish(:ham => :biscuit, :scheduled => Time.now + 2.5)
  #
  # In Rails it is possible to do:
  #
  # queue.publish(:ham => :biscuit, :scheduled => 3.minutes.from_now)
  #
  # Please note that the :scheduled parameter takes a Time object.
  if options.has_key?(:scheduled)
    options[:properties][Java::org.hornetq.api.core.Message::HDR_SCHEDULED_DELIVERY_TIME.to_s] = (options[:scheduled].to_f * 1000).to_i
  end

  message.populate_message_headers(options)
  message.populate_message_properties(options[:properties])

  producer.send( message.jms_message,
                 options.fetch(:delivery_mode, producer.delivery_mode),
                 options.fetch(:priority, producer.priority),
                 options.fetch(:ttl, producer.time_to_live) )
  message
end

- (Object) publish_and_receive(destination, message, options = {})



111
112
113
114
115
116
117
118
119
# File 'messaging/lib/torquebox/messaging/session.rb', line 111

def publish_and_receive(destination, message, options = {})
  options[:timeout] ||= 10_000 # 10s
  options[:properties] ||= {}
  options[:properties]["synchronous"] = "true"
  message = publish(destination, message, options)
    
  options[:selector] = "JMSCorrelationID='#{message.jms_message.jms_message_id}'"
  receive(destination, options)
end

- (Object) queue_for(name)



38
39
40
# File 'messaging/lib/torquebox/messaging/session.rb', line 38

def queue_for(name)
  Queue.new( @jms_session.create_queue( name ) )
end

- (Object) receive(destination, options = {})



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'messaging/lib/torquebox/messaging/session.rb', line 75

def receive(destination, options={})
  decode = options.fetch(:decode, true)
  timeout = options.fetch(:timeout, 0)
  selector = options[:selector]
  
  java_destination = java_destination( destination )
  if options[:durable] && java_destination.class.name =~ /Topic/
    consumer = @jms_session.createDurableSubscriber( java_destination,
                                                     options.fetch(:subscriber_name, Topic::DEFAULT_SUBSCRIBER_NAME),
                                                     selector,
                                                     false )
  else
    consumer = @jms_session.createConsumer( java_destination, selector )      
  end
  begin
    jms_message = consumer.receive( timeout )
    if jms_message
      message = decode ? Message.new( jms_message ).decode : jms_message
      block_given? ? yield(message) : message
    end
  ensure
    consumer.close unless consumer.nil?
  end
end

- (Object) receive_and_publish(destination, options = {})



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'messaging/lib/torquebox/messaging/session.rb', line 125

def receive_and_publish(destination, options = {})
  selector = "synchronous = 'true'"
  selector = "#{selector} and (#{options[:selector]})" if options[:selector]
  receive_options = options.merge(:decode => false,
                                  :selector => selector)
    
  request = receive(destination, receive_options)
  unless request.nil?
    request_message = Message.new( request ).decode
    options[:ttl] ||= 60_000 # 1m
    options[:encoding] ||= Message.extract_encoding_from_message( request )

    response = block_given? ? yield(request_message) : request_message
    
    options[:correlation_id] = request.jms_message_id
    publish(destination, response, options)
  end
end

- (Object) unsubscribe(subscriber_name = Topic::DEFAULT_SUBSCRIBER_NAME)



144
145
146
# File 'messaging/lib/torquebox/messaging/session.rb', line 144

def unsubscribe(subscriber_name = Topic::DEFAULT_SUBSCRIBER_NAME)
  @jms_session.unsubscribe( subscriber_name )
end