Spin off imports into n oban jobs

This commit is contained in:
FloatingGhost 2022-11-27 21:45:41 +00:00
parent a8f3cf6563
commit ee7059c9cf
6 changed files with 74 additions and 79 deletions

View file

@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- MastoAPI: Accept BooleanLike input on `/api/v1/accounts/:id/follow` (fixes follows with mastodon.py) - MastoAPI: Accept BooleanLike input on `/api/v1/accounts/:id/follow` (fixes follows with mastodon.py)
- Relays from akkoma are now off by default - Relays from akkoma are now off by default
- NormalizeMarkup MRF is now on by default - NormalizeMarkup MRF is now on by default
- Follow/Block/Mute imports now spin off into *n* tasks to avoid the oban timeout
## 2022.11 ## 2022.11

View file

@ -12,47 +12,32 @@ defmodule Pleroma.User.Import do
require Logger require Logger
@spec perform(atom(), User.t(), list()) :: :ok | list() | {:error, any()} @spec perform(atom(), User.t(), list()) :: :ok | list() | {:error, any()}
def perform(:mutes_import, %User{} = user, [_ | _] = identifiers) do def perform(:mutes_import, %User{} = user, identifier) do
Enum.map( with {:ok, %User{} = muted_user} <- User.get_or_fetch(identifier),
identifiers, {:ok, _} <- User.mute(user, muted_user) do
fn identifier -> muted_user
with {:ok, %User{} = muted_user} <- User.get_or_fetch(identifier), else
{:ok, _} <- User.mute(user, muted_user) do error -> handle_error(:mutes_import, identifier, error)
muted_user end
else
error -> handle_error(:mutes_import, identifier, error)
end
end
)
end end
def perform(:blocks_import, %User{} = blocker, [_ | _] = identifiers) do def perform(:blocks_import, %User{} = blocker, identifier) do
Enum.map( with {:ok, %User{} = blocked} <- User.get_or_fetch(identifier),
identifiers, {:ok, _block} <- CommonAPI.block(blocker, blocked) do
fn identifier -> blocked
with {:ok, %User{} = blocked} <- User.get_or_fetch(identifier), else
{:ok, _block} <- CommonAPI.block(blocker, blocked) do error -> handle_error(:blocks_import, identifier, error)
blocked end
else
error -> handle_error(:blocks_import, identifier, error)
end
end
)
end end
def perform(:follow_import, %User{} = follower, [_ | _] = identifiers) do def perform(:follow_import, %User{} = follower, identifier) do
Enum.map( with {:ok, %User{} = followed} <- User.get_or_fetch(identifier),
identifiers, {:ok, follower, followed} <- User.maybe_direct_follow(follower, followed),
fn identifier -> {:ok, _, _, _} <- CommonAPI.follow(follower, followed) do
with {:ok, %User{} = followed} <- User.get_or_fetch(identifier), followed
{:ok, follower, followed} <- User.maybe_direct_follow(follower, followed), else
{:ok, _, _, _} <- CommonAPI.follow(follower, followed) do error -> handle_error(:follow_import, identifier, error)
followed end
else
error -> handle_error(:follow_import, identifier, error)
end
end
)
end end
def perform(_, _, _), do: :ok def perform(_, _, _), do: :ok
@ -62,24 +47,24 @@ defmodule Pleroma.User.Import do
error error
end end
def blocks_import(%User{} = blocker, [_ | _] = identifiers) do defp enqueue_many(op, user, identifiers) do
BackgroundWorker.enqueue( Enum.map(
"blocks_import", identifiers,
%{"user_id" => blocker.id, "identifiers" => identifiers} fn identifier ->
BackgroundWorker.enqueue(op, %{"user_id" => user.id, "identifier" => identifier})
end
) )
end end
def blocks_import(%User{} = blocker, [_ | _] = identifiers) do
enqueue_many("blocks_import", blocker, identifiers)
end
def follow_import(%User{} = follower, [_ | _] = identifiers) do def follow_import(%User{} = follower, [_ | _] = identifiers) do
BackgroundWorker.enqueue( enqueue_many("follow_import", follower, identifiers)
"follow_import",
%{"user_id" => follower.id, "identifiers" => identifiers}
)
end end
def mutes_import(%User{} = user, [_ | _] = identifiers) do def mutes_import(%User{} = user, [_ | _] = identifiers) do
BackgroundWorker.enqueue( enqueue_many("mutes_import", user, identifiers)
"mutes_import",
%{"user_id" => user.id, "identifiers" => identifiers}
)
end end
end end

