Skip to content

Async Execution

Axn provides built-in support for asynchronous execution through background job processing libraries. This allows you to execute actions in the background without blocking the main thread.

Overview

Async execution in Axn is designed to be simple and consistent across different background job libraries. You can configure async behavior globally or per-action, and all async adapters support the same interface.

Basic Usage

Configuring Async Adapters

ruby
class EmailAction
  include Axn

  # Configure async adapter
  async :sidekiq

  expects :user, :message

  def call
    # Send email logic
  end
end

# Execute immediately (synchronous)
result = EmailAction.call(user: user, message: "Welcome!")

# Execute asynchronously (background)
EmailAction.call_async(user: user, message: "Welcome!")

Available Async Adapters

Sidekiq

The Sidekiq adapter provides integration with the Sidekiq background job processing library.

ruby
# In your action class
async :sidekiq do
  sidekiq_options queue: "high_priority", retry: 5
end

# Or with keyword arguments (shorthand)
async :sidekiq, queue: "high_priority", retry: 5

Configuration options:

  • queue: The Sidekiq queue name (default: "default")
  • retry: Number of retry attempts (default: 25)
  • Any other Sidekiq options supported by sidekiq_options

For detailed setup instructions including middleware configuration, see Sidekiq Adapter Setup.

ActiveJob

The ActiveJob adapter provides integration with Rails' ActiveJob framework.

ruby
# In your action class
async :active_job do
  queue_as "high_priority"
  self.priority = 10
end

Configuration options:

  • queue_as: The ActiveJob queue name
  • priority: Job priority
  • wait: Delay before execution
  • Any other ActiveJob options

For detailed setup instructions, see ActiveJob Adapter Setup.

Disabled

Disables async execution entirely. The action will raise a NotImplementedError when call_async is called.

ruby
# In your action class
async false

Argument Serialization

Arguments passed to call_async are serialized to the backing job queue and rehydrated before your action runs on the worker. What survives that round trip depends on whether ActiveJob is loaded in your app — but it is the same across every adapter within a given deployment, so a Time argument behaves identically whether you run on Sidekiq or ActiveJob.

  • With ActiveJob loaded (typical Rails apps): arguments are serialized via ActiveJob::Arguments, so a rich set of types round-trips losslessly — String, Integer, Float, true/false/nil, Symbol, Date, Time, DateTime, ActiveSupport::TimeWithZone, ActiveSupport::Duration, BigDecimal, Range, and Array/Hash of those. ActiveRecord records and ActiveStorage attachments travel as compact GlobalID references (not copied into the payload) and are re-located on the worker.
  • Without ActiveJob: only JSON-native values (String, Integer, Float, true/false/nil, and Array/Hash of those with string keys) and GlobalID-able objects (e.g. ActiveRecord records) are supported.

Anything that can't be serialized cleanly raises Axn::Async::UnserializableArgument at enqueue time — naming the offending field and how to fix it — rather than silently corrupting the value on the round trip (the previous behavior, where e.g. a Time could arrive on the worker as a String):

ruby
class SendReport
  include Axn
  async :sidekiq
  expects :occurred_at, type: Time

  def call = # …
end

SendReport.call_async(occurred_at: Time.current)        # ✅ arrives as a Time on the worker
SendReport.call_async(occurred_at: Tempfile.new("rpt")) # ✗ raises Axn::Async::UnserializableArgument

The same rules apply to the static arguments passed to enqueue_all.

Delayed Execution

All async adapters support delayed execution using the _async parameter in call_async. This allows you to schedule actions to run at specific future times without changing the interface.

ruby
class EmailAction
  include Axn
  async :sidekiq

  expects :user, :message

  def call
    # Send email logic
  end
end

# Immediate execution
EmailAction.call_async(user: user, message: "Welcome!")

# Delayed execution - wait 1 hour
EmailAction.call_async(user: user, message: "Follow up", _async: { wait: 1.hour })

# Scheduled execution - run at specific time
EmailAction.call_async(user: user, message: "Reminder", _async: { wait_until: 1.week.from_now })

