Class: TorqueBox::Messaging::Destination

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
messaging/lib/torquebox/messaging/destination.rb

Direct Known Subclasses

Queue, Topic

Constant Summary

# Symbolic priority names and the numeric priority each one stands for.
# normalize_options uses this table to translate a symbolic :priority option.
# Frozen so the shared table cannot be modified at runtime.
PRIORITY_MAP = {
  :low => 1,
  :normal => 4,
  :high => 7,
  :critical => 9
}.freeze

Instance Attribute Summary (collapse)

Class Method Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (Destination) initialize(destination, connection_factory_or_options = nil)

Returns a new instance of Destination

Raises:

  • (ArgumentError)


50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'messaging/lib/torquebox/messaging/destination.rb', line 50

# Builds a destination from either a destination name or a
# javax.jms.Destination instance.
#
# destination:: a name, or a javax.jms.Destination (Queue or Topic); when a
#               JMS destination is given its queue/topic name becomes #name
#               and the object itself is kept as #java_destination.
# connection_factory_or_options:: nil or a Hash of connect options, or
#               anything else, which is handed straight to
#               ConnectionFactory.new.
#
# Raises ArgumentError when destination is nil (or false).
def initialize(destination, connection_factory_or_options = nil)
  raise ArgumentError, "destination cannot be nil" unless destination

  if connection_factory_or_options.nil? || connection_factory_or_options.is_a?(Hash)
    options = connection_factory_or_options
    internal_factory = TorqueBox.fetch('connection-factory')
    # An explicit :host or :port means the caller wants to connect somewhere
    # else, so the internally fetched factory must not be used.
    internal_factory = nil if options && (options[:host] || options[:port])
    @connection_factory = ConnectionFactory.new(internal_factory)
    @connect_options = options || {}
  else
    @connection_factory = ConnectionFactory.new(connection_factory_or_options)
    @connect_options = {}
  end

  if destination.is_a?(javax.jms.Destination)
    @name = destination.is_a?(javax.jms.Queue) ? destination.queue_name : destination.topic_name
    @java_destination = destination
  else
    @name = destination
  end

  @enumerable_options = {}
end

Instance Attribute Details

- (Object) connect_options

Returns the value of attribute connect_options



33
34
35
# File 'messaging/lib/torquebox/messaging/destination.rb', line 33

# Returns the connect options stored in @connect_options.
def connect_options
  @connect_options
end

- (Object) connection_factory (readonly)

Returns the value of attribute connection_factory



29
30
31
# File 'messaging/lib/torquebox/messaging/destination.rb', line 29

# Returns the connection factory stored in @connection_factory.
def connection_factory
  @connection_factory
end

- (Object) enumerable_options

Returns the value of attribute enumerable_options



32
33
34
# File 'messaging/lib/torquebox/messaging/destination.rb', line 32

# Returns the enumeration options stored in @enumerable_options.
def enumerable_options
  @enumerable_options
end

- (Object) java_destination (readonly)

Returns the value of attribute java_destination



31
32
33
# File 'messaging/lib/torquebox/messaging/destination.rb', line 31

# Returns the object stored in @java_destination (nil when none was set).
def java_destination
  @java_destination
end

- (Object) name (readonly)

Returns the value of attribute name



30
31
32
# File 'messaging/lib/torquebox/messaging/destination.rb', line 30

# Returns the destination name stored in @name.
def name
  @name
end

Class Method Details

+ (Object) _load(str)



46
47
48
# File 'messaging/lib/torquebox/messaging/destination.rb', line 46

# Counterpart of #_dump: rebuilds an instance from the dumped string by
# passing it to the constructor. (_dump/_load are the custom serialization
# hooks Ruby's Marshal looks for.)
def self._load(str)
  new(str)
end

+ (Object) list



212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'messaging/lib/torquebox/messaging/destination.rb', line 212

