Replace Pleroma.Jobs with pleroma_job_queue

This commit is contained in:
Egor 2019-03-29 12:46:05 +00:00 committed by lambda
parent 749d53e2b2
commit 9a39d1d846
11 changed files with 24 additions and 283 deletions

View file

@ -348,10 +348,10 @@ config :pleroma, Pleroma.Web.Federator.RetryQueue,
initial_timeout: 30, initial_timeout: 30,
max_retries: 5 max_retries: 5
config :pleroma, Pleroma.Jobs, config :pleroma_job_queue, :queues,
federator_incoming: [max_jobs: 50], federator_incoming: 50,
federator_outgoing: [max_jobs: 50], federator_outgoing: 50,
mailer: [max_jobs: 10] mailer: 10
config :pleroma, :fetch_initial_posts, config :pleroma, :fetch_initial_posts,
enabled: false, enabled: false,

View file

@ -48,7 +48,7 @@ config :web_push_encryption, :vapid_details,
config :web_push_encryption, :http_client, Pleroma.Web.WebPushHttpClientMock config :web_push_encryption, :http_client, Pleroma.Web.WebPushHttpClientMock
config :pleroma, Pleroma.Jobs, testing: [max_jobs: 2] config :pleroma_job_queue, disabled: true
try do try do
import_config "test.secret.exs" import_config "test.secret.exs"

View file

@ -253,25 +253,20 @@ You can then do
curl "http://localhost:4000/api/pleroma/admin/invite_token?admin_token=somerandomtoken" curl "http://localhost:4000/api/pleroma/admin/invite_token?admin_token=somerandomtoken"
``` ```
## Pleroma.Jobs ## :pleroma_job_queue
A list of job queues and their settings. [Pleroma Job Queue][https://git.pleroma.social/pleroma/pleroma_job_queue] configuration: a list of queues with maximum concurrent jobs.
Job queue settings:
* `max_jobs`: The maximum amount of parallel jobs running at the same time.
Example: Example:
```exs ```elixir
config :pleroma, Pleroma.Jobs, config :pleroma_job_queue, :queues,
federator_incoming: [max_jobs: 50], federator_incoming: 50,
federator_outgoing: [max_jobs: 50] federator_outgoing: 50
``` ```
This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the `max_jobs` set to `50`. This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the `max_jobs` set to `50`.
## Pleroma.Web.Federator.RetryQueue ## Pleroma.Web.Federator.RetryQueue
* `enabled`: If set to `true`, failed federation jobs will be retried * `enabled`: If set to `true`, failed federation jobs will be retried

View file

@ -110,7 +110,6 @@ defmodule Pleroma.Application do
worker(Pleroma.Web.Federator.RetryQueue, []), worker(Pleroma.Web.Federator.RetryQueue, []),
worker(Pleroma.Stats, []), worker(Pleroma.Stats, []),
worker(Pleroma.Web.Push, []), worker(Pleroma.Web.Push, []),
worker(Pleroma.Jobs, []),
worker(Task, [&Pleroma.Web.Federator.init/0], restart: :temporary) worker(Task, [&Pleroma.Web.Federator.init/0], restart: :temporary)
] ++ ] ++
streamer_child() ++ streamer_child() ++

View file

@ -6,7 +6,7 @@ defmodule Pleroma.Mailer do
use Swoosh.Mailer, otp_app: :pleroma use Swoosh.Mailer, otp_app: :pleroma
def deliver_async(email, config \\ []) do def deliver_async(email, config \\ []) do
Pleroma.Jobs.enqueue(:mailer, __MODULE__, [:deliver_async, email, config]) PleromaJobQueue.enqueue(:mailer, __MODULE__, [:deliver_async, email, config])
end end
def perform(:deliver_async, email, config), do: deliver(email, config) def perform(:deliver_async, email, config), do: deliver(email, config)

View file