Supported Scheduling Options

  • wait: Execute after a specific time interval (e.g., 1.hour, 30.minutes)
  • wait_until: Execute at a specific future time (e.g., 1.hour.from_now, Time.parse("2024-01-01 12:00:00"))

Adapter-Specific Behavior

  • Sidekiq: Uses perform_in for wait and perform_at for wait_until
  • ActiveJob: Uses set(wait:) for wait and set(wait_until:) for wait_until
  • Disabled: Ignores scheduling options and raises NotImplementedError

Parameter Name Safety

The _async parameter is reserved for scheduling options.

Global Configuration

You can set default async configuration that will be applied to all actions that don't explicitly configure their own async behavior:

ruby
Axn.configure do |c|
  # Set a default async configuration
  c.set_default_async(:sidekiq, queue: "default") do
    sidekiq_options retry: 3
  end
end

# Now all actions will use Sidekiq by default
class MyAction
  include Axn
  # No async configuration needed - uses default
end

Error Handling

Axn distinguishes between business logic failures (via fail!) and unexpected exceptions:

  • fail! (business failures): These do NOT trigger retries. A call to fail! indicates a deliberate business decision that should not be retried.
  • Unexpected exceptions: These DO trigger retries, allowing transient errors to be recovered.
ruby
class PaymentAction
  include Axn
  async :sidekiq, retry: 3

  def call
    # This will NOT trigger retries - it's a business decision
    fail! "Insufficient funds" if insufficient_funds?

    # This WILL trigger retries - it's an unexpected error
    raise NetworkError, "Connection timeout" if connection_failed?
  end
end

This follows Sidekiq's own guidance: "Retries are for unexpected errors."

Exception Reporting in Async Context

When unexpected exceptions occur in async jobs, Axn provides control over when on_exception is triggered. See Async Exception Reporting for configuration options.

Batch Enqueueing with enqueues_each

The enqueues_each method provides a declarative way to set up batch enqueueing. It automatically iterates over collections and enqueues each item as a separate background job.

Basic Usage

ruby
class SyncForCompany
  include Axn
  async :sidekiq

  expects :company, model: Company

  def call
    puts "Syncing data for company: #{company.name}"
    # Sync individual company data
  end

  # No enqueues_each needed! Source is auto-inferred from model: Company
end

# Usage
SyncForCompany.enqueue_all  # Automatically iterates Company.all and enqueues each company

How it works:

  1. enqueue_all validates configuration upfront (async configured, static args present)
  2. By default, enqueues an EnqueueAllOrchestrator job that performs the iteration in the background
  3. If an iterable is passed as a keyword argument (e.g., overriding the source with an AR relation), that source can't be serialized for a background job, so the iteration runs in the foreground instead
  4. During execution, iterates over the source collection and enqueues individual jobs
  5. Model-based iterations (using find_each) are processed first for memory efficiency

Auto-Inference from model: Declarations

If a field has a model: declaration and the model class responds to find_each, you don't need to explicitly declare enqueues_each. The source collection is automatically inferred:

ruby
class SyncForCompany
  include Axn
  async :sidekiq

  expects :company, model: Company  # Auto-inferred: Company.all 

  def call
    # ... sync logic
  end

  # No enqueues_each needed - automatically iterates Company.all
end

SyncForCompany.enqueue_all  # Works without explicit enqueues_each!

Explicit Configuration with enqueues_each

Use enqueues_each when you need to:

  • Override the default source (e.g., Company.active instead of Company.all)
  • Add filtering logic
  • Extract specific attributes
  • Iterate over fields without model: declarations
ruby
class SyncForCompany
  include Axn
  async :sidekiq

  expects :company, model: Company

  def call
    # ... sync logic
  end

  # Override default source
  enqueues_each :company, from: -> { Company.active }

  # With extraction (passes company_id instead of company object)
  enqueues_each :company_id, from: -> { Company.active }, via: :id

  # With filter block
  enqueues_each :company do |company|
    company.active? && !company.in_exit?
  end

  # Method name as source
  enqueues_each :company, from: :active_companies