# Looks up the JMS manager service and, inside the lookup block, builds one
# instance of the receiving class per deployed destination name.
#
# Topic asks the server control for topic names, Queue for queue names; any
# other receiver gets an empty list. The method's return value is whatever
# TorqueBox::ServiceRegistry.lookup returns for this block.
def list
  TorqueBox::ServiceRegistry.lookup("jboss.messaging.default.jms.manager") do |manager|
    # JMSServerControl exposes the deployed queue/topic names.
    server_control = Java::org.hornetq.jms.management.impl.JMSServerControlImpl.new(manager)

    names =
      if self == TorqueBox::Messaging::Topic
        server_control.topic_names
      elsif self == TorqueBox::Messaging::Queue
        server_control.queue_names
      else
        []
      end

    names.map { |destination_name| new(destination_name) }
  end
end

+ (Object) lookup(name)



239
240
241
# File 'messaging/lib/torquebox/messaging/destination.rb', line 239

# Returns the first entry of #list whose name equals +name+, or nil when
# there is no such entry.
def lookup(name)
  has_requested_name = lambda { |candidate| candidate.name == name }
  list.find(&has_requested_name)
end

+ (Object) wait_for_latch(latch)



253
254
255
256
257
258
259
260
261
262
263
264
# File 'messaging/lib/torquebox/messaging/destination.rb', line 253

# Blocks on +latch+ (when one is given) for up to 45 seconds.
#
# Returns false if waiting raised an error, true otherwise -- including when
# no latch was supplied. The result of latch.await itself is not inspected.
def wait_for_latch(latch)
  latch.await(45, java.util.concurrent.TimeUnit::SECONDS) if latch
  true
rescue
  false
end

+ (Object) with_destinationizer



244
245
246
247
248
249
250
# File 'messaging/lib/torquebox/messaging/destination.rb', line 244

# Yields the destinationizer service to the caller's block.
#
# The service name is the current deployment unit's service name with
# 'torquebox', 'messaging' and 'destinationizer' appended, in that order.
def with_destinationizer
  unit_service_name = TorqueBox::MSC.deployment_unit.service_name
  service_name = %w[torquebox messaging destinationizer].inject(unit_service_name) do |prefix, segment|
    prefix.append(segment)
  end

  TorqueBox::ServiceRegistry.lookup(service_name) { |destinationizer| yield destinationizer }
end

Instance Method Details

- (Object) _dump(level)



42
43
44
# File 'messaging/lib/torquebox/messaging/destination.rb', line 42

# Serialization hook: the dumped form of a destination is just its name.
# The +level+ argument is accepted but not used.
def _dump(level)
  name
end

- (Object) each(&block)



132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'messaging/lib/torquebox/messaging/destination.rb', line 132

# Enumerable entry point. Opens a session, creates a browser for this
# destination (filtered by enumerable_options[:selector]) and passes each
# item the browser yields to +block+. The browser is always closed, even
# when iteration raises.
def each(&block)
  wait_for_destination do
    with_session do |session|
      browser = session.create_browser(session.java_destination(self), enumerable_options[:selector])
      begin
        browser.each(&block)
      ensure
        browser.close
      end
    end
  end
end

- (Object) enumerable_options



197
198
199
200
# File 'messaging/lib/torquebox/messaging/destination.rb', line 197

# Returns the enumeration options hash, creating an empty one on first use.
def enumerable_options
  @enumerable_options ||= {}
end

- (Object) normalize_options(options)



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'messaging/lib/torquebox/messaging/destination.rb', line 178

# Rewrites user-facing publish options in place and returns the same hash.
#
# * :persistent -- removed and replaced by :delivery_mode, set to the JMS
#   PERSISTENT constant when truthy and NON_PERSISTENT otherwise.
# * :priority -- a PRIORITY_MAP key is replaced by its numeric value;
#   any other value is converted with #to_i and must land in 0..9.
#
# Raises ArgumentError when :priority is neither a PRIORITY_MAP key nor
# convertible to an integer in 0..9. Values without #to_i (for example an
# unknown Symbol) are rejected with that same ArgumentError rather than
# surfacing as a NoMethodError.
def normalize_options(options)
  if options.has_key?(:persistent)
    persistent = options.delete(:persistent)
    options[:delivery_mode] =
      persistent ? javax.jms::DeliveryMode.PERSISTENT : javax.jms::DeliveryMode.NON_PERSISTENT
  end

  if options.has_key?(:priority)
    priority = options[:priority]
    if PRIORITY_MAP[priority]
      options[:priority] = PRIORITY_MAP[priority]
    elsif priority.respond_to?(:to_i) && (0..9).include?(priority.to_i)
      options[:priority] = priority.to_i
    else
      symbolic_names = PRIORITY_MAP.keys.map { |key| ":#{key}" }.join(',')
      raise ArgumentError, ":priority must be in the range 0..9, or one of #{symbolic_names}"
    end
  end

  options
