Class | Beetle::Client |
In: | lib/beetle/client.rb |
Parent: | Object |
This class provides the interface through which messaging is configured for both message producers and consumers. It keeps references to an instance of a Beetle::Subscriber, a Beetle::Publisher (both of which are instantiated on demand), and a reference to an instance of Beetle::DeduplicationStore.
Configuration of exchanges, queues, messages, and message handlers is done by calls to the corresponding register_ methods. Note that these methods only build up the configuration; they don't interact with the AMQP servers.
On the publisher side, publishing a message ensures that the exchange it is sent to, and each of the queues bound to that exchange, are created on demand. On the subscriber side, exchanges, queues, bindings and queue subscriptions are created when the application calls the listen method. An application can decide to subscribe to only a subset of the configured queues by passing a list of message names to the listen method.
The net effect of this strategy is that producers and consumers can be started in any order, so that no message is lost if message producers are accidentally started before the corresponding consumers.
bindings | [R] | an options hash for the configured queue bindings |
config | [R] | accessor for the beetle configuration |
deduplication_store | [R] | the deduplication store to use for this client |
exchanges | [R] | an options hash for the configured exchanges |
messages | [R] | an options hash for the configured messages |
queues | [R] | an options hash for the configured queues |
servers | [R] | the AMQP servers available for publishing |
create a fresh Client instance from a given configuration object
# File lib/beetle/client.rb, line 46
def initialize(config = Beetle.config)
  @config = config
  @servers = config.servers.split(/ *, */)
  @exchanges = {}
  @queues = {}
  @messages = {}
  @bindings = {}
  @deduplication_store = DeduplicationStore.new(config)
end
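The constructor derives the server list by splitting the configured server string on commas, tolerating spaces around them. A minimal standalone sketch of that split follows; the host names are made up for illustration, and only the regular expression is taken from the listing above.

```ruby
# Hypothetical server string; only the split pattern comes from the constructor.
server_string = "rabbit1:5672, rabbit2:5672 ,rabbit3:5672"

# / *, */ matches a comma together with any spaces around it,
# so the resulting entries carry no leading or trailing blanks.
servers = server_string.split(/ *, */)

p servers
```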
this is a convenience method to configure exchanges, queues, messages and handlers with a common set of options. allows one to call all register methods without the register_ prefix. returns self.
Example:
client = Beetle::Client.new.configure :exchange => :foobar do |config|
  config.queue :q1, :key => "foo"
  config.queue :q2, :key => "bar"
  config.message :foo
  config.message :bar
  config.handler(:q1) { puts "got foo" }
  config.handler(:q2) { puts "got bar" }
end
# File lib/beetle/client.rb, line 159
def configure(options={}) #:yields: config
  yield Configurator.new(self, options)
  self
end
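The source of Configurator is not shown on this page, so the following is only a sketch of how such an object could forward prefix-less calls to the register_ methods while merging in the common options. MiniClient and MiniConfigurator are hypothetical stand-ins, and the use of method_missing is an assumption rather than a description of Beetle's implementation.

```ruby
# MiniClient and MiniConfigurator are hypothetical stand-ins, not Beetle classes.
class MiniClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  # Record what would be registered instead of building real configuration.
  def register_queue(name, options = {})
    @calls << [:queue, name, options]
  end

  def register_message(name, options = {})
    @calls << [:message, name, options]
  end

  # Same shape as the configure method above: yield a configurator, return self.
  def configure(options = {})
    yield MiniConfigurator.new(self, options)
    self
  end
end

class MiniConfigurator
  def initialize(client, options)
    @client = client
    @options = options
  end

  # Forward e.g. `queue` to `register_queue`, merging the shared options
  # with the per-call options (per-call values win on conflict).
  def method_missing(name, *args, &block)
    target = "register_#{name}"
    return super unless @client.respond_to?(target)
    opts = args.last.is_a?(Hash) ? args.pop : {}
    @client.send(target, *args, @options.merge(opts), &block)
  end

  def respond_to_missing?(name, include_private = false)
    @client.respond_to?("register_#{name}") || super
  end
end

client = MiniClient.new.configure(:exchange => :foobar) do |config|
  config.queue :q1, :key => "foo"
  config.message :foo
end

p client.calls
```

Because configure returns self, the result of the whole expression is the client, which is why the call can be chained directly onto new.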
start listening to a list of messages (default to all registered messages). runs the given block before entering the eventmachine loop.
# File lib/beetle/client.rb, line 189
def listen(messages=self.messages.keys, &block)
  messages = messages.map(&:to_s)
  messages.each{|m| raise UnknownMessage.new("unknown message #{m}") unless self.messages.include?(m)}
  subscriber.listen(messages, &block)
end
evaluate the ruby files matching the given glob pattern in the context of the client instance.
# File lib/beetle/client.rb, line 221
def load(glob)
  b = binding
  Dir[glob].each do |f|
    eval(File.read(f), b, f)
  end
end
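Evaluating files against the instance's binding means a configuration file can call the register_ methods without naming a receiver. The sketch below illustrates this with a hypothetical MiniLoader class and a temporary file; only the body of load mirrors the listing above.

```ruby
require "tmpdir"

# MiniLoader is a hypothetical stand-in for the client.
class MiniLoader
  attr_reader :registered

  def initialize
    @registered = []
  end

  def register_queue(name)
    @registered << name.to_s
  end

  # Same body as the load method above: each matching file is evaluated
  # with self bound to this instance.
  def load(glob)
    b = binding
    Dir[glob].each do |f|
      eval(File.read(f), b, f)
    end
  end
end

loader = MiniLoader.new
Dir.mktmpdir do |dir|
  # A made-up configuration file that calls register_queue with no receiver.
  File.write(File.join(dir, "queues.rb"), "register_queue :orders\nregister_queue :invoices\n")
  loader.load(File.join(dir, "*.rb"))
end

p loader.registered
```

Passing the file name as the third argument to eval makes error messages and backtraces point at the configuration file rather than at the eval call.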
publishes a message. the given options hash is merged with options given on message registration.
# File lib/beetle/client.rb, line 165
def publish(message_name, data=nil, opts={})
  message_name = message_name.to_s
  raise UnknownMessage.new("unknown message #{message_name}") unless messages.include?(message_name)
  publisher.publish(message_name, data, opts)
end
purges the given queue on all configured servers
# File lib/beetle/client.rb, line 181
def purge(queue_name)
  queue_name = queue_name.to_s
  raise UnknownQueue.new("unknown queue #{queue_name}") unless queues.include?(queue_name)
  publisher.purge(queue_name)
end
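register an additional binding for an already configured queue name and an options hash:
:exchange => the name of the exchange to bind to (defaults to the queue name)
:key => the binding key (defaults to the queue name)
automatically registers the specified exchange if it hasn't been registered yet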
# File lib/beetle/client.rb, line 93
def register_binding(queue_name, options={})
  name = queue_name.to_s
  opts = options.symbolize_keys
  exchange = (opts[:exchange] || name).to_s
  key = (opts[:key] || name).to_s
  (bindings[name] ||= []) << {:exchange => exchange, :key => key}
  register_exchange(exchange) unless exchanges.include?(exchange)
  queues = (exchanges[exchange][:queues] ||= [])
  queues << name unless queues.include?(name)
end
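To make the resulting data structures concrete, here is a standalone sketch of the same bookkeeping using plain hashes in place of the client's bindings and exchanges accessors. The lambda is a hypothetical helper, and it assumes the options already use symbol keys, so the symbolize_keys step is left out.

```ruby
# Plain hashes stand in for the client's `bindings` and `exchanges`.
bindings  = {}
exchanges = {}

# Hypothetical helper mirroring the bookkeeping in register_binding.
register_binding = lambda do |queue_name, options = {}|
  name     = queue_name.to_s
  exchange = (options[:exchange] || name).to_s
  key      = (options[:key] || name).to_s
  # Each queue keeps a list of its exchange/key pairs.
  (bindings[name] ||= []) << { :exchange => exchange, :key => key }
  # Stand-in for register_exchange: create the exchange entry on first use.
  exchanges[exchange] ||= { :type => :topic, :durable => true }
  # Each exchange remembers which queues are bound to it, without duplicates.
  queues = (exchanges[exchange][:queues] ||= [])
  queues << name unless queues.include?(name)
end

register_binding.call(:q1, :exchange => :foobar, :key => "foo")
register_binding.call(:q1, :exchange => :foobar, :key => "bar")

p bindings
p exchanges
```

After the two calls, the queue has two bindings but appears only once in the exchange's queue list.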
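register an exchange with the given name and a set of options. the exchange is always registered with :type => :topic and :durable => true; these values override any passed in the options hash.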
# File lib/beetle/client.rb, line 62
def register_exchange(name, options={})
  name = name.to_s
  raise ConfigurationError.new("exchange #{name} already configured") if exchanges.include?(name)
  exchanges[name] = options.symbolize_keys.merge(:type => :topic, :durable => true)
end
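The order of the merge in register_exchange matters: the forced settings are the argument to merge, so they win over caller-supplied values. A small sketch with made-up option values:

```ruby
# Caller-supplied options (hypothetical values for illustration).
options = { :durable => false, :auto_delete => true }

# Hash#merge lets the argument win on duplicate keys,
# so :type and :durable cannot be overridden by the caller.
exchange_opts = options.merge(:type => :topic, :durable => true)

p exchange_opts
```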
registers a handler for a list of queues (which must have been registered previously). The handler will be invoked when any messages arrive on the queue.
Examples:
register_handler([:foo, :bar], :timeout => 10.seconds) { |message| puts "received #{message}" }

on_error = lambda{ puts "something went wrong with baz" }
on_failure = lambda{ puts "baz has finally failed" }

register_handler(:baz, :exceptions => 1, :errback => on_error, :failback => on_failure) { puts "received baz" }

register_handler(:bar, BarHandler)
For details on handler classes see class Beetle::Handler
# File lib/beetle/client.rb, line 137
def register_handler(queues, *args, &block)
  queues = Array(queues).map(&:to_s)
  queues.each {|q| raise UnknownQueue.new(q) unless self.queues.include?(q)}
  opts = args.last.is_a?(Hash) ? args.pop : {}
  handler = args.shift
  raise ArgumentError.new("too many arguments for handler registration") unless args.empty?
  subscriber.register_handler(queues, opts, handler, &block)
end
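The three call styles in the examples work because register_handler picks a trailing options hash and an optional handler out of its variable arguments. The sketch below isolates that argument handling in a hypothetical helper, split_handler_args, and leaves out the queue lookup and the subscriber; BarHandler here is an empty placeholder class.

```ruby
# Hypothetical helper that mirrors how register_handler splits its arguments.
def split_handler_args(queues, *args)
  queues  = Array(queues).map(&:to_s)           # accept one name or a list
  opts    = args.last.is_a?(Hash) ? args.pop : {} # trailing hash is the options
  handler = args.shift                          # what remains is the handler, if any
  raise ArgumentError, "too many arguments for handler registration" unless args.empty?
  [queues, opts, handler]
end

class BarHandler; end # placeholder handler class for the sketch

p split_handler_args([:foo, :bar], :timeout => 10)
p split_handler_args(:bar, BarHandler)
```

Anything left over after the options and the handler have been removed triggers the ArgumentError.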
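register a persistent message with a given name and an options hash:
:exchange => the name of the exchange to publish to (defaults to the message name)
:key => the routing key (defaults to the message name)
:persistent is always set to true, regardless of the options given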
# File lib/beetle/client.rb, line 113
def register_message(message_name, options={})
  name = message_name.to_s
  raise ConfigurationError.new("message #{name} already configured") if messages.include?(name)
  opts = {:exchange => name, :key => name}.merge!(options.symbolize_keys)
  opts.merge! :persistent => true
  opts[:exchange] = opts[:exchange].to_s
  messages[name] = opts
end
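The option handling in register_message layers three things: defaults derived from the message name, the caller's options, and a forced :persistent flag. A standalone sketch follows; message_options is a hypothetical helper, and Hash#transform_keys is used in place of symbolize_keys so that the example runs without ActiveSupport.

```ruby
# Hypothetical helper mirroring the option handling in register_message.
def message_options(message_name, options = {})
  name = message_name.to_s
  # Defaults first, then caller options (with keys converted to symbols).
  opts = { :exchange => name, :key => name }.merge(options.transform_keys(&:to_sym))
  # Forced last, so a caller cannot switch persistence off.
  opts[:persistent] = true
  # Exchange names are stored as strings.
  opts[:exchange] = opts[:exchange].to_s
  opts
end

p message_options(:foo)
p message_options(:bar, { "exchange" => :foobar, :persistent => false })
```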
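register a durable, non passive, non auto_deleted queue with the given name and an options hash:
:exchange => the name of the exchange this queue will be bound to (defaults to the name of the queue)
:key => the binding key (defaults to the name of the queue)
automatically registers the specified exchange if it hasn't been registered yet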
# File lib/beetle/client.rb, line 75
def register_queue(name, options={})
  name = name.to_s
  raise ConfigurationError.new("queue #{name} already configured") if queues.include?(name)
  opts = {:exchange => name, :key => name, :auto_delete => false, :amqp_name => name}.merge!(options.symbolize_keys)
  opts.merge! :durable => true, :passive => false, :exclusive => false
  exchange = opts.delete(:exchange).to_s
  key = opts.delete(:key)
  queues[name] = opts
  register_binding(name, :exchange => exchange, :key => key)
end
sends the given message to one of the configured servers and returns the result of running the associated handler.
unexpected behavior can ensue if the message gets routed to more than one recipient, so be careful.
# File lib/beetle/client.rb, line 174
def rpc(message_name, data=nil, opts={})
  message_name = message_name.to_s
  raise UnknownMessage.new("unknown message #{message_name}") unless messages.include?(message_name)
  publisher.rpc(message_name, data, opts)
end
stops the eventmachine loop
# File lib/beetle/client.rb, line 196
def stop_listening
  subscriber.stop!
end
disconnects the publisher from all servers it's currently connected to
# File lib/beetle/client.rb, line 201
def stop_publishing
  publisher.stop
end
traces messages without consuming them. useful for debugging message flow.
# File lib/beetle/client.rb, line 206
def trace(messages=self.messages.keys, &block)
  queues.each do |name, opts|
    opts.merge! :durable => false, :auto_delete => true, :amqp_name => queue_name_for_tracing(opts[:amqp_name])
  end
  register_handler(queues.keys) do |msg|
    puts "-----===== new message =====-----"
    puts "SERVER: #{msg.server}"
    puts "HEADER: #{msg.header.inspect}"
    puts "MSGID: #{msg.msg_id}"
    puts "DATA: #{msg.data}"
  end
  listen(messages, &block)
end