← 返回
内容创作 中文

Oban Designer

Design and implement Oban background job workers for Elixir. Configure queues, retry strategies, uniqueness constraints, cron scheduling, and error handling. Generate Oban workers, queue config, and test setups. Use when adding background jobs, async processing, scheduled tasks, or recurring cron jobs to an Elixir project using Oban.
为 Elixir 设计并实现 Oban 后台任务 Worker,配置队列、重试策略、唯一性约束、cron 定时调度和错误处理,生成 Worker、队列配置及测试设置。适用于在 Elixir 项目中使用 Oban 添加后台任务、异步处理、定时任务或循环 cron 任务。
gchapim
内容创作 clawhub v1.0.1 1 版本 99602.5 Key: 无需
★ 0
Stars
📥 2,255
下载
💾 0
安装
1
版本
#latest

概述

Oban Designer

Installation

# mix.exs
{:oban, "~> 2.18"}

# config/config.exs
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [default: 10, mailers: 20, webhooks: 50, events: 5],
  plugins: [
    Oban.Plugins.Pruner,
    {Oban.Plugins.Cron, crontab: [
      {"0 2 * * *", MyApp.Workers.DailyCleanup},
      {"*/5 * * * *", MyApp.Workers.MetricsCollector}
    ]}
  ]

# In application.ex children:
{Oban, Application.fetch_env!(:my_app, Oban)}

Generate the Oban migrations:

mix ecto.gen.migration add_oban_jobs_table
defmodule MyApp.Repo.Migrations.AddObanJobsTable do
  use Ecto.Migration
  def up, do: Oban.Migration.up(version: 12)
  def down, do: Oban.Migration.down(version: 1)
end

Worker Implementation

Basic Worker

defmodule MyApp.Workers.SendEmail do
  use Oban.Worker,
    queue: :mailers,
    max_attempts: 5,
    priority: 1

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"to" => to, "template" => template} = args}) do
    case MyApp.Mailer.deliver(to, template, args) do
      {:ok, _} -> :ok
      {:error, :temporary} -> {:error, "temporary failure"}  # Will retry
      {:error, :permanent} -> {:cancel, "invalid address"}   # Won't retry
    end
  end
end

Return Values

ReturnEffect
----------------
:okJob marked complete
{:ok, result}Job marked complete
{:error, reason}Job retried (counts as attempt)
{:cancel, reason}Job cancelled, no more retries
{:snooze, seconds}Re-scheduled, doesn't count as attempt
{:discard, reason}Job discarded (Oban 2.17+)

Queue Configuration

See references/worker-patterns.md for common worker patterns.

Sizing Guidelines

QueueConcurrencyUse Case
-----------------------------
default10General-purpose
mailers20Email delivery (I/O bound)
webhooks50Webhook delivery (I/O bound, high volume)
media5Image/video processing (CPU bound)
events5Analytics, audit logs
critical3Billing, payments

Queue Priority

Jobs within a queue execute by priority (0 = highest). Use sparingly:

%{user_id: user.id}
|> MyApp.Workers.SendEmail.new(priority: 0)  # Urgent
|> Oban.insert()

Retry Strategies

Default Backoff

Oban uses exponential backoff: attempt^4 + attempt seconds.

Custom Backoff

defmodule MyApp.Workers.WebhookDelivery do
  use Oban.Worker,
    queue: :webhooks,
    max_attempts: 10

  @impl Oban.Worker
  def backoff(%Oban.Job{attempt: attempt}) do
    # Exponential with jitter: 2^attempt + random(0..30)
    trunc(:math.pow(2, attempt)) + :rand.uniform(30)
  end

  @impl Oban.Worker
  def perform(%Oban.Job{args: args}) do
    # ...
  end
end

Timeout

use Oban.Worker, queue: :media

@impl Oban.Worker
def timeout(%Oban.Job{args: %{"size" => "large"}}), do: :timer.minutes(10)
def timeout(_job), do: :timer.minutes(2)

Uniqueness

Prevent duplicate jobs:

defmodule MyApp.Workers.SyncAccount do
  use Oban.Worker,
    queue: :default,
    unique: [
      period: 300,               # 5 minutes
      states: [:available, :scheduled, :executing, :retryable],
      keys: [:account_id]        # Unique by this arg key
    ]
end

Unique Options

OptionDefaultDescription
------------------------------
period60Seconds to enforce uniqueness (:infinity for forever)
statesall activeWhich job states to check
keysall argsSpecific arg keys to compare
timestamp:inserted_atUse :scheduled_at for scheduled uniqueness

Replace Existing

%{account_id: id}
|> MyApp.Workers.SyncAccount.new(
  replace: [:scheduled_at],    # Update scheduled_at if duplicate
  schedule_in: 60
)
|> Oban.insert()

Cron Scheduling

