Skip to content

Commit

Permalink
Merge pull request #396 from akira/feature/queue_adapter
Browse files Browse the repository at this point in the history
continuation of #374
  • Loading branch information
ananthakumaran authored Nov 30, 2019
2 parents b084b86 + 6529199 commit 78628b7
Show file tree
Hide file tree
Showing 11 changed files with 356 additions and 9 deletions.
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,40 @@ By default, Exq will register itself under the ```Elixir.Exq``` atom. You can c
{:ok, exq} = Exq.start_link(name: Exq.Custom)
```

## Testing

`Exq.Mock` module provides few options to test your workers.

```elixir
# change queue_adapter in config/test.exs
config :exq,
queue_adapter: Exq.Adapters.Queue.Mock

# start mock server in your test_helper.exs
Exq.Mock.start_link(mode: :redis)
```

`Exq.Mock` currently supports three modes. The default mode can provided
on the `Exq.Mock.start_link` call. The mode could be overriden for
each test by calling `Exq.Mock.set_mode(:fake)`

### redis

This could be used for integration testing. Doesn't support `async:
true` option.

### fake

The jobs get enqueued in a local queue and never get
executed. `Exq.Mock.jobs()` returns all the jobs. Supports `async:
true` option.

### inline

The jobs get executed in the same process. Supports `async: true` option.



## Donation

To donate, send to:
Expand Down
3 changes: 2 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ config :exq,
max_retries: 0,
stats_flush_interval: 5,
stats_batch_size: 1,
middleware: [Exq.Middleware.Stats, Exq.Middleware.Job, Exq.Middleware.Manager]
middleware: [Exq.Middleware.Stats, Exq.Middleware.Job, Exq.Middleware.Manager],
queue_adapter: Exq.Adapters.Queue.Mock
31 changes: 31 additions & 0 deletions lib/exq/adapters/queue.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
defmodule Exq.Adapters.Queue do
@moduledoc ~S"""
Behaviour for creating Exq queue adapters
## Example
defmodule Exq.Adapters.Queue.CustomAdapter do
@behaviour Exq.Adapters.Queue
def enqueue(pid, queue, worker, args, options) do
{:ok, apply(worker, :perform, args)}
end
def enqueue_at(pid, queue, time, worker, args, options) do
enqueue_somehow(pid, queue, time, worker, args, options)
end
def enqueue_in(pid, queue, offset, worker, args, options) do
enqueue_in_somehow(pid, queue, offset, worker, args, options)
end
end
"""

@typedoc "The GenServer name"
@type name :: atom | {:global, term} | {:via, module, term}

@typedoc "The server reference"
@type server :: pid | name | {atom, node}

@callback enqueue(server, String.t(), module(), list(), list()) :: tuple()
@callback enqueue_at(server, String.t(), DateTime.t(), module(), list(), list()) :: tuple()
@callback enqueue_in(server, String.t(), integer(), module(), list(), list()) :: tuple()
end
13 changes: 13 additions & 0 deletions lib/exq/adapters/queue/mock.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
defmodule Exq.Adapters.Queue.Mock do
@moduledoc """
Mock queue. Designed to be used when testing your application.
"""

@behaviour Exq.Adapters.Queue

defdelegate enqueue(pid, queue, worker, args, options), to: Exq.Mock

defdelegate enqueue_at(pid, queue, time, worker, args, options), to: Exq.Mock

defdelegate enqueue_in(pid, queue, offset, worker, args, options), to: Exq.Mock
end
26 changes: 26 additions & 0 deletions lib/exq/adapters/queue/redis.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule Exq.Adapters.Queue.Redis do
@moduledoc """
Redis based Asynchronous queue. Enqueue the job by using the GenServer API.
Default queue. Designed to be used in production.
"""
alias Exq.Support.Config
alias Exq.Redis.JobQueue

@behaviour Exq.Adapters.Queue

def enqueue(pid, queue, worker, args, options) do
{redis, namespace} = GenServer.call(pid, :redis, Config.get(:genserver_timeout))
JobQueue.enqueue(redis, namespace, queue, worker, args, options)
end

def enqueue_at(pid, queue, time, worker, args, options) do
{redis, namespace} = GenServer.call(pid, :redis, Config.get(:genserver_timeout))
JobQueue.enqueue_at(redis, namespace, queue, time, worker, args, options)
end

def enqueue_in(pid, queue, offset, worker, args, options) do
{redis, namespace} = GenServer.call(pid, :redis, Config.get(:genserver_timeout))
JobQueue.enqueue_in(redis, namespace, queue, offset, worker, args, options)
end
end
13 changes: 6 additions & 7 deletions lib/exq/enqueue_api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ defmodule Exq.Enqueuer.EnqueueApi do
defmacro __using__(_) do
quote location: :keep do
alias Exq.Support.Config
alias Exq.Redis.JobQueue

@default_options []
@doc """
Expand All @@ -30,8 +29,8 @@ defmodule Exq.Enqueuer.EnqueueApi do
do: enqueue(pid, queue, worker, args, @default_options)

def enqueue(pid, queue, worker, args, options) do
{redis, namespace} = GenServer.call(pid, :redis, Config.get(:genserver_timeout))
JobQueue.enqueue(redis, namespace, queue, worker, args, options)
queue_adapter = Config.get(:queue_adapter)
queue_adapter.enqueue(pid, queue, worker, args, options)
end

@doc """
Expand All @@ -50,8 +49,8 @@ defmodule Exq.Enqueuer.EnqueueApi do
do: enqueue_at(pid, queue, time, worker, args, @default_options)

def enqueue_at(pid, queue, time, worker, args, options) do
{redis, namespace} = GenServer.call(pid, :redis, Config.get(:genserver_timeout))
JobQueue.enqueue_at(redis, namespace, queue, time, worker, args, options)
queue_adapter = Config.get(:queue_adapter)
queue_adapter.enqueue_at(pid, queue, time, worker, args, options)
end

@doc """
Expand All @@ -70,8 +69,8 @@ defmodule Exq.Enqueuer.EnqueueApi do
do: enqueue_in(pid, queue, offset, worker, args, @default_options)

def enqueue_in(pid, queue, offset, worker, args, options) do
{redis, namespace} = GenServer.call(pid, :redis, Config.get(:genserver_timeout))
JobQueue.enqueue_in(redis, namespace, queue, offset, worker, args, options)
queue_adapter = Config.get(:queue_adapter)
queue_adapter.enqueue_in(pid, queue, offset, worker, args, options)
end
end
end
Expand Down
175 changes: 175 additions & 0 deletions lib/exq/mock.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
defmodule Exq.Mock do
alias Exq.Support.Config
alias Exq.Adapters.Queue.Redis
alias Exq.Support.Job
use GenServer
@timeout 30000

defmodule State do
@moduledoc false
defstruct default_mode: :redis, jobs: %{}, modes: %{}
end

### Public api

@doc """
Start Mock server
* `mode` - The default mode that's used for all tests. See `set_mode/1` for details.
"""
def start_link(options \\ []) do
queue_adapter = Config.get(:queue_adapter)

if queue_adapter != Exq.Adapters.Queue.Mock do
raise RuntimeError, """
Exq.Mock can only work if queue_adapter is set to Exq.Adapters.Queue.Mock
Add the following to your test config
config :exq, queue_adapter: Exq.Adapters.Queue.Mock
"""
end

GenServer.start_link(__MODULE__, options, name: __MODULE__)
end

@doc """
Set the mode for current test
* `:redis` - jobs get enqueued and processed via redis.
* `:fake` - jobs get enqueued in a local queue
* `:inline` - jobs get executed in the same process
"""
def set_mode(mode) when mode in [:redis, :inline, :fake] do
GenServer.call(__MODULE__, {:mode, self(), mode}, @timeout)
end

@doc """
List of enqueued jobs
This only works if the mode is set to `:fake`
"""
def jobs do
GenServer.call(__MODULE__, {:jobs, self()}, @timeout)
end

### Private

@impl true
def init(options) do
{:ok, %State{default_mode: Keyword.get(options, :mode, :redis)}}
end

@doc false
def enqueue(pid, queue, worker, args, options) do
{:ok, runnable} =
GenServer.call(
__MODULE__,
{:enqueue, self(), :enqueue, [pid, queue, worker, args, options]},
@timeout
)

runnable.()
end

@doc false
def enqueue_at(pid, queue, time, worker, args, options) do
{:ok, runnable} =
GenServer.call(
__MODULE__,
{:enqueue, self(), :enqueue_at, [pid, queue, time, worker, args, options]},
@timeout
)

runnable.()
end

@doc false
def enqueue_in(pid, queue, offset, worker, args, options) do
{:ok, runnable} =
GenServer.call(
__MODULE__,
{:enqueue, self(), :enqueue_in, [pid, queue, offset, worker, args, options]},
@timeout
)

runnable.()
end

@impl true
def handle_call({:enqueue, owner_pid, type, args}, _from, state) do
state = maybe_add_and_monitor_pid(state, owner_pid, state.default_mode)

case state.modes[owner_pid] do
:redis ->
runnable = fn -> apply(Redis, type, args) end
{:reply, {:ok, runnable}, state}

:inline ->
runnable = fn ->
job = to_job(args)
apply(job.class, :perform, job.args)
{:ok, job.jid}
end

{:reply, {:ok, runnable}, state}

:fake ->
job = to_job(args)
state = update_in(state.jobs[owner_pid], &((&1 || []) ++ [job]))

runnable = fn ->
{:ok, job.jid}
end

{:reply, {:ok, runnable}, state}
end
end

def handle_call({:mode, owner_pid, mode}, _from, state) do
state = maybe_add_and_monitor_pid(state, owner_pid, mode)
{:reply, :ok, state}
end

def handle_call({:jobs, owner_pid}, _from, state) do
jobs = state.jobs[owner_pid] || []
{:reply, jobs, state}
end

@impl true
def handle_info({:DOWN, _, _, pid, _}, state) do
{_, state} = pop_in(state.modes[pid])
{_, state} = pop_in(state.jobs[pid])
{:noreply, state}
end

defp to_job([_pid, queue, worker, args, _options]) do
%Job{
jid: UUID.uuid4(),
queue: queue,
class: worker,
args: args,
enqueued_at: DateTime.utc_now()
}
end

defp to_job([_pid, queue, _time_or_offset, worker, args, _options]) do
%Job{
jid: UUID.uuid4(),
queue: queue,
class: worker,
args: args,
enqueued_at: DateTime.utc_now()
}
end

defp maybe_add_and_monitor_pid(state, pid, mode) do
case state.modes do
%{^pid => _mode} ->
state

_ ->
Process.monitor(pid)
state = put_in(state.modes[pid], mode)
state
end
end
end
3 changes: 2 additions & 1 deletion lib/exq/support/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ defmodule Exq.Support.Config do
Exq.Middleware.Job,
Exq.Middleware.Manager,
Exq.Middleware.Logger
]
],
queue_adapter: Exq.Adapters.Queue.Redis
}

def get(key) do
Expand Down
40 changes: 40 additions & 0 deletions test/fake_mode_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
defmodule FakeModeTest do
use ExUnit.Case, async: true

defmodule BrokenWorker do
def perform(_) do
raise RuntimeError, "Unexpected"
end
end

setup do
Exq.Mock.set_mode(:fake)
end

describe "fake mode" do
test "enqueue" do
assert [] = Exq.Mock.jobs()
assert {:ok, _} = Exq.enqueue(Exq, "low", BrokenWorker, [1])
assert {:ok, _} = Exq.enqueue_at(Exq, "low", DateTime.utc_now(), BrokenWorker, [2])
assert {:ok, _} = Exq.enqueue_in(Exq, "low", 300, BrokenWorker, [3])

assert [
%Exq.Support.Job{
args: [1],
class: FakeModeTest.BrokenWorker,
queue: "low"
},
%Exq.Support.Job{
args: [2],
class: FakeModeTest.BrokenWorker,
queue: "low"
},
%Exq.Support.Job{
args: [3],
class: FakeModeTest.BrokenWorker,
queue: "low"
}
] = Exq.Mock.jobs()
end
end
end
Loading

0 comments on commit 78628b7

Please sign in to comment.