end

Overriding on enqueue_all Call

You can override iteration sources or make fields static when calling enqueue_all:

ruby
class SyncForCompany
  include Axn
  async :sidekiq

  expects :company, model: Company
  expects :user, model: User

  def call
    # ... sync logic
  end

  # Default: iterates Company.all
  enqueues_each :company
end

# Override with a subset (enumerable kwarg replaces source)
SyncForCompany.enqueue_all(company: Company.active.limit(10))

# Override with a single value (scalar kwarg makes it static, no iteration)
SyncForCompany.enqueue_all(company: Company.find(123))

# Mix static and iterated fields
SyncForCompany.enqueue_all(
  company: Company.active,  # Iterates over active companies
  user: User.find(1)        # Static: same user for all jobs
)

Dynamic Iteration via Kwargs

You can iterate over fields without any enqueues_each declaration by passing enumerables directly:

ruby
class ProcessFormats
  include Axn
  async :sidekiq

  expects :format
  expects :mode

  def call
    # ... process logic
  end
end

# Pass enumerables to create cross-product iteration
ProcessFormats.enqueue_all(
  format: [:csv, :json, :xml],  # Iterates: 3 jobs
  mode: :full                    # Static: same mode for all
)

# Multiple enumerables create cross-product
ProcessFormats.enqueue_all(
  format: [:csv, :json],         # 2 formats
  mode: [:full, :incremental]    # 2 modes
)
# Result: 2 × 2 = 4 jobs total

Note: Arrays and Sets are treated as static values (not iterated) when the field expects an enumerable type:

ruby
expects :tags, type: Array

# This passes the entire array as a static value
ProcessTags.enqueue_all(tags: ["ruby", "rails", "testing"])

Multi-Field Cross-Product Iteration

Multiple enqueues_each declarations create a cross-product of all combinations:

ruby
class SyncForUserAndCompany
  include Axn
  async :sidekiq

  expects :user, model: User
  expects :company, model: Company

  def call
    # ... sync logic for user + company combination
  end

  enqueues_each :user, from: -> { User.active } 
  enqueues_each :company, from: -> { Company.active }
end

# Creates user_count × company_count jobs
# Each combination of (user, company) gets its own job
SyncForUserAndCompany.enqueue_all

Static Fields

Fields declared with expects but not covered by enqueues_each (or auto-inference) become static fields that must be passed to enqueue_all:

ruby
class SyncWithMode
  include Axn
  async :sidekiq

  expects :company, model: Company  # Auto-inferred, will iterate 
  expects :sync_mode                 # Static, must be provided

  def call
    # Uses both company (iterated) and sync_mode (static)
  end
end

# sync_mode must be provided - it's passed to every enqueued job
SyncWithMode.enqueue_all(sync_mode: :full)

Run summary with on_enqueue_all

on_enqueue_all registers a once-per-run callback that fires after the batch fan-out completes — useful for posting a summary, emitting a metric, or logging a heartbeat without hand-rolling a parent wrapper action. It runs inside the orchestrator (off the clock thread), in the context of your action class, so class-level log/info/warn are available.

Like the other on_* callbacks, it accepts either a block or a Symbol method name (a Symbol resolves to a class method, since no action instance exists during enqueueing), and supports if:/unless: conditions.

ruby
class StockCertificate::EoyTaxReminder
  include Axn
  async :sidekiq

  expects :tax_profile, model: TaxProfile
  enqueues_each :tax_profile, from: -> { TaxProfile.needs_address_validation }

  on_enqueue_all do |sources:, count:| 
    active, inactive = sources[:tax_profile].partition { _1.user.active? }
    SlackSender.call(channel: :eng_ops, text: "#{active.size} active, #{inactive.size} deactivated (#{count} enqueued)")
  end

  def call
    # per-tax-profile work
  end
end

