Class: TorqueBox::Stomp::JmsStomplet

Inherits:
Object
  • Object
show all
Defined in:
stomp/lib/torquebox/stomp/jms_stomplet.rb

Defined Under Namespace

Classes: MessageListener

Instance Method Summary (collapse)

Constructor Details

- (JmsStomplet) initialize

Returns a new instance of JmsStomplet



22
23
24
25
# File 'stomp/lib/torquebox/stomp/jms_stomplet.rb', line 22

def initialize()
  @connection_factory = TorqueBox.fetch( 'xa-connection-factory' )
  @subscriptions = {}
end

Instance Method Details

- (Object) configure(stomplet_config)



31
32
33
34
35
36
# File 'stomp/lib/torquebox/stomp/jms_stomplet.rb', line 31

def configure(stomplet_config)
  @connection = @connection_factory.create_xa_connection
  @connection.start
  @session = @connection.create_session( false )
  @xa_resources = [ @session.xa_resource ]
end

- (Object) destination_for(name, type)



60
61
62
63
# File 'stomp/lib/torquebox/stomp/jms_stomplet.rb', line 60

def destination_for(name, type)
  return queue(name) if ( type.to_sym == :queue )
  topic(name)
end

- (Object) destroy



38
39
40
41
# File 'stomp/lib/torquebox/stomp/jms_stomplet.rb', line 38

def destroy
  @session.close
  @connection.close
end

- (Object) on_unsubscribe(subscriber)



43
44
45
46
47
48
# File 'stomp/lib/torquebox/stomp/jms_stomplet.rb', line 43

def on_unsubscribe(subscriber)
  subscriptions = @subscriptions.delete( subscriber )
  (subscriptions || []).each do |subscription|
    subscription.close
  end
end

- (Object) queue(name)



50
51
52
53
# File 'stomp/lib/torquebox/stomp/jms_stomplet.rb', line 50

def queue(name)
  jms_session = @session.jms_session
  TorqueBox::Messaging::Queue.new( jms_session.create_queue( name ) )
end

- (Object) send_to(destination, stomp_message, headers = {}, options = {})



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'stomp/lib/torquebox/stomp/jms_stomplet.rb', line 74

def send_to(destination, stomp_message, headers={}, options={})
  jms_session = @session.jms_session
  java_destination = @session.java_destination( destination )

  producer    = @session.jms_session.create_producer( java_destination.to_java )

  case ( stomp_message )
  when org.projectodd.stilts.stomp::StompMessage
    content = stomp_message.content_as_string
  else
    content = stomp_message
  end

  encoded_message = TorqueBox::Messaging::Message.new( @session.jms_session, content, options[:encoding] )
  jms_message = encoded_message.jms_message

  if ( stomp_message.is_a?( org.projectodd.stilts.stomp::StompMessage ) )
    stomp_message.headers.header_names.each do |name|
      jms_name = name.to_s.gsub( /-/, '_' )
      header_value = stomp_message.headers[ name.to_s ]
      jms_message.setStringProperty( jms_name, header_value )
    end
  end

  headers.each do |name, header_value|
    jms_name = name.to_s.gsub( /-/, '_' )
    jms_message.setStringProperty( jms_name, header_value.to_s )
  end

  producer.send( jms_message )
end

- (Object) subscribe_to(subscriber, destination, selector = nil, options = {})



65
66
67
68
69
70
71
72
# File 'stomp/lib/torquebox/stomp/jms_stomplet.rb', line 65

def subscribe_to(subscriber, destination, selector=nil, options={})
  jms_session = @session.jms_session
  java_destination = @session.java_destination( destination )
  consumer = @session.jms_session.create_consumer( java_destination.to_java, selector )
  consumer.message_listener = MessageListener.new( subscriber, options )
  @subscriptions[ subscriber ] ||= []
  @subscriptions[ subscriber ] << consumer
end

- (Object) topic(name)



55
56
57
58
# File 'stomp/lib/torquebox/stomp/jms_stomplet.rb', line 55

def topic(name)
  jms_session = @session.jms_session
  TorqueBox::Messaging::Topic.new( jms_session.create_topic( name ) )
end