@ -1,152 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Jobs do
@moduledoc """
A basic job queue
"""
use GenServer
require Logger
def init(args) do
{:ok, args}
end
def start_link do
queues =
Pleroma.Config.get(Pleroma.Jobs)
|> Enum.map(fn {name, _} -> create_queue(name) end)
|> Enum.into(%{})
state = %{
queues: queues,
refs: %{}
}
GenServer.start_link(__MODULE__, state, name: __MODULE__)
end
def create_queue(name) do
{name, {:sets.new(), []}}
end
@doc """
Enqueues a job.
Returns `:ok`.
## Arguments
- `queue_name` - a queue name(must be specified in the config).
- `mod` - a worker module (must have `perform` function).
- `args` - a list of arguments for the `perform` function of the worker module.
- `priority` - a job priority (`0` by default).
## Examples
Enqueue `Module.perform/0` with `priority=1`:
iex> Pleroma.Jobs.enqueue(:example_queue, Module, [])
:ok
Enqueue `Module.perform(:job_name)` with `priority=5`:
iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:job_name], 5)
:ok
Enqueue `Module.perform(:another_job, data)` with `priority=1`:
iex> data = "foobar"
iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:another_job, data])
:ok
Enqueue `Module.perform(:foobar_job, :foo, :bar, 42)` with `priority=1`:
iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:foobar_job, :foo, :bar, 42])
:ok
"""
def enqueue(queue_name, mod, args, priority \\ 1)
if Mix.env() == :test do
def enqueue(_queue_name, mod, args, _priority) do
apply(mod, :perform, args)
end
else
@spec enqueue(atom(), atom(), [any()], integer()) :: :ok
def enqueue(queue_name, mod, args, priority) do
GenServer.cast(__MODULE__, {:enqueue, queue_name, mod, args, priority})
end
end
def handle_cast({:enqueue, queue_name, mod, args, priority}, state) do
{running_jobs, queue} = state[:queues][queue_name]
queue = enqueue_sorted(queue, {mod, args}, priority)
state =
state
|> update_queue(queue_name, {running_jobs, queue})
|> maybe_start_job(queue_name, running_jobs, queue)
{:noreply, state}
end
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
queue_name = state.refs[ref]
{running_jobs, queue} = state[:queues][queue_name]
running_jobs = :sets.del_element(ref, running_jobs)
state =
state
|> remove_ref(ref)
|> update_queue(queue_name, {running_jobs, queue})
|> maybe_start_job(queue_name, running_jobs, queue)
{:noreply, state}
end
def maybe_start_job(state, queue_name, running_jobs, queue) do
if :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, queue_name, :max_jobs]) &&
queue != [] do
{{mod, args}, queue} = queue_pop(queue)
{:ok, pid} = Task.start(fn -> apply(mod, :perform, args) end)
mref = Process.monitor(pid)
state
|> add_ref(queue_name, mref)
|> update_queue(queue_name, {:sets.add_element(mref, running_jobs), queue})
else
state
end
end
def enqueue_sorted(queue, element, priority) do
[%{item: element, priority: priority} | queue]
|> Enum.sort_by(fn %{priority: priority} -> priority end)
end
def queue_pop([%{item: element} | queue]) do
{element, queue}
end
defp add_ref(state, queue_name, ref) do
refs = Map.put(state[:refs], ref, queue_name)
Map.put(state, :refs, refs)
end
defp remove_ref(state, ref) do
refs = Map.delete(state[:refs], ref)
Map.put(state, :refs, refs)
end
defp update_queue(state, queue_name, data) do
queues = Map.put(state[:queues], queue_name, data)
Map.put(state, :queues, queues)
end
end

View file

@ -4,7 +4,6 @@
defmodule Pleroma.Web.Federator do defmodule Pleroma.Web.Federator do
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Jobs
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Relay alias Pleroma.Web.ActivityPub.Relay
@ -31,39 +30,39 @@ defmodule Pleroma.Web.Federator do
# Client API # Client API
def incoming_doc(doc) do def incoming_doc(doc) do
Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc]) PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
end end
def incoming_ap_doc(params) do def incoming_ap_doc(params) do
Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params]) PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
end end
def publish(activity, priority \\ 1) do def publish(activity, priority \\ 1) do
Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority) PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority)
end end
def publish_single_ap(params) do def publish_single_ap(params) do
Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_ap, params]) PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_ap, params])
end end
def publish_single_websub(websub) do def publish_single_websub(websub) do
Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_websub, websub]) PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_websub, websub])
end end
def verify_websub(websub) do def verify_websub(websub) do
Jobs.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub]) PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])
end end
def request_subscription(sub) do def request_subscription(sub) do
Jobs.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub]) PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub])
end end
def refresh_subscriptions do def refresh_subscriptions do
Jobs.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions]) PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])
end end
def publish_single_salmon(params) do def publish_single_salmon(params) do
Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_salmon, params]) PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_salmon, params])
end end
# Job Worker Callbacks # Job Worker Callbacks

View file

@ -93,7 +93,8 @@ defmodule Pleroma.Mixfile do
{:timex, "~> 3.5"}, {:timex, "~> 3.5"},
{:auto_linker, {:auto_linker,
git: "https://git.pleroma.social/pleroma/auto_linker.git", git: "https://git.pleroma.social/pleroma/auto_linker.git",
ref: "94193ca5f97c1f9fdf3d1469653e2d46fac34bcd"} ref: "94193ca5f97c1f9fdf3d1469653e2d46fac34bcd"},
{:pleroma_job_queue, "~> 0.2.0"}
] ]
end end

View file

