Class Beetle::DeduplicationStore
In: lib/beetle/deduplication_store.rb
Parent: Object

The deduplication store is used internally by Beetle::Client to store information on the status of message processing. This includes:

  • how often a message has already been seen by some consumer
  • whether a message has been processed successfully
  • how many attempts have been made to execute a message handler for a given message
  • how long we should wait before trying to execute the message handler after a failure
  • how many exceptions have been raised during previous execution attempts
  • how long we should wait before trying to perform the next execution attempt
  • whether some other process is already trying to execute the message handler

It also provides a method to garbage collect keys for expired messages.

Methods

Included Modules

Logging

Constants

KEY_SUFFIXES = [:status, :ack_count, :timeout, :delay, :attempts, :exceptions, :mutex, :expires]   list of key suffixes to use for storing values in Redis.

Public Class methods

[Source]

    # File lib/beetle/deduplication_store.rb, line 16
16:     def initialize(config = Beetle.config)
17:       @config = config
18:       @current_master = nil
19:       @last_time_master_file_changed = nil
20:     end

Public Instance methods

delete key with given suffix for given msg_id.

[Source]

    # File lib/beetle/deduplication_store.rb, line 92
92:     def del(msg_id, suffix)
93:       with_failover { redis.del(key(msg_id, suffix)) }
94:     end

delete all keys associated with the given msg_id.

[Source]

    # File lib/beetle/deduplication_store.rb, line 97
97:     def del_keys(msg_id)
98:       with_failover { redis.del(*keys(msg_id)) }
99:     end

check whether key with given suffix exists for a given msg_id.

[Source]

     # File lib/beetle/deduplication_store.rb, line 102
102:     def exists(msg_id, suffix)
103:       with_failover { redis.exists(key(msg_id, suffix)) }
104:     end

flush the configured redis database. useful for testing.

[Source]

     # File lib/beetle/deduplication_store.rb, line 107
107:     def flushdb
108:       with_failover { redis.flushdb }
109:     end

garbage collect keys in Redis (always assume the worst!)

[Source]

    # File lib/beetle/deduplication_store.rb, line 53
53:     def garbage_collect_keys(now = Time.now.to_i)
54:       keys = redis.keys("msgid:*:expires")
55:       threshold = now + @config.gc_threshold
56:       keys.each do |key|
57:         expires_at = redis.get key
58:         if expires_at && expires_at.to_i < threshold
59:           msg_id = msg_id(key)
60:           redis.del(keys(msg_id))
61:         end
62:       end
63:     end

retrieve the value with given suffix for given msg_id. returns a string.

[Source]

    # File lib/beetle/deduplication_store.rb, line 87
87:     def get(msg_id, suffix)
88:       with_failover { redis.get(key(msg_id, suffix)) }
89:     end

increment counter for key with given suffix for given msg_id. returns an integer.

[Source]

    # File lib/beetle/deduplication_store.rb, line 82
82:     def incr(msg_id, suffix)
83:       with_failover { redis.incr(key(msg_id, suffix)) }
84:     end

build a Redis key out of a message id and a given suffix

[Source]

    # File lib/beetle/deduplication_store.rb, line 38
38:     def key(msg_id, suffix)
39:       "#{msg_id}:#{suffix}"
40:     end

list of keys which potentially exist in Redis for the given message id

[Source]

    # File lib/beetle/deduplication_store.rb, line 43
43:     def keys(msg_id)
44:       KEY_SUFFIXES.map{|suffix| key(msg_id, suffix)}
45:     end

store some key/value pairs if none of the given keys exist.

[Source]

    # File lib/beetle/deduplication_store.rb, line 76
76:     def msetnx(msg_id, values)
77:       values = values.inject([]){|a,(k,v)| a.concat([key(msg_id, k), v])}
78:       with_failover { redis.msetnx(*values) }
79:     end

extract message id from a given Redis key

[Source]

    # File lib/beetle/deduplication_store.rb, line 48
48:     def msg_id(key)
49:       key =~ /^(msgid:[^:]*:[-0-9a-f]*):.*$/ && $1
50:     end

server:port string from the redis master file

[Source]

     # File lib/beetle/deduplication_store.rb, line 157
157:     def read_master_file
158:       File.read(@config.redis_server).chomp
159:     end

get the Redis instance

[Source]

    # File lib/beetle/deduplication_store.rb, line 23
23:     def redis
24:       redis_master_source = @config.redis_server =~ /^\S+\:\d+$/ ? "server_string" : "master_file"
25:       _eigenclass_.class_eval "def redis\nredis_master_from_\#{redis_master_source}\nend\n", __FILE__, __LINE__
26:       redis
27:     end

redis master file changed outside the running process?

[Source]

     # File lib/beetle/deduplication_store.rb, line 145
145:     def redis_master_file_changed?
146:       @last_time_master_file_changed != File.mtime(@config.redis_server)
147:     end

set current redis master from master file

[Source]

     # File lib/beetle/deduplication_store.rb, line 137
137:     def redis_master_from_master_file
138:       set_current_redis_master_from_master_file if redis_master_file_changed?
139:       @current_master
140:     rescue Errno::ENOENT
141:       nil
142:     end

set current redis master instance (as specified in the Beetle::Configuration)

[Source]

     # File lib/beetle/deduplication_store.rb, line 132
132:     def redis_master_from_server_string
133:       @current_master ||= Redis.from_server_string(@config.redis_server, :db => @config.redis_db)
134:     end

unconditionally store a (key,value) pair with given suffix for given msg_id.

[Source]

    # File lib/beetle/deduplication_store.rb, line 66
66:     def set(msg_id, suffix, value)
67:       with_failover { redis.set(key(msg_id, suffix), value) }
68:     end

set current redis master from server:port string contained in the redis master file

[Source]

     # File lib/beetle/deduplication_store.rb, line 150
150:     def set_current_redis_master_from_master_file
151:       @last_time_master_file_changed = File.mtime(@config.redis_server)
152:       server_string = read_master_file
153:       @current_master = !server_string.blank? ? Redis.from_server_string(server_string, :db => @config.redis_db) : nil
154:     end

store a (key,value) pair with given suffix for given msg_id if it doesn‘t exists yet.

[Source]

    # File lib/beetle/deduplication_store.rb, line 71
71:     def setnx(msg_id, suffix, value)
72:       with_failover { redis.setnx(key(msg_id, suffix), value) }
73:     end

[Validate]