Class Beetle::Client
In: lib/beetle/client.rb
Parent: Object

This class provides the interface through which messaging is configured for both message producers and consumers. It keeps references to an instance of a Beetle::Subscriber, a Beetle::Publisher (both of which are instantiated on demand), and a reference to an instance of Beetle::DeduplicationStore.

Configuration of exchanges, queues, messages, and message handlers is done by calls to corresponding register_ methods. Note that these methods just build up the configuration, they don‘t interact with the AMQP servers.

On the publisher side, publishing a message will ensure that the exchange it will be sent to, and each of the queues bound to the exchange, will be created on demand. On the subscriber side, exchanges, queues, bindings and queue subscriptions will be created when the application calls the listen method. An application can decide to subscribe to only a subset of the configured queues by passing a list of queue names to the listen method.

The net effect of this strategy is that producers and consumers can be started in any order, so that no message is lost if message producers are accidentally started before the corresponding consumers.

Methods

Included Modules

Logging

Attributes

bindings  [R]  an options hash for the configured queue bindings
config  [R]  accessor for the beetle configuration
deduplication_store  [R]  the deduplication store to use for this client
exchanges  [R]  an options hash for the configured exchanges
messages  [R]  an options hash for the configured messages
queues  [R]  an options hash for the configured queues
servers  [R]  the AMQP servers available for publishing

Public Class methods

create a fresh Client instance from a given configuration object

[Source]

    # File lib/beetle/client.rb, line 46
46:     def initialize(config = Beetle.config)
47:       @config  = config
48:       @servers = config.servers.split(/ *, */)
49:       @exchanges = {}
50:       @queues = {}
51:       @messages = {}
52:       @bindings = {}
53:       @deduplication_store = DeduplicationStore.new(config)
54:     end

Public Instance methods

this is a convenience method to configure exchanges, queues, messages and handlers with a common set of options. allows one to call all register methods without the register_ prefix. returns self.

Example:

 client = Beetle.client.new.configure :exchange => :foobar do |config|
   config.queue :q1, :key => "foo"
   config.queue :q2, :key => "bar"
   config.message :foo
   config.message :bar
   config.handler :q1 { puts "got foo"}
   config.handler :q2 { puts "got bar"}
 end

[Source]

     # File lib/beetle/client.rb, line 159
159:     def configure(options={}) #:yields: config
160:       yield Configurator.new(self, options)
161:       self
162:     end

start listening to a list of messages (default to all registered messages). runs the given block before entering the eventmachine loop.

[Source]

     # File lib/beetle/client.rb, line 189
189:     def listen(messages=self.messages.keys, &block)
190:       messages = messages.map(&:to_s)
191:       messages.each{|m| raise UnknownMessage.new("unknown message #{m}") unless self.messages.include?(m)}
192:       subscriber.listen(messages, &block)
193:     end

evaluate the ruby files matching the given glob pattern in the context of the client instance.

[Source]

     # File lib/beetle/client.rb, line 221
221:     def load(glob)
222:       b = binding
223:       Dir[glob].each do |f|
224:         eval(File.read(f), b, f)
225:       end
226:     end

publishes a message. the given options hash is merged with options given on message registration.

[Source]

     # File lib/beetle/client.rb, line 165
165:     def publish(message_name, data=nil, opts={})
166:       message_name = message_name.to_s
167:       raise UnknownMessage.new("unknown message #{message_name}") unless messages.include?(message_name)
168:       publisher.publish(message_name, data, opts)
169:     end

purges the given queue on all configured servers

[Source]

     # File lib/beetle/client.rb, line 181
181:     def purge(queue_name)
182:       queue_name = queue_name.to_s
183:       raise UnknownQueue.new("unknown queue #{queue_name}") unless queues.include?(queue_name)
184:       publisher.purge(queue_name)
185:     end

register an additional binding for an already configured queue name and an options hash:

:exchange
the name of the exchange this queue will be bound to (defaults to the name of the queue)
:key
the binding key (defaults to the name of the queue)

automatically registers the specified exchange if it hasn‘t been registered yet

[Source]

     # File lib/beetle/client.rb, line 93
 93:     def register_binding(queue_name, options={})
 94:       name = queue_name.to_s
 95:       opts = options.symbolize_keys
 96:       exchange = (opts[:exchange] || name).to_s
 97:       key = (opts[:key] || name).to_s
 98:       (bindings[name] ||= []) << {:exchange => exchange, :key => key}
 99:       register_exchange(exchange) unless exchanges.include?(exchange)
100:       queues = (exchanges[exchange][:queues] ||= [])
101:       queues << name unless queues.include?(name)
102:     end

register an exchange with the given name and a set of options:

