| Class | Beetle::Message |
| In: |
lib/beetle/message.rb
|
| Parent: | Object |
| FORMAT_VERSION | = | 1 | current message format version | |
| FLAG_REDUNDANT | = | 1 | flag for encoding redundant messages | |
| DEFAULT_TTL | = | 1.day | default lifetime of messages | |
| DEFAULT_HANDLER_TIMEOUT | = | 600.seconds | forcefully abort a running handler after this many seconds. can be overriden when registering a handler. | |
| DEFAULT_HANDLER_EXECUTION_ATTEMPTS | = | 1 | how many times we should try to run a handler before giving up | |
| DEFAULT_HANDLER_EXECUTION_ATTEMPTS_DELAY | = | 10.seconds | how many seconds we should wait before retrying handler execution | |
| DEFAULT_EXCEPTION_LIMIT | = | 0 | how many exceptions should be tolerated before giving up |
| attempts_limit | [R] | how many times we should try to run the handler |
| data | [R] | message payload |
| delay | [R] | how long to wait before retrying the message handler |
| exception | [R] | exception raised by handler execution |
| exceptions_limit | [R] | how many exceptions we should tolerate before giving up |
| expires_at | [R] | unix timestamp after which the message should be considered stale |
| flags | [R] | flags sent with the message |
| format_version | [R] | the message format version of the message |
| handler_result | [R] | value returned by handler execution |
| header | [R] | the AMQP header received with the message |
| queue | [R] | name of the queue on which the message was received |
| server | [R] | server from which the message was received |
| timeout | [R] | how many seconds the handler is allowed to execute |
| uuid | [R] | the uuid of the message |
generate uuid for publishing
# File lib/beetle/message.rb, line 122
122: def self.generate_uuid
123: UUID4R::uuid(1)
124: end
# File lib/beetle/message.rb, line 56
56: def initialize(queue, header, body, opts = {})
57: @queue = queue
58: @header = header
59: @data = body
60: setup(opts)
61: decode
62: end
aquire execution mutex before we run the handler (and delete it if we can‘t aquire it).
# File lib/beetle/message.rb, line 208
208: def aquire_mutex!
209: if mutex = @store.setnx(msg_id, :mutex, now)
210: logger.debug "Beetle: aquired mutex: #{msg_id}"
211: else
212: delete_mutex!
213: end
214: mutex
215: end
how many times we already tried running the handler
# File lib/beetle/message.rb, line 173
173: def attempts
174: @store.get(msg_id, :attempts).to_i
175: end
whether we have already tried running the handler as often as specified when the handler was registered
# File lib/beetle/message.rb, line 183
183: def attempts_limit_reached?
184: (limit = @store.get(msg_id, :attempts)) && limit.to_i >= attempts_limit
185: end
mark message handling complete in the deduplication store
# File lib/beetle/message.rb, line 157
157: def completed!
158: @store.set(msg_id, :status, "completed")
159: timed_out!
160: end
message handling completed?
# File lib/beetle/message.rb, line 152
152: def completed?
153: @store.get(msg_id, :status) == "completed"
154: end
whether we should wait before running the handler
# File lib/beetle/message.rb, line 163
163: def delayed?
164: (t = @store.get(msg_id, :delay)) && t.to_i > now
165: end
delete execution mutex
# File lib/beetle/message.rb, line 218
218: def delete_mutex!
219: @store.del(msg_id, :mutex)
220: logger.debug "Beetle: deleted mutex: #{msg_id}"
221: end
whether the number of exceptions has exceeded the limit set when the handler was registered
# File lib/beetle/message.rb, line 193
193: def exceptions_limit_reached?
194: @store.get(msg_id, :exceptions).to_i > exceptions_limit
195: end
a message has expired if the header expiration timestamp is msaller than the current time
# File lib/beetle/message.rb, line 117
117: def expired?
118: @expires_at < now
119: end
increment number of exception occurences in the deduplication store
# File lib/beetle/message.rb, line 188
188: def increment_exception_count!
189: @store.incr(msg_id, :exceptions)
190: end
record the fact that we are trying to run the handler
# File lib/beetle/message.rb, line 178
178: def increment_execution_attempts!
179: @store.incr(msg_id, :attempts)
180: end
have we already seen this message? if not, set the status to "incomplete" and store the message exipration timestamp in the deduplication store.
# File lib/beetle/message.rb, line 199
199: def key_exists?
200: old_message = 0 == @store.msetnx(msg_id, :status =>"incomplete", :expires => @expires_at, :timeout => now + timeout)
201: if old_message
202: logger.debug "Beetle: received duplicate message: #{msg_id} on queue: #{@queue}"
203: end
204: old_message
205: end
unique message id. used to form various keys in the deduplication store.
# File lib/beetle/message.rb, line 102
102: def msg_id
103: @msg_id ||= "msgid:#{queue}:#{uuid}"
104: end
process this message and do not allow any exception to escape to the caller
# File lib/beetle/message.rb, line 224
224: def process(handler)
225: logger.debug "Beetle: processing message #{msg_id}"
226: result = nil
227: begin
228: result = process_internal(handler)
229: handler.process_exception(@exception) if @exception
230: handler.process_failure(result) if result.failure?
231: rescue Exception => e
232: Beetle::reraise_expectation_errors!
233: logger.warn "Beetle: exception '#{e}' during processing of message #{msg_id}"
234: logger.warn "Beetle: backtrace: #{e.backtrace.join("\n")}"
235: result = RC::InternalError
236: end
237: result
238: end
whether the publisher has tried sending this message to two servers
# File lib/beetle/message.rb, line 127
127: def redundant?
128: @flags & FLAG_REDUNDANT == FLAG_REDUNDANT
129: end
store delay value in the deduplication store
# File lib/beetle/message.rb, line 168
168: def set_delay!
169: @store.set(msg_id, :delay, now + delay)
170: end
store handler timeout timestamp in the deduplication store
# File lib/beetle/message.rb, line 137
137: def set_timeout!
138: @store.set(msg_id, :timeout, now + timeout)
139: end
reset handler timeout in the deduplication store
# File lib/beetle/message.rb, line 147
147: def timed_out!
148: @store.set(msg_id, :timeout, 0)
149: end
handler timed out?
# File lib/beetle/message.rb, line 142
142: def timed_out?
143: (t = @store.get(msg_id, :timeout)) && t.to_i < now
144: end
ack the message for rabbit. deletes all keys associated with this message in the deduplication store if we are sure this is the last message with the given msg_id.
# File lib/beetle/message.rb, line 324
324: def ack!
325: #:doc:
326: logger.debug "Beetle: ack! for message #{msg_id}"
327: header.ack
328: return if simple? # simple messages don't use the deduplication store
329: if !redundant? || @store.incr(msg_id, :ack_count) == 2
330: @store.del_keys(msg_id)
331: end
332: end