Class | Beetle::Message |
In: |
lib/beetle/message.rb
|
Parent: | Object |
FORMAT_VERSION | = | 1 | current message format version | |
FLAG_REDUNDANT | = | 1 | flag for encoding redundant messages | |
DEFAULT_TTL | = | 1.day | default lifetime of messages | |
DEFAULT_HANDLER_TIMEOUT | = | 600.seconds | forcefully abort a running handler after this many seconds. can be overridden when registering a handler. | |
DEFAULT_HANDLER_EXECUTION_ATTEMPTS | = | 1 | how many times we should try to run a handler before giving up | |
DEFAULT_HANDLER_EXECUTION_ATTEMPTS_DELAY | = | 10.seconds | how many seconds we should wait before retrying handler execution | |
DEFAULT_EXCEPTION_LIMIT | = | 0 | how many exceptions should be tolerated before giving up |
attempts_limit | [R] | how many times we should try to run the handler |
data | [R] | message payload |
delay | [R] | how long to wait before retrying the message handler |
exception | [R] | exception raised by handler execution |
exceptions_limit | [R] | how many exceptions we should tolerate before giving up |
expires_at | [R] | unix timestamp after which the message should be considered stale |
flags | [R] | flags sent with the message |
format_version | [R] | the message format version of the message |
handler_result | [R] | value returned by handler execution |
header | [R] | the AMQP header received with the message |
queue | [R] | name of the queue on which the message was received |
server | [R] | server from which the message was received |
timeout | [R] | how many seconds the handler is allowed to execute |
uuid | [R] | the uuid of the message |
generate uuid for publishing
# File lib/beetle/message.rb, line 122 122: def self.generate_uuid 123: UUID4R::uuid(1) 124: end
# File lib/beetle/message.rb, line 56 56: def initialize(queue, header, body, opts = {}) 57: @queue = queue 58: @header = header 59: @data = body 60: setup(opts) 61: decode 62: end
acquire execution mutex before we run the handler (and delete it if we can't acquire it).
# File lib/beetle/message.rb, line 208 208: def aquire_mutex! 209: if mutex = @store.setnx(msg_id, :mutex, now) 210: logger.debug "Beetle: aquired mutex: #{msg_id}" 211: else 212: delete_mutex! 213: end 214: mutex 215: end
how many times we already tried running the handler
# File lib/beetle/message.rb, line 173 173: def attempts 174: @store.get(msg_id, :attempts).to_i 175: end
whether we have already tried running the handler as often as specified when the handler was registered
# File lib/beetle/message.rb, line 183 183: def attempts_limit_reached? 184: (limit = @store.get(msg_id, :attempts)) && limit.to_i >= attempts_limit 185: end
mark message handling complete in the deduplication store
# File lib/beetle/message.rb, line 157 157: def completed! 158: @store.set(msg_id, :status, "completed") 159: timed_out! 160: end
message handling completed?
# File lib/beetle/message.rb, line 152 152: def completed? 153: @store.get(msg_id, :status) == "completed" 154: end
whether we should wait before running the handler
# File lib/beetle/message.rb, line 163 163: def delayed? 164: (t = @store.get(msg_id, :delay)) && t.to_i > now 165: end
delete execution mutex
# File lib/beetle/message.rb, line 218 218: def delete_mutex! 219: @store.del(msg_id, :mutex) 220: logger.debug "Beetle: deleted mutex: #{msg_id}" 221: end
whether the number of exceptions has exceeded the limit set when the handler was registered
# File lib/beetle/message.rb, line 193 193: def exceptions_limit_reached? 194: @store.get(msg_id, :exceptions).to_i > exceptions_limit 195: end
a message has expired if the header expiration timestamp is smaller than the current time
# File lib/beetle/message.rb, line 117 117: def expired? 118: @expires_at < now 119: end
increment number of exception occurrences in the deduplication store
# File lib/beetle/message.rb, line 188 188: def increment_exception_count! 189: @store.incr(msg_id, :exceptions) 190: end
record the fact that we are trying to run the handler
# File lib/beetle/message.rb, line 178 178: def increment_execution_attempts! 179: @store.incr(msg_id, :attempts) 180: end
have we already seen this message? if not, set the status to "incomplete" and store the message expiration timestamp in the deduplication store.
# File lib/beetle/message.rb, line 199 199: def key_exists? 200: old_message = 0 == @store.msetnx(msg_id, :status =>"incomplete", :expires => @expires_at, :timeout => now + timeout) 201: if old_message 202: logger.debug "Beetle: received duplicate message: #{msg_id} on queue: #{@queue}" 203: end 204: old_message 205: end
unique message id. used to form various keys in the deduplication store.
# File lib/beetle/message.rb, line 102 102: def msg_id 103: @msg_id ||= "msgid:#{queue}:#{uuid}" 104: end
process this message and do not allow any exception to escape to the caller
# File lib/beetle/message.rb, line 224 224: def process(handler) 225: logger.debug "Beetle: processing message #{msg_id}" 226: result = nil 227: begin 228: result = process_internal(handler) 229: handler.process_exception(@exception) if @exception 230: handler.process_failure(result) if result.failure? 231: rescue Exception => e 232: Beetle::reraise_expectation_errors! 233: logger.warn "Beetle: exception '#{e}' during processing of message #{msg_id}" 234: logger.warn "Beetle: backtrace: #{e.backtrace.join("\n")}" 235: result = RC::InternalError 236: end 237: result 238: end
whether the publisher has tried sending this message to two servers
# File lib/beetle/message.rb, line 127 127: def redundant? 128: @flags & FLAG_REDUNDANT == FLAG_REDUNDANT 129: end
store delay value in the deduplication store
# File lib/beetle/message.rb, line 168 168: def set_delay! 169: @store.set(msg_id, :delay, now + delay) 170: end
store handler timeout timestamp in the deduplication store
# File lib/beetle/message.rb, line 137 137: def set_timeout! 138: @store.set(msg_id, :timeout, now + timeout) 139: end
reset handler timeout in the deduplication store
# File lib/beetle/message.rb, line 147 147: def timed_out! 148: @store.set(msg_id, :timeout, 0) 149: end
handler timed out?
# File lib/beetle/message.rb, line 142 142: def timed_out? 143: (t = @store.get(msg_id, :timeout)) && t.to_i < now 144: end
ack the message for rabbit. deletes all keys associated with this message in the deduplication store if we are sure this is the last message with the given msg_id.
# File lib/beetle/message.rb, line 324 324: def ack! 325: #:doc: 326: logger.debug "Beetle: ack! for message #{msg_id}" 327: header.ack 328: return if simple? # simple messages don't use the deduplication store 329: if !redundant? || @store.incr(msg_id, :ack_count) == 2 330: @store.del_keys(msg_id) 331: end 332: end