| Class | Beetle::Client |
| In: |
lib/beetle/client.rb
|
| Parent: | Object |
This class provides the interface through which messaging is configured for both message producers and consumers. It keeps references to an instance of a Beetle::Subscriber, a Beetle::Publisher (both of which are instantiated on demand), and a reference to an instance of Beetle::DeduplicationStore.
Configuration of exchanges, queues, messages, and message handlers is done by calls to corresponding register_ methods. Note that these methods just build up the configuration, they don‘t interact with the AMQP servers.
On the publisher side, publishing a message will ensure that the exchange it will be sent to, and each of the queues bound to the exchange, will be created on demand. On the subscriber side, exchanges, queues, bindings and queue subscriptions will be created when the application calls the listen method. An application can decide to subscribe to only a subset of the configured queues by passing a list of queue names to the listen method.
The net effect of this strategy is that producers and consumers can be started in any order, so that no message is lost if message producers are accidentally started before the corresponding consumers.
| bindings | [R] | an options hash for the configured queue bindings |
| config | [R] | accessor for the beetle configuration |
| deduplication_store | [R] | the deduplication store to use for this client |
| exchanges | [R] | an options hash for the configured exchanges |
| messages | [R] | an options hash for the configured messages |
| queues | [R] | an options hash for the configured queues |
| servers | [R] | the AMQP servers available for publishing |
create a fresh Client instance from a given configuration object
# File lib/beetle/client.rb, line 46
46: def initialize(config = Beetle.config)
47: @config = config
48: @servers = config.servers.split(/ *, */)
49: @exchanges = {}
50: @queues = {}
51: @messages = {}
52: @bindings = {}
53: @deduplication_store = DeduplicationStore.new(config)
54: end
this is a convenience method to configure exchanges, queues, messages and handlers with a common set of options. allows one to call all register methods without the register_ prefix. returns self.
Example:
client = Beetle.client.new.configure :exchange => :foobar do |config|
config.queue :q1, :key => "foo"
config.queue :q2, :key => "bar"
config.message :foo
config.message :bar
config.handler :q1 { puts "got foo"}
config.handler :q2 { puts "got bar"}
end
# File lib/beetle/client.rb, line 159
159: def configure(options={}) #:yields: config
160: yield Configurator.new(self, options)
161: self
162: end
start listening to a list of messages (default to all registered messages). runs the given block before entering the eventmachine loop.
# File lib/beetle/client.rb, line 189
189: def listen(messages=self.messages.keys, &block)
190: messages = messages.map(&:to_s)
191: messages.each{|m| raise UnknownMessage.new("unknown message #{m}") unless self.messages.include?(m)}
192: subscriber.listen(messages, &block)
193: end
evaluate the ruby files matching the given glob pattern in the context of the client instance.
# File lib/beetle/client.rb, line 221
221: def load(glob)
222: b = binding
223: Dir[glob].each do |f|
224: eval(File.read(f), b, f)
225: end
226: end
publishes a message. the given options hash is merged with options given on message registration.
# File lib/beetle/client.rb, line 165
165: def publish(message_name, data=nil, opts={})
166: message_name = message_name.to_s
167: raise UnknownMessage.new("unknown message #{message_name}") unless messages.include?(message_name)
168: publisher.publish(message_name, data, opts)
169: end
purges the given queue on all configured servers
# File lib/beetle/client.rb, line 181
181: def purge(queue_name)
182: queue_name = queue_name.to_s
183: raise UnknownQueue.new("unknown queue #{queue_name}") unless queues.include?(queue_name)
184: publisher.purge(queue_name)
185: end
register an additional binding for an already configured queue name and an options hash:
automatically registers the specified exchange if it hasn‘t been registered yet
# File lib/beetle/client.rb, line 93
93: def register_binding(queue_name, options={})
94: name = queue_name.to_s
95: opts = options.symbolize_keys
96: exchange = (opts[:exchange] || name).to_s
97: key = (opts[:key] || name).to_s
98: (bindings[name] ||= []) << {:exchange => exchange, :key => key}
99: register_exchange(exchange) unless exchanges.include?(exchange)
100: queues = (exchanges[exchange][:queues] ||= [])
101: queues << name unless queues.include?(name)
102: end
register an exchange with the given name and a set of options:
# File lib/beetle/client.rb, line 62
62: def register_exchange(name, options={})
63: name = name.to_s
64: raise ConfigurationError.new("exchange #{name} already configured") if exchanges.include?(name)
65: exchanges[name] = options.symbolize_keys.merge(:type => :topic, :durable => true)
66: end
registers a handler for a list of queues (which must have been registered previously). The handler will be invoked when any messages arrive on the queue.
Examples:
register_handler([:foo, :bar], :timeout => 10.seconds) { |message| puts "received #{message}" }
on_error = lambda{ puts "something went wrong with baz" }
on_failure = lambda{ puts "baz has finally failed" }
register_handler(:baz, :exceptions => 1, :errback => on_error, :failback => on_failure) { puts "received baz" }
register_handler(:bar, BarHandler)
For details on handler classes see class Beetle::Handler
# File lib/beetle/client.rb, line 137
137: def register_handler(queues, *args, &block)
138: queues = Array(queues).map(&:to_s)
139: queues.each {|q| raise UnknownQueue.new(q) unless self.queues.include?(q)}
140: opts = args.last.is_a?(Hash) ? args.pop : {}
141: handler = args.shift
142: raise ArgumentError.new("too many arguments for handler registration") unless args.empty?
143: subscriber.register_handler(queues, opts, handler, &block)
144: end
register a persistent message with a given name and an options hash:
# File lib/beetle/client.rb, line 113
113: def register_message(message_name, options={})
114: name = message_name.to_s
115: raise ConfigurationError.new("message #{name} already configured") if messages.include?(name)
116: opts = {:exchange => name, :key => name}.merge!(options.symbolize_keys)
117: opts.merge! :persistent => true
118: opts[:exchange] = opts[:exchange].to_s
119: messages[name] = opts
120: end
register a durable, non passive, non auto_deleted queue with the given name and an options hash:
automatically registers the specified exchange if it hasn‘t been registered yet
# File lib/beetle/client.rb, line 75
75: def register_queue(name, options={})
76: name = name.to_s
77: raise ConfigurationError.new("queue #{name} already configured") if queues.include?(name)
78: opts = {:exchange => name, :key => name, :auto_delete => false, :amqp_name => name}.merge!(options.symbolize_keys)
79: opts.merge! :durable => true, :passive => false, :exclusive => false
80: exchange = opts.delete(:exchange).to_s
81: key = opts.delete(:key)
82: queues[name] = opts
83: register_binding(name, :exchange => exchange, :key => key)
84: end
sends the given message to one of the configured servers and returns the result of running the associated handler.
unexpected behavior can ensue if the message gets routed to more than one recipient, so be careful.
# File lib/beetle/client.rb, line 174
174: def rpc(message_name, data=nil, opts={})
175: message_name = message_name.to_s
176: raise UnknownMessage.new("unknown message #{message_name}") unless messages.include?(message_name)
177: publisher.rpc(message_name, data, opts)
178: end
stops the eventmachine loop
# File lib/beetle/client.rb, line 196
196: def stop_listening
197: subscriber.stop!
198: end
disconnects the publisher from all servers it‘s currently connected to
# File lib/beetle/client.rb, line 201
201: def stop_publishing
202: publisher.stop
203: end
traces messages without consuming them. useful for debugging message flow.
# File lib/beetle/client.rb, line 206
206: def trace(messages=self.messages.keys, &block)
207: queues.each do |name, opts|
208: opts.merge! :durable => false, :auto_delete => true, :amqp_name => queue_name_for_tracing(opts[:amqp_name])
209: end
210: register_handler(queues.keys) do |msg|
211: puts "-----===== new message =====-----"
212: puts "SERVER: #{msg.server}"
213: puts "HEADER: #{msg.header.inspect}"
214: puts "MSGID: #{msg.msg_id}"
215: puts "DATA: #{msg.data}"
216: end
217: listen(messages, &block)
218: end