Class Beetle::Message
In: lib/beetle/message.rb
Parent: Object

Instances of class Message are created when a scubscription callback fires. Class Message contains the code responsible for message deduplication and determining if it should retry executing the message handler after a handler has crashed (or forcefully aborted).

Methods

Included Modules

Logging

Constants

FORMAT_VERSION = 1   current message format version
FLAG_REDUNDANT = 1   flag for encoding redundant messages
DEFAULT_TTL = 1.day   default lifetime of messages
DEFAULT_HANDLER_TIMEOUT = 600.seconds   forcefully abort a running handler after this many seconds. can be overriden when registering a handler.
DEFAULT_HANDLER_EXECUTION_ATTEMPTS = 1   how many times we should try to run a handler before giving up
DEFAULT_HANDLER_EXECUTION_ATTEMPTS_DELAY = 10.seconds   how many seconds we should wait before retrying handler execution
DEFAULT_EXCEPTION_LIMIT = 0   how many exceptions should be tolerated before giving up

Attributes

attempts_limit  [R]  how many times we should try to run the handler
data  [R]  message payload
delay  [R]  how long to wait before retrying the message handler
exception  [R]  exception raised by handler execution
exceptions_limit  [R]  how many exceptions we should tolerate before giving up
expires_at  [R]  unix timestamp after which the message should be considered stale
flags  [R]  flags sent with the message
format_version  [R]  the message format version of the message
handler_result  [R]  value returned by handler execution
header  [R]  the AMQP header received with the message
queue  [R]  name of the queue on which the message was received
server  [R]  server from which the message was received
timeout  [R]  how many seconds the handler is allowed to execute
uuid  [R]  the uuid of the message

Public Class methods

generate uuid for publishing

[Source]

     # File lib/beetle/message.rb, line 122
122:     def self.generate_uuid
123:       UUID4R::uuid(1)
124:     end

[Source]

    # File lib/beetle/message.rb, line 56
56:     def initialize(queue, header, body, opts = {})
57:       @queue  = queue
58:       @header = header
59:       @data   = body
60:       setup(opts)
61:       decode
62:     end

Public Instance methods

aquire execution mutex before we run the handler (and delete it if we can‘t aquire it).

[Source]

     # File lib/beetle/message.rb, line 208
208:     def aquire_mutex!
209:       if mutex = @store.setnx(msg_id, :mutex, now)
210:         logger.debug "Beetle: aquired mutex: #{msg_id}"
211:       else
212:         delete_mutex!
213:       end
214:       mutex
215:     end

how many times we already tried running the handler

[Source]

     # File lib/beetle/message.rb, line 173
173:     def attempts
174:       @store.get(msg_id, :attempts).to_i
175:     end

whether we have already tried running the handler as often as specified when the handler was registered

[Source]

     # File lib/beetle/message.rb, line 183
183:     def attempts_limit_reached?
184:       (limit = @store.get(msg_id, :attempts)) && limit.to_i >= attempts_limit
185:     end

mark message handling complete in the deduplication store

[Source]

     # File lib/beetle/message.rb, line 157
157:     def completed!
158:       @store.set(msg_id, :status, "completed")
159:       timed_out!
160:     end

message handling completed?

[Source]

     # File lib/beetle/message.rb, line 152
152:     def completed?
153:       @store.get(msg_id, :status) == "completed"
154:     end

whether we should wait before running the handler

[Source]

     # File lib/beetle/message.rb, line 163
163:     def delayed?
164:       (t = @store.get(msg_id, :delay)) && t.to_i > now
165:     end

delete execution mutex

[Source]

     # File lib/beetle/message.rb, line 218
218:     def delete_mutex!
219:       @store.del(msg_id, :mutex)
220:       logger.debug "Beetle: deleted mutex: #{msg_id}"
221:     end

whether the number of exceptions has exceeded the limit set when the handler was registered

[Source]

     # File lib/beetle/message.rb, line 193