end

- (Object) publish(message, options = {})



109
110
111
112
113
114
115
# File 'messaging/lib/torquebox/messaging/destination.rb', line 109

# Publishes +message+ to this destination.
#
# options[:startup_timeout] is passed to wait_for_destination; the whole
# options hash is used to open the session and, after normalize_options,
# handed to session.publish. Returns whatever session.publish returns.
def publish(message, options = {})
  startup_timeout = options[:startup_timeout]
  wait_for_destination(startup_timeout) do
    with_session(options) { |session| session.publish(self, message, normalize_options(options)) }
  end
end

- (Object) receive(options = {}, &block)



117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'messaging/lib/torquebox/messaging/destination.rb', line 117

# Receives from this destination via session.receive, forwarding +options+
# and the optional block.
#
# options[:startup_timeout] is passed to wait_for_destination. When a block
# is given the receive runs inside TorqueBox.transaction; otherwise it is
# invoked directly.
def receive(options = {}, &block)
  wait_for_destination(options[:startup_timeout]) do
    receive_in_session = lambda {
      with_session(options) { |session| session.receive(self, options, &block) }
    }

    block ? TorqueBox.transaction(&receive_in_session) : receive_in_session.call
  end
end

- (Object) stop



89
90
91
92
93
# File 'messaging/lib/torquebox/messaging/destination.rb', line 89

# Asks the destinationizer service to remove this destination (by name) and
# returns the result of with_destinationizer; stop_sync passes that result
# to wait_for_latch.
def stop
  TorqueBox::Messaging::Destination.with_destinationizer { |service| service.remove_destination(name) }
end

- (Object) stop_sync



100
101
102
# File 'messaging/lib/torquebox/messaging/destination.rb', line 100

# Calls #stop and then waits on whatever it returned via wait_for_latch,
# returning that method's result.
def stop_sync
  latch = stop
  TorqueBox::Messaging::Destination.wait_for_latch(latch)
end

- (Object) to_s



202
203
204
# File 'messaging/lib/torquebox/messaging/destination.rb', line 202

# The string form of a destination is its name.
def to_s
  name
end

- (Object) wait_for_destination(timeout = nil, &block)



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'messaging/lib/torquebox/messaging/destination.rb', line 162

# Runs +block+, retrying every 100ms while it fails with a
# javax.naming.NameNotFoundException or javax.jms.JMSException.
#
# timeout:: maximum time to keep retrying, in milliseconds (30_000 when
#           nil). Once more than that has elapsed the last error is re-raised.
#
# Returns the block's value.
def wait_for_destination(timeout=nil, &block)
  timeout ||= 30_000 # 30s default
  started_at = Time.now
  begin
    block.call
  rescue javax.naming.NameNotFoundException, javax.jms.JMSException
    elapsed_ms = (Time.now - started_at) * 1000
    raise if elapsed_ms > timeout

    sleep(0.1)
    retry
  end
end

- (Object) with_session(opts = {})



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'messaging/lib/torquebox/messaging/destination.rb', line 146

# Opens a new connection and session and yields the session to the block.
#
# opts[:tx] selects whether the connection is transactional (default true).
# opts is modified in place: when it carries no :encoding, the destination's
# own connect option is filled in.
def with_session(opts = {})
  use_transaction = opts.fetch(:tx, true)

  # https://issues.jboss.org/browse/TORQUE-1033
  # A message without an explicit encoding inherits the destination's one.
  # If the destination has none either, this just stores nil, which is harmless.
  if opts[:encoding].nil?
    opts[:encoding] = @connect_options[:encoding]
  end

  connection_factory.with_new_connection(connect_options, use_transaction) do |connection|
    connection.with_session { |session| yield session }
  end
end