Async Execution
Axn provides built-in support for asynchronous execution through background job processing libraries. This allows you to execute actions in the background without blocking the main thread.
Overview
Async execution in Axn is designed to be simple and consistent across different background job libraries. You can configure async behavior globally or per-action, and all async adapters support the same interface.
Basic Usage
Configuring Async Adapters
class EmailAction
  include Axn
  # Configure async adapter
  async :sidekiq
  expects :user, :message
  def call
    # Send email logic
  end
end
# Execute immediately (synchronous)
result = EmailAction.call(user: user, message: "Welcome!")
# Execute asynchronously (background)
EmailAction.call_async(user: user, message: "Welcome!")
Available Async Adapters
Sidekiq
The Sidekiq adapter provides integration with the Sidekiq background job processing library.
# In your action class
async :sidekiq do
  sidekiq_options queue: "high_priority", retry: 5
end
# Or with keyword arguments (shorthand)
async :sidekiq, queue: "high_priority", retry: 5
Configuration options:
- queue: The Sidekiq queue name (default: "default")
- retry: Number of retry attempts (default: 25)
- Any other Sidekiq options supported by sidekiq_options
For detailed setup instructions including middleware configuration, see Sidekiq Adapter Setup.
ActiveJob
The ActiveJob adapter provides integration with Rails' ActiveJob framework.
# In your action class
async :active_job do
  queue_as "high_priority"
  self.priority = 10
end
Configuration options:
- queue_as: The ActiveJob queue name
- priority: Job priority
- wait: Delay before execution
- Any other ActiveJob options
For detailed setup instructions, see ActiveJob Adapter Setup.
Disabled
Disables async execution entirely. The action will raise a NotImplementedError when call_async is called.
# In your action class
async false
Argument Serialization
Arguments passed to call_async are serialized to the backing job queue and rehydrated before your action runs on the worker. What survives that round trip depends on whether ActiveJob is loaded in your app — but it is the same across every adapter within a given deployment, so a Time argument behaves identically whether you run on Sidekiq or ActiveJob.
- With ActiveJob loaded (typical Rails apps): arguments are serialized via ActiveJob::Arguments, so a rich set of types round-trips losslessly: String, Integer, Float, true/false/nil, Symbol, Date, Time, DateTime, ActiveSupport::TimeWithZone, ActiveSupport::Duration, BigDecimal, Range, and Array/Hash of those. ActiveRecord records and ActiveStorage attachments travel as compact GlobalID references (not copied into the payload) and are re-located on the worker.
- Without ActiveJob: only JSON-native values (String, Integer, Float, true/false/nil, and Array/Hash of those with string keys) and GlobalID-able objects (e.g. ActiveRecord records) are supported.
Anything that can't be serialized cleanly raises Axn::Async::UnserializableArgument at enqueue time — naming the offending field and how to fix it — rather than silently corrupting the value on the round trip (the previous behavior, where e.g. a Time could arrive on the worker as a String):
class SendReport
  include Axn
  async :sidekiq
  expects :occurred_at, type: Time
  def call
    # …
  end
end
SendReport.call_async(occurred_at: Time.current) # ✅ arrives as a Time on the worker
SendReport.call_async(occurred_at: Tempfile.new("rpt")) # ✗ raises Axn::Async::UnserializableArgument
The same rules apply to the static arguments passed to enqueue_all.
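To make the "without ActiveJob" rule concrete, the check below is a minimal sketch of what "JSON-native" means for a value. The json_native? helper is hypothetical and is not part of Axn; it also leaves out the GlobalID-able objects that the real rules accept.

```ruby
# Hypothetical helper, not part of Axn: returns true when a value is built
# only from JSON-native types, so it survives a JSON round trip unchanged.
def json_native?(value)
  case value
  when String, Integer, Float, true, false, nil
    true
  when Array
    value.all? { |item| json_native?(item) }
  when Hash
    # Keys must be Strings: Symbol keys come back as Strings after a round trip.
    value.all? { |key, val| key.is_a?(String) && json_native?(val) }
  else
    false
  end
end

json_native?({ "ids" => [1, 2, 3], "note" => nil }) # => true
json_native?({ ids: [1, 2, 3] })                    # => false (Symbol key)
json_native?(Time.now)                              # => false
```

A check of this shape is why a Time or a Symbol-keyed Hash has to be rejected up front when only JSON is available: neither comes back as the same object after the round trip.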
Delayed Execution
All async adapters support delayed execution using the _async parameter in call_async. This allows you to schedule actions to run at specific future times without changing the interface.
class EmailAction
  include Axn
  async :sidekiq
  expects :user, :message
  def call
    # Send email logic
  end
end
# Immediate execution
EmailAction.call_async(user: user, message: "Welcome!")
# Delayed execution - wait 1 hour
EmailAction.call_async(user: user, message: "Follow up", _async: { wait: 1.hour })
# Scheduled execution - run at specific time
EmailAction.call_async(user: user, message: "Reminder", _async: { wait_until: 1.week.from_now })
Supported Scheduling Options
- wait: Execute after a specific time interval (e.g., 1.hour, 30.minutes)
- wait_until: Execute at a specific future time (e.g., 1.hour.from_now, Time.parse("2024-01-01 12:00:00"))
Adapter-Specific Behavior
- Sidekiq: Uses perform_in for wait and perform_at for wait_until
- ActiveJob: Uses set(wait:) for wait and set(wait_until:) for wait_until
- Disabled: Ignores scheduling options and raises NotImplementedError
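Both options in the mapping above come down to an absolute run time. The following is a minimal sketch in plain Ruby, assuming wait is given in seconds. The resolve_run_at helper is hypothetical and is not an Axn, Sidekiq, or ActiveJob API.

```ruby
# Hypothetical helper: resolve scheduling options to an absolute Time,
# or nil when the job should run immediately.
def resolve_run_at(options, now: Time.now)
  return now + options[:wait] if options[:wait] # relative delay, in seconds

  options[:wait_until] # absolute time, or nil when no option was given
end

now = Time.utc(2030, 1, 1, 12, 0, 0)
resolve_run_at({ wait: 3600 }, now: now)                       # => 2030-01-01 13:00:00 UTC
resolve_run_at({ wait_until: Time.utc(2030, 1, 8) }, now: now) # => 2030-01-08 00:00:00 UTC
resolve_run_at({}, now: now)                                   # => nil
```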
Parameter Name Safety
The _async parameter is reserved for scheduling options.
Global Configuration
You can set default async configuration that will be applied to all actions that don't explicitly configure their own async behavior:
Axn.configure do |c|
  # Set a default async configuration
  c.set_default_async(:sidekiq, queue: "default") do
    sidekiq_options retry: 3
  end
end
# Now all actions will use Sidekiq by default
class MyAction
  include Axn
  # No async configuration needed - uses default
end
Error Handling
Axn distinguishes between business logic failures (via fail!) and unexpected exceptions:
- fail! (business failures): These do NOT trigger retries. A call to fail! indicates a deliberate business decision that should not be retried.
- Unexpected exceptions: These DO trigger retries, allowing transient errors to be recovered.
class PaymentAction
  include Axn
  async :sidekiq, retry: 3
  def call
    # This will NOT trigger retries - it's a business decision
    fail! "Insufficient funds" if insufficient_funds?
    # This WILL trigger retries - it's an unexpected error
    raise NetworkError, "Connection timeout" if connection_failed?
  end
end
This follows Sidekiq's own guidance: "Retries are for unexpected errors."
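The retry distinction can be pictured as a worker wrapper that swallows business failures and lets everything else propagate to the job backend. This is a minimal sketch in plain Ruby, not Axn's implementation. BusinessFailure, NetworkError, and perform_job are hypothetical stand-ins.

```ruby
# Hypothetical stand-ins; Axn's real classes and internals may differ.
class BusinessFailure < StandardError; end
class NetworkError < StandardError; end

# Runs a job body the way a retry-aware worker might: a business failure is
# logged and swallowed (no retry), anything else propagates so the job
# backend can schedule a retry.
def perform_job
  yield
  :ok
rescue BusinessFailure => e
  warn "not retrying: #{e.message}"
  :failed
end

perform_job { raise BusinessFailure, "Insufficient funds" } # => :failed

begin
  perform_job { raise NetworkError, "Connection timeout" }
rescue NetworkError
  # The exception escaped perform_job, so the backend would retry the job.
end
```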
Exception Reporting in Async Context
When unexpected exceptions occur in async jobs, Axn provides control over when on_exception is triggered. See Async Exception Reporting for configuration options.
Batch Enqueueing with enqueues_each
The enqueues_each method provides a declarative way to set up batch enqueueing. It automatically iterates over collections and enqueues each item as a separate background job.
Basic Usage
class SyncForCompany
  include Axn
  async :sidekiq
  expects :company, model: Company
  def call
    puts "Syncing data for company: #{company.name}"
    # Sync individual company data
  end
  # No enqueues_each needed! Source is auto-inferred from model: Company
end
# Usage
SyncForCompany.enqueue_all # Automatically iterates Company.all and enqueues each company
How it works:
- enqueue_all validates configuration upfront (async configured, static args present)
- By default, enqueues an EnqueueAllOrchestrator job that performs the iteration in the background
  - If an iterable is passed as a keyword argument (e.g., overriding the source with an AR relation), that source can't be serialized for a background job, so the iteration runs in the foreground instead
- During execution, iterates over the source collection and enqueues individual jobs
- Model-based iterations (using find_each) are processed first for memory efficiency
Auto-Inference from model: Declarations
If a field has a model: declaration and the model class responds to find_each, you don't need to explicitly declare enqueues_each. The source collection is automatically inferred:
class SyncForCompany
  include Axn
  async :sidekiq
  expects :company, model: Company # Auto-inferred: Company.all
  def call
    # ... sync logic
  end
  # No enqueues_each needed - automatically iterates Company.all
end
SyncForCompany.enqueue_all # Works without explicit enqueues_each!
Explicit Configuration with enqueues_each
Use enqueues_each when you need to:
- Override the default source (e.g., Company.active instead of Company.all)
- Add filtering logic
- Extract specific attributes
- Iterate over fields without model: declarations
class SyncForCompany
  include Axn
  async :sidekiq
  expects :company, model: Company
  def call
    # ... sync logic
  end
  # Override default source
  enqueues_each :company, from: -> { Company.active }
  # With extraction (passes company_id instead of company object)
  enqueues_each :company_id, from: -> { Company.active }, via: :id
  # With filter block
  enqueues_each :company do |company|
    company.active? && !company.in_exit?
  end
  # Method name as source
  enqueues_each :company, from: :active_companies
end
Overriding on enqueue_all Call
You can override iteration sources or make fields static when calling enqueue_all:
class SyncForCompany
  include Axn
  async :sidekiq
  expects :company, model: Company
  expects :user, model: User
  def call
    # ... sync logic
  end
  # Default: iterates Company.all
  enqueues_each :company
end
# Override with a subset (enumerable kwarg replaces source)
SyncForCompany.enqueue_all(company: Company.active.limit(10))
# Override with a single value (scalar kwarg makes it static, no iteration)
SyncForCompany.enqueue_all(company: Company.find(123))
# Mix static and iterated fields
SyncForCompany.enqueue_all(
  company: Company.active, # Iterates over active companies
  user: User.find(1) # Static: same user for all jobs
)
Dynamic Iteration via Kwargs
You can iterate over fields without any enqueues_each declaration by passing enumerables directly:
class ProcessFormats
  include Axn
  async :sidekiq
  expects :format
  expects :mode
  def call
    # ... process logic
  end
end
# Pass enumerables to create cross-product iteration
ProcessFormats.enqueue_all(
  format: [:csv, :json, :xml], # Iterates: 3 jobs
  mode: :full # Static: same mode for all
)
# Multiple enumerables create cross-product
ProcessFormats.enqueue_all(
  format: [:csv, :json], # 2 formats
  mode: [:full, :incremental] # 2 modes
)
# Result: 2 × 2 = 4 jobs total
Note: Arrays and Sets are treated as static values (not iterated) when the field expects an enumerable type:
expects :tags, type: Array
# This passes the entire array as a static value
ProcessTags.enqueue_all(tags: ["ruby", "rails", "testing"])
Multi-Field Cross-Product Iteration
Multiple enqueues_each declarations create a cross-product of all combinations:
class SyncForUserAndCompany
  include Axn
  async :sidekiq
  expects :user, model: User
  expects :company, model: Company
  def call
    # ... sync logic for user + company combination
  end
  enqueues_each :user, from: -> { User.active }
  enqueues_each :company, from: -> { Company.active }
end
# Creates user_count × company_count jobs
# Each combination of (user, company) gets its own job
SyncForUserAndCompany.enqueue_all
Static Fields
Fields declared with expects but not covered by enqueues_each (or auto-inference) become static fields that must be passed to enqueue_all:
class SyncWithMode
  include Axn
  async :sidekiq
  expects :company, model: Company # Auto-inferred, will iterate
  expects :sync_mode # Static, must be provided
  def call
    # Uses both company (iterated) and sync_mode (static)
  end
end
# sync_mode must be provided - it's passed to every enqueued job
SyncWithMode.enqueue_all(sync_mode: :full)
Run summary with on_enqueue_all
on_enqueue_all registers a once-per-run callback that fires after the batch fan-out completes — useful for posting a summary, emitting a metric, or logging a heartbeat without hand-rolling a parent wrapper action. It runs inside the orchestrator (off the clock thread), in the context of your action class, so class-level log/info/warn are available.
Like the other on_* callbacks, it accepts either a block or a Symbol method name (a Symbol resolves to a class method, since no action instance exists during enqueueing), and supports if:/unless: conditions.
class StockCertificate::EoyTaxReminder
  include Axn
  async :sidekiq
  expects :tax_profile, model: TaxProfile
  enqueues_each :tax_profile, from: -> { TaxProfile.needs_address_validation }
  on_enqueue_all do |sources:, count:|
    active, inactive = sources[:tax_profile].partition { _1.user.active? }
    SlackSender.call(channel: :eng_ops, text: "#{active.size} active, #{inactive.size} deactivated (#{count} enqueued)")
  end
  def call
    # per-tax-profile work
  end
end
The handler may declare any subset of these keyword arguments (or none):
- count: the exact number of jobs enqueued (post-filter). Always available, including for cross-product runs.
- sources: a hash of { field => resolved_source } for each iterated field, e.g. { tax_profile: <relation> } or, for a cross-product, { user: <rel>, company: <rel> }. Sources are the resolved-but-un-materialized relations (run your own .count/.group/.partition), and reflect any kwarg overrides passed to enqueue_all.
# Count-only heartbeat via a class method:
on_enqueue_all :log_summary
def self.log_summary(count:) = info "Found #{count} events"
# Conditional summary:
on_enqueue_all(if: :summary_enabled?) { |count:| info "Found #{count} events" }
Multiple on_enqueue_all declarations are allowed; like the rest of the on_* family they fire most-recent-first (last-defined wins). if:/unless: conditions are evaluated the same way as the other callbacks' matchers (against the action, with no exception), so they can condition on class-level state but cannot observe sources/count.
Error handling: a raise inside the handler is swallowed (logged; re-raised in development only when Axn.config.raise_piping_errors_in_dev is set) and cannot change the enqueue outcome — the fan-out has already completed. This matches on_success semantics; rescue inside your handler if you need stronger guarantees.
When it fires: on any run that goes through the fan-out (the async path and the foreground path used when an iterable kwarg can't be serialized). It does not fire for an action with no expects, which enqueues a single job directly without fanning out.
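The rule that a handler may declare any subset of count: and sources: could be implemented by inspecting the handler's declared parameters before calling it. This is a minimal sketch in plain Ruby (3.0 or later). The name call_with_declared_kwargs is hypothetical and nothing here is taken from Axn's source.

```ruby
# Hypothetical dispatcher, not Axn's implementation: passes a handler only
# the keyword arguments it declares.
def call_with_declared_kwargs(handler, available)
  declared = handler.parameters
                    .select { |type, _name| %i[key keyreq].include?(type) }
                    .map { |_type, name| name }
  handler.call(**available.slice(*declared))
end

available = { count: 3, sources: { tax_profile: %w[a b c] } }

count_only = ->(count:) { "Found #{count} events" }
both = ->(sources:, count:) { "#{sources.keys.join(', ')}: #{count}" }
neither = -> { "done" }

call_with_declared_kwargs(count_only, available) # => "Found 3 events"
call_with_declared_kwargs(both, available)       # => "tax_profile: 3"
call_with_declared_kwargs(neither, available)    # => "done"
```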
Memory Efficiency
For optimal memory usage, model-based configs (using find_each) are automatically processed first in nested iterations. This ensures ActiveRecord-style batch processing happens before loading potentially large enumerables into memory.
# Model-based iteration uses find_each (memory efficient)
expects :company, model: Company # Processed first
# Array-based iteration uses each (loads all into memory)
enqueues_each :format, from: -> { [:csv, :json, :xml] } # Processed second
Iteration Method Selection
- find_each: Used when the source responds to find_each (ActiveRecord collections) - processes in batches for memory efficiency
- each: Used for plain arrays and other enumerables - loads all items into memory
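The selection rule amounts to a respond_to? check on the source. The following is a minimal sketch in plain Ruby. FakeRelation is a hypothetical stand-in for an ActiveRecord relation and iterate is a hypothetical helper, so neither reflects Axn's actual code.

```ruby
# Hypothetical stand-in for an ActiveRecord relation: yields records in
# batches instead of loading them all at once.
class FakeRelation
  def initialize(records, batch_size: 2)
    @records = records
    @batch_size = batch_size
  end

  def find_each(&block)
    @records.each_slice(@batch_size) { |batch| batch.each(&block) }
  end
end

# Prefer find_each when the source offers it, fall back to each otherwise.
def iterate(source, &block)
  if source.respond_to?(:find_each)
    source.find_each(&block)
  else
    source.each(&block)
  end
end

seen = []
iterate(FakeRelation.new([1, 2, 3])) { |record| seen << record }
iterate(%i[csv json]) { |format| seen << format }
seen # => [1, 2, 3, :csv, :json]
```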
Background vs Foreground Execution
By default, enqueue_all enqueues an EnqueueAllOrchestrator job to perform the iteration and individual job enqueueing in the background. This makes it safe to call directly from clock processes (e.g., Heroku scheduler) without risking memory bloat from loading large collections.
However, if you pass an enumerable override (like an ActiveRecord relation or array), enqueue_all automatically falls back to foreground execution—iterating and enqueueing immediately in the current process. This is because the iteration source (a lambda wrapping the enumerable) cannot be serialized for background execution.
class SyncForCompany
  include Axn
  async :sidekiq
  expects :company, model: Company
  def call
    # ... sync logic
  end
  enqueues_each :company, from: -> { Company.active }
end
# Default: enqueues orchestrator job in background (safe for clock processes)
SyncForCompany.enqueue_all
# Override with a relation: executes in foreground (relation can't be serialized)
SyncForCompany.enqueue_all(company: Company.where(plan: "enterprise"))
# Override with a single value: runs in background (GlobalID-serializable scalar)
SyncForCompany.enqueue_all(company: Company.find(123))
Why this matters:
- Configure your default iteration source via enqueues_each for scheduled/recurring jobs
- By default, enqueue_all runs safely in the background without loading your entire dataset into memory
- For one-off manual calls with a filtered subset, pass an enumerable override; foreground execution handles it automatically
- Scalar overrides (single objects) are serialized via GlobalID and still run in the background
This design lets you use the same action class for both scheduled batch processing and ad-hoc targeted runs.
Edge Cases and Limitations
- Fields expecting enumerable types: If a field expects Array or Set, arrays/sets passed to enqueue_all are treated as static values (not iterated)
- Strings and Hashes: Always treated as static values, even though a Hash responds to :each
- No model or source: If a field has no model: declaration and no enqueues_each with from:, you must pass it as a kwarg to enqueue_all or it will raise an error
- Required static fields: Fields without defaults that aren't covered by iteration must be provided to enqueue_all
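The static-versus-iterated rules in this list, together with the cross-product behaviour described earlier, can be sketched in plain Ruby. The helpers iterated? and expand_jobs are hypothetical and only approximate the documented behaviour. They are not Axn's implementation.

```ruby
require "set"

# Hypothetical sketch, not Axn's implementation. A kwarg is iterated only
# when it is enumerable, is not a String or Hash, and the field does not
# itself expect an Array or Set.
def iterated?(value, expected_type: nil)
  return false if value.is_a?(String) || value.is_a?(Hash)
  return false if [Array, Set].include?(expected_type)

  value.respond_to?(:each)
end

# Expands kwargs into one argument hash per job: iterated fields form a
# cross-product, static fields are copied into every job.
def expand_jobs(kwargs, expected_types: {})
  pairs = kwargs.partition { |field, value| iterated?(value, expected_type: expected_types[field]) }
  iterated, static = pairs.map(&:to_h)

  combos = iterated.values.map(&:to_a)
  return [static] if combos.empty?

  combos.first.product(*combos.drop(1)).map do |combo|
    static.merge(iterated.keys.zip(combo).to_h)
  end
end

expand_jobs({ format: %i[csv json], mode: %i[full incremental] }).size # => 4
expand_jobs({ format: %i[csv json xml], mode: :full }).size            # => 3
expand_jobs({ tags: %w[ruby rails] }, expected_types: { tags: Array })
# => [{ tags: ["ruby", "rails"] }]
```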