193:     def exceptions_limit_reached?
194:       @store.get(msg_id, :exceptions).to_i > exceptions_limit
195:     end

a message has expired if the header expiration timestamp is msaller than the current time

[Source]

     # File lib/beetle/message.rb, line 117
117:     def expired?
118:       @expires_at < now
119:     end

increment number of exception occurences in the deduplication store

[Source]

     # File lib/beetle/message.rb, line 188
188:     def increment_exception_count!
189:       @store.incr(msg_id, :exceptions)
190:     end

record the fact that we are trying to run the handler

[Source]

     # File lib/beetle/message.rb, line 178
178:     def increment_execution_attempts!
179:       @store.incr(msg_id, :attempts)
180:     end

have we already seen this message? if not, set the status to "incomplete" and store the message exipration timestamp in the deduplication store.

[Source]

     # File lib/beetle/message.rb, line 199
199:     def key_exists?
200:       old_message = 0 == @store.msetnx(msg_id, :status =>"incomplete", :expires => @expires_at, :timeout => now + timeout)
201:       if old_message
202:         logger.debug "Beetle: received duplicate message: #{msg_id} on queue: #{@queue}"
203:       end
204:       old_message
205:     end

unique message id. used to form various keys in the deduplication store.

[Source]

     # File lib/beetle/message.rb, line 102
102:     def msg_id
103:       @msg_id ||= "msgid:#{queue}:#{uuid}"
104:     end

process this message and do not allow any exception to escape to the caller

[Source]

     # File lib/beetle/message.rb, line 224
224:     def process(handler)
225:       logger.debug "Beetle: processing message #{msg_id}"
226:       result = nil
227:       begin
228:         result = process_internal(handler)
229:         handler.process_exception(@exception) if @exception
230:         handler.process_failure(result) if result.failure?
231:       rescue Exception => e
232:         Beetle::reraise_expectation_errors!
233:         logger.warn "Beetle: exception '#{e}' during processing of message #{msg_id}"
234:         logger.warn "Beetle: backtrace: #{e.backtrace.join("\n")}"
235:         result = RC::InternalError
236:       end
237:       result
238:     end

whether the publisher has tried sending this message to two servers

[Source]

     # File lib/beetle/message.rb, line 127
127:     def redundant?
128:       @flags & FLAG_REDUNDANT == FLAG_REDUNDANT
129:     end

store delay value in the deduplication store

[Source]

     # File lib/beetle/message.rb, line 168
168:     def set_delay!
169:       @store.set(msg_id, :delay, now + delay)
170:     end

store handler timeout timestamp in the deduplication store

[Source]

     # File lib/beetle/message.rb, line 137
137:     def set_timeout!
138:       @store.set(msg_id, :timeout, now + timeout)
139:     end

whether this is a message we can process without accessing the deduplication store

[Source]

     # File lib/beetle/message.rb, line 132
132:     def simple?
133:       !redundant? && attempts_limit == 1
134:     end

reset handler timeout in the deduplication store

[Source]

     # File lib/beetle/message.rb, line 147
147:     def timed_out!
148:       @store.set(msg_id, :timeout, 0)
149:     end

handler timed out?

[Source]

     # File lib/beetle/message.rb, line 142
142:     def timed_out?
143:       (t = @store.get(msg_id, :timeout)) && t.to_i < now
144:     end

Private Instance methods

ack the message for rabbit. deletes all keys associated with this message in the deduplication store if we are sure this is the last message with the given msg_id.

[Source]

     # File lib/beetle/message.rb, line 324
324:     def ack!
325:       #:doc:
326:       logger.debug "Beetle: ack! for message #{msg_id}"
327:       header.ack
328:       return if simple? # simple messages don't use the deduplication store
329:       if !redundant? || @store.incr(msg_id, :ack_count) == 2
330:         @store.del_keys(msg_id)
331:       end
332:     end

[Validate]