View file

@ -25,10 +25,10 @@ defmodule Pleroma.Workers.BackgroundWorker do
User.perform(:force_password_reset, user) User.perform(:force_password_reset, user)
end end
def perform(%Job{args: %{"op" => op, "user_id" => user_id, "identifiers" => identifiers}}) def perform(%Job{args: %{"op" => op, "user_id" => user_id, "identifier" => identifier}})
when op in ["blocks_import", "follow_import", "mutes_import"] do when op in ["blocks_import", "follow_import", "mutes_import"] do
user = User.get_cached_by_id(user_id) user = User.get_cached_by_id(user_id)
{:ok, User.Import.perform(String.to_atom(op), user, identifiers)} {:ok, User.Import.perform(String.to_atom(op), user, identifier)}
end end
def perform(%Job{ def perform(%Job{

View file

@ -11,7 +11,7 @@ defmodule Pleroma.Emoji.FormatterTest do
text = "I love :firefox:" text = "I love :firefox:"
expected_result = expected_result =
"I love <img class=\"emoji\" alt=\"firefox\" title=\"firefox\" src=\"/emoji/Firefox.gif\"/>" "I love <img alt=\"firefox\" title=\"firefox\" src=\"/emoji/Firefox.gif\"/>"
assert Formatter.emojify(text) == expected_result assert Formatter.emojify(text) == expected_result
end end

View file

@ -25,11 +25,14 @@ defmodule Pleroma.User.ImportTest do
user3.nickname user3.nickname
] ]
{:ok, job} = User.Import.follow_import(user1, identifiers) [{:ok, job1}, {:ok, job2}] = User.Import.follow_import(user1, identifiers)
assert {:ok, result} = ObanHelpers.perform(job1)
assert result == refresh_record(user2)
assert {:ok, result} = ObanHelpers.perform(job2)
assert result == refresh_record(user3)
assert {:ok, result} = ObanHelpers.perform(job)
assert is_list(result)
assert result == [refresh_record(user2), refresh_record(user3)]
assert User.following?(user1, user2) assert User.following?(user1, user2)
assert User.following?(user1, user3) assert User.following?(user1, user3)
end end
@ -44,11 +47,14 @@ defmodule Pleroma.User.ImportTest do
user3.nickname user3.nickname
] ]
{:ok, job} = User.Import.blocks_import(user1, identifiers) [{:ok, job1}, {:ok, job2}] = User.Import.blocks_import(user1, identifiers)
assert {:ok, result} = ObanHelpers.perform(job1)
assert result == user2
assert {:ok, result} = ObanHelpers.perform(job2)
assert result == user3
assert {:ok, result} = ObanHelpers.perform(job)
assert is_list(result)
assert result == [user2, user3]
assert User.blocks?(user1, user2) assert User.blocks?(user1, user2)
assert User.blocks?(user1, user3) assert User.blocks?(user1, user3)
end end
@ -63,11 +69,14 @@ defmodule Pleroma.User.ImportTest do
user3.nickname user3.nickname
] ]
{:ok, job} = User.Import.mutes_import(user1, identifiers) [{:ok, job1}, {:ok, job2}] = User.Import.mutes_import(user1, identifiers)
assert {:ok, result} = ObanHelpers.perform(job1)
assert result == user2
assert {:ok, result} = ObanHelpers.perform(job2)
assert result == user3
assert {:ok, result} = ObanHelpers.perform(job)
assert is_list(result)
assert result == [user2, user3]
assert User.mutes?(user1, user2) assert User.mutes?(user1, user2)
assert User.mutes?(user1, user3) assert User.mutes?(user1, user3)
end end

