…or how to run Ruby on Rails script as a Sidekiq job, delayed_job job, …

How do you run scripts on your production Ruby on Rails server?

In most small/medium projects it’s enough to just ssh to the server and run a rake task with the script.

Or just run heroku run rake mytask if you are on Heroku.
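
For reference, such a script usually lives in a plain rake task. Here is a minimal sketch (the lib/tasks/fx_script.rake path and the fx:update_works task name are made up, and Work#do_some_script_logic stands in for your real logic, as in the examples below):

# lib/tasks/fx_script.rake (hypothetical "naive" script that runs synchronously)
namespace :fx do
  desc "Update all works - blocks the ssh session until it finishes"
  task update_works: :environment do
    Work.find_each do |work|
      work.do_some_script_logic
    end
  end
end

…and run it with bin/rails fx:update_works (or heroku run rake fx:update_works).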

But:

  • What if you have a load-balanced environment where the node/VM/container running the long-running script may get removed?
  • What if your DevOps dude configured the ssh timeout to be just a couple of minutes?
  • What if your DevOps configured your stack so that you are not able to ssh to production in the first place?

And even if none of the above apply to you: what if you have a couple of hundred thousand records and the script needs a couple of hours to update them? Will you wait until the rake task finishes?

Solution

Create an ActiveJob job that will process individual records (FxSingleJob in the example below). In order to call it, introduce another ActiveJob job (FxJob in the example below) that will enqueue those records in the first place.

This way you will just ssh to the server, run bin/rails c and run FxJob.perform_later (or create a rake task that triggers it via perform_later).
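
If you go the rake task route, a sketch of such a wrapper could look like this (the fx:enqueue task name is hypothetical); the task only enqueues the job and exits immediately:

# lib/tasks/fx.rake (hypothetical wrapper that just enqueues FxJob)
namespace :fx do
  desc "Enqueue FxJob and exit right away"
  task enqueue: :environment do
    FxJob.perform_later
  end
end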

The point is that you want to exit the ssh connection as soon as possible so that the worker takes over and does the chunks of the script as jobs.

Don’t forget that you want to keep your ActiveJobs (background jobs) small so they execute and exit as soon as possible.

In case the enqueuing job (FxJob) dies prematurely, we want to know which records were enqueued/processed. There are a couple of ways to do that, but the easiest is to introduce a DB field that gets marked as processed.

Example

step 1: Introduce a field that indicates which records were processed

bin/rails generate migration add_fx_script_processed_field_to_works
# db/migrate/xxxxxxx_add_fx_script_processed_field_to_works.rb
class AddFxScriptProcessedFieldToWorks < ActiveRecord::Migration[6.0]
  def change
    add_column :works, :fx_script_processed, :boolean, default: false
  end
end
rake db:migrate

step 2: ActiveJob script

# app/jobs/fx_job.rb
class FxJob < ActiveJob::Base
  queue_as :script # don't forget to introduce new queues if you are using Sidekiq (see the config sketch below)

  def perform
    Work.where(fx_script_processed: false).find_each do |work|
      FxSingleJob.perform_later(work_id: work.id)
    end
  end
end
# app/jobs/fx_single_job.rb
class FxSingleJob < ActiveJob::Base
  queue_as :script_single

  def perform(work_id:)
    work = Work.find_by!(id: work_id)
    # ...
    work.do_some_script_logic
    # ...
    work.fx_script_processed = true
    work.save!
  end
end
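
As the queue_as comment above notes: if Sidekiq is your ActiveJob backend, the worker only pulls from queues it is configured to watch, so the new queues need to be listed. A sketch (your actual config/sidekiq.yml may look different):

# config/sidekiq.yml
:queues:
  - default
  - script
  - script_single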

If all goes well, once there are no ActiveJob jobs left to process, all your records are processed. You can be sure about that by checking Work.where(fx_script_processed: false).count == 0

How do I know if the ActiveJob jobs finished? With Sidekiq you can check the Sidekiq Web UI.
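
The Sidekiq Web UI is a Rack app you can mount in your routes, roughly like this (make sure to protect it with authentication in production):

# config/routes.rb
require 'sidekiq/web'

Rails.application.routes.draw do
  mount Sidekiq::Web => '/sidekiq'
  # ...
end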

If you discover the main FxJob job died prematurely, you can just retrigger it with FxJob.perform_now. You don’t have to worry that it will reprocess the same records again thanks to the Work.where(fx_script_processed: false) condition.

Lazy solution example

It may feel like overkill but this is the best way to ensure your data was processed (if it’s important data).

But in a lot of cases you don’t need to introduce an extra DB field (as you are able to detect processed records from the DB results).

Also there is no particular reason to introduce a separate job file if you are able to place all of the logic in one ActiveJob file. It really depends on how comfortable you are that you will not screw it up.

In 90% of cases this is more than enough:

class LazyFxJob < ActiveJob::Base
  queue_as :script

  def perform(work_id: nil)
    if work_id.present?
      work = Work.find_by!(id: work_id)
      work.make_public
      work.save!
    else
      # Depending on your logic it can be any scope. You don't need to process all records
      #
      #    Work.where(some_condition: false).find_each do |w|
      #    Work.order(id: :desc).find_each do |w|
      #    ...
      #
      Work.where(published: false).find_each do |w|
        LazyFxJob.perform_later(work_id: w.id)
      end
    end
  end
end

…and trigger LazyFxJob.perform_later

Rails #find_each will load results to memory in batches of 1000. So it’s better to use it instead of #each if you have many thousands of records to load.
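
If the default batch size doesn’t suit your case, #find_each accepts a batch_size option, e.g.:

Work.where(published: false).find_each(batch_size: 200) do |w|
  LazyFxJob.perform_later(work_id: w.id)
end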

Paginated script scenario

Sometimes you are dealing with a scenario where you are not able to trigger multiple jobs at the same time.

For example some APIs limit how many requests you can make per minute. If you enqueue multiple jobs hitting the API at the same time, you will definitely blow through that limit.

At the same time you want to exit/finish your jobs as soon as possible (so they don’t time out)

The way to get around this is to trigger a job that will enqueue itself again after finishing a single batch of calls.

class SyncOutdatedProductDescriptionsJob < ActiveJob::Base
  queue_as :script

  def perform(limit: 3)
    # products that have not been updated yet today
    products = Product.where("last_automated_description_update_at < ?", Date.today) 

    # random sample of data (works in PostgreSQL & Rails6)
    products = products.order(Arel.sql('RANDOM()'))

    # limited to `limit` results
    products = products.limit(limit)

    if products.any?
      products.each do |product|
        process_product(product)
      end

      # we are requeueing the job to process another set
      SyncOutdatedProductDescriptionsJob.perform_later(limit: limit)
    else
      # All is finished ^_^
      # ...maybe send email to Admin that script finished ?
    end
  end

  private

  def process_product(product)
    url = product.external_description_url
    external_description = HTTParty.get(url)

    product.description = external_description
    product.last_automated_description_update_at = Time.now
    product.save!
  end
end

trigger with: SyncOutdatedProductDescriptionsJob.perform_later

It’s like a paginated book. You finish one page with one job. Then you process another page with another job.

The script above will process 3 unprocessed products (fetching their external API descriptions and saving them) and requeue itself so it processes another 3 products.

If you discover the scripts are killing your API limit, you can decrease how many items each run will process by triggering SyncOutdatedProductDescriptionsJob.perform_later(limit: 1) instead.

Or if the script is too slow, increase it: SyncOutdatedProductDescriptionsJob.perform_later(limit: 9)

Yes, it’s not ideal: if your job dies, the next set of data will not get queued and therefore not processed. But again, it can be retriggered without fear that we will process already processed items.

Bonus - nice ActiveJob tricks

retry_on

The thing to remember is that by using ActiveJob perform_later we are dealing with asynchronous calls.

There are cases when exceptions are raised in ActiveJob just because a resource is not ready yet (e.g. Redis is faster than PostgreSQL, so the enqueued Sidekiq job may run and try to load the record before the SQL transaction finishes writing it).
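
A hypothetical illustration of that race (using the job from the example below): if the job is enqueued inside a DB transaction, the worker may pick it up before the record is committed:

ActiveRecord::Base.transaction do
  user = User.create!(email: "hello@example.com") # hypothetical attributes
  SyncOutdatedMailchimpMembersJob.perform_later(user_id: user.id)
  # ... more work inside the transaction ...
end
# if the worker grabs the job before the COMMIT above happens,
# User.find_by!(id: user_id) inside the job raises ActiveRecord::RecordNotFound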

You can easily tell the jobs to re-execute on known exceptions:

# UserNotReadyYet is a custom error class you define somewhere in your app,
# e.g.: class UserNotReadyYet < StandardError; end
class SyncOutdatedMailchimpMembersJob < ActiveJob::Base
  queue_as :whatever
  retry_on UserNotReadyYet

  def perform(user_id:)
    user = User.find_by!(id: user_id)

    user.is_user_ready? || raise(UserNotReadyYet)
    # ...
  end
end
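
retry_on also accepts options such as wait and attempts if you need to tune how often and how many times the job is retried, for example:

retry_on UserNotReadyYet, wait: 5.seconds, attempts: 10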

I have an entire TIL note on this if you want to learn more: https://blog.eq8.eu/til/retry-active-job-sidekiq-when-exception.html

Discussion