The handler may declare any subset of these keyword arguments (or none):

  • count: — the exact number of jobs enqueued (post-filter). Always available, including for cross-product runs.
  • sources: — a hash of { field => resolved_source } for each iterated field, e.g. { tax_profile: <relation> } or, for a cross-product, { user: <rel>, company: <rel> }. Sources are the resolved-but-un-materialized relations (run your own .count / .group / .partition), and reflect any kwarg overrides passed to enqueue_all.
ruby
# Count-only heartbeat via a class method:
on_enqueue_all :log_summary
def self.log_summary(count:) = info "Found #{count} events"

# Conditional summary:
on_enqueue_all(if: :summary_enabled?) { |count:| info "Found #{count} events" }

Multiple on_enqueue_all declarations are allowed; like the rest of the on_* family they fire most-recent-first (last-defined wins). if:/unless: conditions are evaluated the same way as the other callbacks' matchers (against the action, with no exception), so they can condition on class-level state but cannot observe sources/count.

Error handling: a raise inside the handler is swallowed (logged; re-raised in development only when Axn.config.raise_piping_errors_in_dev is set) and cannot change the enqueue outcome — the fan-out has already completed. This matches on_success semantics; rescue inside your handler if you need stronger guarantees.

When it fires: on any run that goes through the fan-out (the async path and the foreground path used when an iterable kwarg can't be serialized). It does not fire for an action with no expects, which enqueues a single job directly without fanning out.

Memory Efficiency

For optimal memory usage, model-based configs (using find_each) are automatically processed first in nested iterations. This ensures ActiveRecord-style batch processing happens before loading potentially large enumerables into memory.

ruby
# Model-based iteration uses find_each (memory efficient)
expects :company, model: Company  # Processed first

# Array-based iteration uses each (loads all into memory)
enqueues_each :format, from: -> { [:csv, :json, :xml] }  # Processed second

Iteration Method Selection

  • find_each: Used when the source responds to find_each (ActiveRecord collections) - processes in batches for memory efficiency
  • each: Used for plain arrays and other enumerables - loads all items into memory

Background vs Foreground Execution

By default, enqueue_all enqueues an EnqueueAllOrchestrator job to perform the iteration and individual job enqueueing in the background. This makes it safe to call directly from clock processes (e.g., Heroku scheduler) without risking memory bloat from loading large collections.

However, if you pass an enumerable override (like an ActiveRecord relation or array), enqueue_all automatically falls back to foreground execution—iterating and enqueueing immediately in the current process. This is because the iteration source (a lambda wrapping the enumerable) cannot be serialized for background execution.

ruby
class SyncForCompany
  include Axn
  async :sidekiq

  expects :company, model: Company

  def call
    # ... sync logic
  end

  enqueues_each :company, from: -> { Company.active }
end

# Default: enqueues orchestrator job in background (safe for clock processes)
SyncForCompany.enqueue_all

# Override with a relation: executes in foreground (relation can't be serialized)
SyncForCompany.enqueue_all(company: Company.where(plan: "enterprise"))

# Override with a single value: runs in background (GlobalID-serializable scalar)
SyncForCompany.enqueue_all(company: Company.find(123))

Why this matters:

  • Configure your default iteration source via enqueues_each for scheduled/recurring jobs
  • By default, enqueue_all runs safely in the background without loading your entire dataset into memory
  • For one-off manual calls with a filtered subset, pass an enumerable override—foreground execution handles it automatically
  • Scalar overrides (single objects) are serialized via GlobalID and still run in the background

This design lets you use the same action class for both scheduled batch processing and ad-hoc targeted runs.

Edge Cases and Limitations

  1. Fields expecting enumerable types: If a field expects Array or Set, arrays/sets passed to enqueue_all are treated as static values (not iterated)
  2. Strings and Hashes: Always treated as static values, even though they respond to :each
  3. No model or source: If a field has no model: declaration and no enqueues_each with from:, you must pass it as a kwarg to enqueue_all or it will raise an error
  4. Required static fields: Fields without defaults that aren't covered by iteration must be provided to enqueue_all