Defining Workers - nesquena/backburner GitHub Wiki
In Backburner, there are actually many different strategies for processing jobs
which are reflected by multiple workers. Custom workers can be defined fairly easily.
Simply subclass Backburner::Worker and then define a few key methods for the strategy:
| Method | Description |
|---|---|
prepare |
Used to prepare the job queues before job processing starts. |
start |
Actual processing strategy based on how jobs are consumed. |
process_tube_names |
Simple way to modify or manage tube names if needed (optional) |
Check out the simple worker which is a single-threaded process that processes jobs as simply as possible. You can also check out the forking worker as well as the threads_on_fork worker for more advanced examples. You can also check out the base worker class which can be helpful.
Example
class AlternateWorker < Backburner::Worker
# Used to prepare job queues before processing jobs.
# Setup beanstalk tube_names and watch all specified tubes for jobs.
def prepare
# Expand tube names with prefix from 'foo' => 'backburner.queues.foo'
self.tube_names.map! { |name| expand_tube_name(name) }
# Log information to logger
log_info "Working #{tube_names.size} queues: [ #{tube_names.join(', ')} ]"
# Watch the tubes for processing
self.connection.tubes.watch!(*self.tube_names)
end
# Starts processing new jobs indefinitely.
# Primary way to consume and process jobs within specified tubes.
def start
prepare # Invoke the prepare method defined above
loop { work_one_job } # Loop over and over processing a job from the tubes
end
# Processing the tube names
# Use to change the way tube names are processed
# Should at minimum `compact_tube_names` which cleans up the tube_names array
def process_tube_names(tube_names)
tubes = compact_tube_names(tube_names)
# ...extra processing here...
tubes
end
end
Helpers
When defining you have access to several existing worker methods, a few key helpers as well as several logging methods. The most notable being:
work_one_job- Process a single job from all watched tubes.compact_tube_names- Accepts list of tube_names and normalizes them.tube_names- The list of tube names for this worker to watch.connection- The beanstalkd underlying connection.expand_tube_name- Turns a tube name into a fully qualified tube name with prefix.log_info- Used to log information to specified logger.log_error- Used to log errors to specified logger.
Usage
Once your worker is defined, you can set the worker as the default with:
Backburner.configure do |config|
config.default_worker = AlternateWorker
end
This will have the worker used by default for all job processing. You can also change the worker
explicitly when calling Backburner.work:
Backburner.work('newsletter_sender', :worker => AlternateWorker)