:type
the type option will be overwritten and always be :topic, beetle does not allow fanout exchanges
:durable
the durable option will be overwritten and always be true. this is done to ensure that exchanges are never deleted

[Source]

    # File lib/beetle/client.rb, line 62
62:     def register_exchange(name, options={})
63:       name = name.to_s
64:       raise ConfigurationError.new("exchange #{name} already configured") if exchanges.include?(name)
65:       exchanges[name] = options.symbolize_keys.merge(:type => :topic, :durable => true)
66:     end

registers a handler for a list of queues (which must have been registered previously). The handler will be invoked when any messages arrive on the queue.

Examples:

  register_handler([:foo, :bar], :timeout => 10.seconds) { |message| puts "received #{message}" }

  on_error   = lambda{ puts "something went wrong with baz" }
  on_failure = lambda{ puts "baz has finally failed" }

  register_handler(:baz, :exceptions => 1, :errback => on_error, :failback => on_failure) { puts "received baz" }

  register_handler(:bar, BarHandler)

For details on handler classes see class Beetle::Handler

[Source]

     # File lib/beetle/client.rb, line 137
137:     def register_handler(queues, *args, &block)
138:       queues = Array(queues).map(&:to_s)
139:       queues.each {|q| raise UnknownQueue.new(q) unless self.queues.include?(q)}
140:       opts = args.last.is_a?(Hash) ? args.pop : {}
141:       handler = args.shift
142:       raise ArgumentError.new("too many arguments for handler registration") unless args.empty?
143:       subscriber.register_handler(queues, opts, handler, &block)
144:     end

register a persistent message with a given name and an options hash:

:key
specifies the routing key for message publishing (defaults to the name of the message)
:ttl
specifies the time interval after which the message will be silently dropped (seconds). defaults to Message::DEFAULT_TTL.
:redundant
specifies whether the message should be published redundantly (defaults to false)

[Source]

     # File lib/beetle/client.rb, line 113
113:     def register_message(message_name, options={})
114:       name = message_name.to_s
115:       raise ConfigurationError.new("message #{name} already configured") if messages.include?(name)
116:       opts = {:exchange => name, :key => name}.merge!(options.symbolize_keys)
117:       opts.merge! :persistent => true
118:       opts[:exchange] = opts[:exchange].to_s
119:       messages[name] = opts
120:     end

register a durable, non passive, non auto_deleted queue with the given name and an options hash:

:exchange
the name of the exchange this queue will be bound to (defaults to the name of the queue)
:key
the binding key (defaults to the name of the queue)

automatically registers the specified exchange if it hasn‘t been registered yet

[Source]

    # File lib/beetle/client.rb, line 75
75:     def register_queue(name, options={})
76:       name = name.to_s
77:       raise ConfigurationError.new("queue #{name} already configured") if queues.include?(name)
78:       opts = {:exchange => name, :key => name, :auto_delete => false, :amqp_name => name}.merge!(options.symbolize_keys)
79:       opts.merge! :durable => true, :passive => false, :exclusive => false
80:       exchange = opts.delete(:exchange).to_s
81:       key = opts.delete(:key)
82:       queues[name] = opts
83:       register_binding(name, :exchange => exchange, :key => key)
84:     end

sends the given message to one of the configured servers and returns the result of running the associated handler.

unexpected behavior can ensue if the message gets routed to more than one recipient, so be careful.

[Source]

     # File lib/beetle/client.rb, line 174
174:     def rpc(message_name, data=nil, opts={})
175:       message_name = message_name.to_s
176:       raise UnknownMessage.new("unknown message #{message_name}") unless messages.include?(message_name)
177:       publisher.rpc(message_name, data, opts)
178:     end

stops the eventmachine loop

[Source]

     # File lib/beetle/client.rb, line 196
196:     def stop_listening
197:       subscriber.stop!
198:     end

disconnects the publisher from all servers it‘s currently connected to

[Source]

     # File lib/beetle/client.rb, line 201
201:     def stop_publishing
202:       publisher.stop
203:     end

traces messages without consuming them. useful for debugging message flow.

[Source]

     # File lib/beetle/client.rb, line 206
206:     def trace(messages=self.messages.keys, &block)
207:       queues.each do |name, opts|
208:         opts.merge! :durable => false, :auto_delete => true, :amqp_name => queue_name_for_tracing(opts[:amqp_name])
209:       end
210:       register_handler(queues.keys) do |msg|
211:         puts "-----===== new message =====-----"
212:         puts "SERVER: #{msg.server}"
213:         puts "HEADER: #{msg.header.inspect}"
214:         puts "MSGID: #{msg.msg_id}"
215:         puts "DATA: #{msg.data}"
216:       end
217:       listen(messages, &block)
218:     end

[Validate]