View file

@ -47,8 +47,8 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do
|> json_response_and_validate_schema(200) |> json_response_and_validate_schema(200)
assert [{:ok, job_result}] = ObanHelpers.perform_all() assert [{:ok, job_result}] = ObanHelpers.perform_all()
assert job_result == [refresh_record(user2)] assert job_result == refresh_record(user2)
assert [%Pleroma.User{follower_count: 1}] = job_result assert %Pleroma.User{follower_count: 1} = job_result
end end
end end
@ -108,8 +108,8 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do
|> post("/api/pleroma/follow_import", %{"list" => identifiers}) |> post("/api/pleroma/follow_import", %{"list" => identifiers})
|> json_response_and_validate_schema(200) |> json_response_and_validate_schema(200)
assert [{:ok, job_result}] = ObanHelpers.perform_all() job_results = Enum.map(ObanHelpers.perform_all(), fn {:ok, result} -> result end)
assert job_result == Enum.map(users, &refresh_record/1) assert job_results == Enum.map(users, &refresh_record/1)
end end
end end
@ -141,8 +141,8 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do
}) })
|> json_response_and_validate_schema(200) |> json_response_and_validate_schema(200)
assert [{:ok, job_result}] = ObanHelpers.perform_all() job_results = Enum.map(ObanHelpers.perform_all(), fn {:ok, result} -> result end)
assert job_result == users assert job_results == users
end end
end end
@ -165,8 +165,8 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do
|> post("/api/pleroma/blocks_import", %{"list" => identifiers}) |> post("/api/pleroma/blocks_import", %{"list" => identifiers})
|> json_response_and_validate_schema(200) |> json_response_and_validate_schema(200)
assert [{:ok, job_result}] = ObanHelpers.perform_all() job_results = Enum.map(ObanHelpers.perform_all(), fn {:ok, result} -> result end)
assert job_result == users assert job_results == users
end end
end end
@ -183,12 +183,12 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do
|> post("/api/pleroma/mutes_import", %{"list" => "#{user2.ap_id}"}) |> post("/api/pleroma/mutes_import", %{"list" => "#{user2.ap_id}"})
|> json_response_and_validate_schema(200) |> json_response_and_validate_schema(200)
assert [{:ok, job_result}] = ObanHelpers.perform_all() job_results = Enum.map(ObanHelpers.perform_all(), fn {:ok, result} -> result end)
assert job_result == [user2] assert job_results == [user2]
assert Pleroma.User.mutes?(user, user2) assert Pleroma.User.mutes?(user, user2)
end end
test "it imports mutes users from file", %{user: user, conn: conn} do test "it imports muted users from file", %{user: user, conn: conn} do
users = [user2, user3] = insert_list(2, :user) users = [user2, user3] = insert_list(2, :user)
with_mocks([ with_mocks([
@ -202,8 +202,8 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do
}) })
|> json_response_and_validate_schema(200) |> json_response_and_validate_schema(200)
assert [{:ok, job_result}] = ObanHelpers.perform_all() job_results = Enum.map(ObanHelpers.perform_all(), fn {:ok, result} -> result end)
assert job_result == users assert job_results == users
assert Enum.all?(users, &Pleroma.User.mutes?(user, &1)) assert Enum.all?(users, &Pleroma.User.mutes?(user, &1))
end end
end end
@ -227,8 +227,8 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do
|> post("/api/pleroma/mutes_import", %{"list" => identifiers}) |> post("/api/pleroma/mutes_import", %{"list" => identifiers})
|> json_response_and_validate_schema(200) |> json_response_and_validate_schema(200)
assert [{:ok, job_result}] = ObanHelpers.perform_all() job_results = Enum.map(ObanHelpers.perform_all(), fn {:ok, result} -> result end)
assert job_result == users assert job_results == users
assert Enum.all?(users, &Pleroma.User.mutes?(user, &1)) assert Enum.all?(users, &Pleroma.User.mutes?(user, &1))
end end
end end