@ -50,6 +50,7 @@
"phoenix_ecto": {:hex, :phoenix_ecto, "4.0.0", "c43117a136e7399ea04ecaac73f8f23ee0ffe3e07acfcb8062fe5f4c9f0f6531", [:mix], [{:ecto, "~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.9", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"}, "phoenix_ecto": {:hex, :phoenix_ecto, "4.0.0", "c43117a136e7399ea04ecaac73f8f23ee0ffe3e07acfcb8062fe5f4c9f0f6531", [:mix], [{:ecto, "~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.9", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"},
"phoenix_html": {:hex, :phoenix_html, "2.13.1", "fa8f034b5328e2dfa0e4131b5569379003f34bc1fafdaa84985b0b9d2f12e68b", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"}, "phoenix_html": {:hex, :phoenix_html, "2.13.1", "fa8f034b5328e2dfa0e4131b5569379003f34bc1fafdaa84985b0b9d2f12e68b", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"},
"phoenix_pubsub": {:hex, :phoenix_pubsub, "1.1.1", "6668d787e602981f24f17a5fbb69cc98f8ab085114ebfac6cc36e10a90c8e93c", [:mix], [], "hexpm"}, "phoenix_pubsub": {:hex, :phoenix_pubsub, "1.1.1", "6668d787e602981f24f17a5fbb69cc98f8ab085114ebfac6cc36e10a90c8e93c", [:mix], [], "hexpm"},
"pleroma_job_queue": {:hex, :pleroma_job_queue, "0.2.0", "879e660aa1cebe8dc6f0aaaa6aa48b4875e89cd961d4a585fd128e0773b31a18", [:mix], [], "hexpm"},
"plug": {:hex, :plug, "1.7.2", "d7b7db7fbd755e8283b6c0a50be71ec0a3d67d9213d74422d9372effc8e87fd1", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}], "hexpm"}, "plug": {:hex, :plug, "1.7.2", "d7b7db7fbd755e8283b6c0a50be71ec0a3d67d9213d74422d9372effc8e87fd1", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}], "hexpm"},
"plug_cowboy": {:hex, :plug_cowboy, "2.0.1", "d798f8ee5acc86b7d42dbe4450b8b0dadf665ce588236eb0a751a132417a980e", [:mix], [{:cowboy, "~> 2.5", [hex: :cowboy, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"}, "plug_cowboy": {:hex, :plug_cowboy, "2.0.1", "d798f8ee5acc86b7d42dbe4450b8b0dadf665ce588236eb0a751a132417a980e", [:mix], [{:cowboy, "~> 2.5", [hex: :cowboy, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"},
"plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm"}, "plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm"},

View file

@ -1,83 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.JobsTest do
use ExUnit.Case, async: true
alias Jobs.WorkerMock
alias Pleroma.Jobs
setup do
state = %{
queues: Enum.into([Jobs.create_queue(:testing)], %{}),
refs: %{}
}
[state: state]
end
test "creates queue" do
queue = Jobs.create_queue(:foobar)
assert {:foobar, set} = queue
assert :set == elem(set, 0) |> elem(0)
end
test "enqueues an element according to priority" do
queue = [%{item: 1, priority: 2}]
new_queue = Jobs.enqueue_sorted(queue, 2, 1)
assert new_queue == [%{item: 2, priority: 1}, %{item: 1, priority: 2}]
new_queue = Jobs.enqueue_sorted(queue, 2, 3)
assert new_queue == [%{item: 1, priority: 2}, %{item: 2, priority: 3}]
end
test "pop first item" do
queue = [%{item: 2, priority: 1}, %{item: 1, priority: 2}]
assert {2, [%{item: 1, priority: 2}]} = Jobs.queue_pop(queue)
end
test "enqueue a job", %{state: state} do
assert {:noreply, new_state} =
Jobs.handle_cast({:enqueue, :testing, WorkerMock, [:test_job, :foo, :bar], 3}, state)
assert %{queues: %{testing: {running_jobs, []}}, refs: _} = new_state
assert :sets.size(running_jobs) == 1
assert [ref] = :sets.to_list(running_jobs)
assert %{refs: %{^ref => :testing}} = new_state
end
test "max jobs setting", %{state: state} do
max_jobs = Pleroma.Config.get([Jobs, :testing, :max_jobs])
{:noreply, state} =
Enum.reduce(1..(max_jobs + 1), {:noreply, state}, fn _, {:noreply, state} ->
Jobs.handle_cast({:enqueue, :testing, WorkerMock, [:test_job, :foo, :bar], 3}, state)
end)
assert %{
queues: %{
testing:
{running_jobs, [%{item: {WorkerMock, [:test_job, :foo, :bar]}, priority: 3}]}
}
} = state
assert :sets.size(running_jobs) == max_jobs
end
test "remove job after it finished", %{state: state} do
{:noreply, new_state} =
Jobs.handle_cast({:enqueue, :testing, WorkerMock, [:test_job, :foo, :bar], 3}, state)
%{queues: %{testing: {running_jobs, []}}} = new_state
[ref] = :sets.to_list(running_jobs)
assert {:noreply, %{queues: %{testing: {running_jobs, []}}, refs: %{}}} =
Jobs.handle_info({:DOWN, ref, :process, nil, nil}, new_state)
assert :sets.size(running_jobs) == 0
end
end

View file

@ -1,19 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Jobs.WorkerMock do
require Logger
def perform(:test_job, arg, arg2) do
Logger.debug({:perform, :test_job, arg, arg2})
end
def perform(:test_job, payload) do
Logger.debug({:perform, :test_job, payload})
end
def test_job(payload) do
Pleroma.Jobs.enqueue(:testing, __MODULE__, [:test_job, payload])
end
end