Class: TorqueBox::Messaging::Task

Inherits:
Object
  • Object
show all
Includes:
FutureStatus, ProcessorMiddleware::DefaultMiddleware
Defined in:
messaging/lib/torquebox/messaging/task.rb

Class Method Summary (collapse)

Instance Method Summary (collapse)

Methods included from ProcessorMiddleware::DefaultMiddleware

default, #middleware

Methods included from FutureStatus

#__future

Class Method Details

+ (Object) async(method, payload = {}, options = {})



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'messaging/lib/torquebox/messaging/task.rb', line 37

def self.async(method, payload = {}, options = {})
  queue = Queue.new( queue_name )
  future = Future.new( queue )
  message = {
    :method => method,
    :payload => payload,
    :future_id => future.correlation_id,
    :future_queue => queue_name,
    :future_ttl => options[:future_ttl]
  }
  options[:encoding] = :marshal
  queue.publish( message, options )

  future
rescue javax.naming.NameNotFoundException => ex
  raise RuntimeError.new("The queue for #{self.name} is not available. Did you disable it by setting its concurrency to 0?")
end

+ (Object) queue_name(name = self.name[0...-4])



32
33
34
35
# File 'messaging/lib/torquebox/messaging/task.rb', line 32

def self.queue_name( name = self.name[0...-4] )
  suffix = org.torquebox.core.util.StringUtils.underscore(name)
  "/queues/torquebox/#{ENV['TORQUEBOX_APP_NAME']}/tasks/#{suffix}"
end

Instance Method Details

- (Object) process!(message)



55
56
57
58
59
60
# File 'messaging/lib/torquebox/messaging/task.rb', line 55

def process!(message)
  hash = message.decode
  FutureResponder.new( Queue.new( hash[:future_queue] ), hash[:future_id], hash[:future_ttl] ).respond do
    self.send hash[:method].to_sym, hash[:payload]
  end
end