# config.exs
plugins: [
  {Oban.Plugins.Cron, crontab: [
    {"0 */6 * * *", MyApp.Workers.DigestEmail},
    {"0 2 * * *", MyApp.Workers.DailyCleanup},
    {"0 0 1 * *", MyApp.Workers.MonthlyReport},
    {"*/5 * * * *", MyApp.Workers.HealthCheck, args: %{service: "api"}},
  ]}
]

Cron expressions: minute hour day_of_month month day_of_week.

Inserting Jobs

# Immediate
%{user_id: user.id, template: "welcome"}
|> MyApp.Workers.SendEmail.new()
|> Oban.insert()

# Scheduled
%{report_id: id}
|> MyApp.Workers.GenerateReport.new(schedule_in: 3600)
|> Oban.insert()

# Scheduled at specific time
%{report_id: id}
|> MyApp.Workers.GenerateReport.new(scheduled_at: ~U[2024-01-01 00:00:00Z])
|> Oban.insert()

# Bulk insert
changesets = Enum.map(users, fn user ->
  MyApp.Workers.SendEmail.new(%{user_id: user.id})
end)
Oban.insert_all(changesets)

# Inside Ecto.Multi
Ecto.Multi.new()
|> Ecto.Multi.insert(:user, changeset)
|> Oban.insert(:welcome_email, fn %{user: user} ->
  MyApp.Workers.SendEmail.new(%{user_id: user.id})
end)
|> Repo.transaction()

Oban Pro Features

Available with Oban Pro license:

Batch (group of jobs)

# Process items in batch, run callback when all complete
batch = MyApp.Workers.ProcessItem.new_batch(
  items |> Enum.map(&%{item_id: &1.id}),
  callback: {MyApp.Workers.BatchComplete, %{batch_name: "import"}}
)
Oban.insert_all(batch)

Workflow (job dependencies)

Oban.Pro.Workflow.new()
|> Oban.Pro.Workflow.add(:extract, MyApp.Workers.Extract.new(%{file: path}))
|> Oban.Pro.Workflow.add(:transform, MyApp.Workers.Transform.new(%{}), deps: [:extract])
|> Oban.Pro.Workflow.add(:load, MyApp.Workers.Load.new(%{}), deps: [:transform])
|> Oban.insert_all()

Chunk (aggregate multiple jobs)

defmodule MyApp.Workers.BulkIndex do
  use Oban.Pro.Workers.Chunk,
    queue: :indexing,
    size: 100,            # Process 100 at a time
    timeout: 30_000       # Or after 30s

  @impl true
  def process(jobs) do
    items = Enum.map(jobs, & &1.args)
    SearchIndex.bulk_upsert(items)
    :ok
  end
end

Testing

See references/testing-oban.md for detailed testing patterns.

Setup

# config/test.exs
config :my_app, Oban,
  testing: :manual  # or :inline for synchronous execution

# test_helper.exs (if using :manual)
Oban.Testing.start()

Asserting Job Enqueued

use Oban.Testing, repo: MyApp.Repo

test "enqueues welcome email on signup" do
  {:ok, user} = Accounts.register(%{email: "test@example.com"})

  assert_enqueued worker: MyApp.Workers.SendEmail,
    args: %{user_id: user.id, template: "welcome"},
    queue: :mailers
end

Executing Jobs in Tests

test "processes email delivery" do
  {:ok, _} =
    perform_job(MyApp.Workers.SendEmail, %{
      "to" => "user@example.com",
      "template" => "welcome"
    })
end

Monitoring

Telemetry Events

# Attach in application.ex
:telemetry.attach_many("oban-logger", [
  [:oban, :job, :start],
  [:oban, :job, :stop],
  [:oban, :job, :exception]
], &MyApp.ObanTelemetry.handle_event/4, %{})

Key Metrics to Track

  • Job execution duration (p50, p95, p99)
  • Queue depth (available jobs per queue)
  • Error rate per worker
  • Retry rate per worker

版本历史

共 1 个版本

  • v1.0.1 当前
    2026-03-28 16:06 安全 安全

安全检测

腾讯云安全 (Keen)

安全,无风险
查看报告

腾讯云安全 (Sanbu)

安全,无风险
查看报告

🔗 相关推荐

content-creation

Humanizer

biostartechnology
消除AI写作痕迹,使文本更自然真实。基于维基百科"AI写作特征"指南,识别并修正夸张象征、宣传用语、肤浅-ing分析、模糊归因、破折号滥用、三项排比、AI词汇、负面平行结构及冗长连接词等模式。
★ 857 📥 199,308
content-creation

Baidu Wenku AIPPT

ide-rea
使用百度文库 AI 智能生成 PPT,自动根据内容选择模板。
★ 66 📥 46,133
content-creation

AdMapix

fly0pants
广告情报与应用数据分析助手,支持搜索广告素材、分析应用排名、下载量、收入及市场洞察,用于广告素材和竞品分析。
★ 295 📥 136,413