diff --git a/.envrc b/.envrc new file mode 100644 index 000000000..3550a30f2 --- /dev/null +++ b/.envrc @@ -0,0 +1 @@ +use flake diff --git a/.gitignore b/.gitignore index f9de4ed49..dc3db2257 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +.direnv/ # App artifacts docs/site *.sw* diff --git a/flake.lock b/flake.lock new file mode 100644 index 000000000..22ac3721f --- /dev/null +++ b/flake.lock @@ -0,0 +1,42 @@ +{ + "nodes": { + "flake-utils": { + "locked": { + "lastModified": 1667395993, + "narHash": "sha256-nuEHfE/LcWyuSWnS8t12N1wc105Qtau+/OdUAjtQ0rA=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "5aed5285a952e0b949eb3ba02c12fa4fcfef535f", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "nixpkgs": { + "locked": { + "lastModified": 1667573297, + "narHash": "sha256-nPPcRXXqovzJZZQtVJGujMAF+LGNoTp+Q/z5drq+rso=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "dac8adf99ace8480b759dd24a16c9aad2507e6cb", + "type": "github" + }, + "original": { + "owner": "NixOS", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "flake-utils": "flake-utils", + "nixpkgs": "nixpkgs" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/flake.nix b/flake.nix new file mode 100644 index 000000000..56ad90a4a --- /dev/null +++ b/flake.nix @@ -0,0 +1,29 @@ +{ + description = "Akkoma dev flake"; + + inputs = { + nixpkgs.url = "github:NixOS/nixpkgs"; + flake-utils.url = "github:numtide/flake-utils"; + }; + + outputs = { + self, + nixpkgs, + flake-utils, + }: + flake-utils.lib.eachDefaultSystem (system: let + pkgs = import nixpkgs {inherit system;}; + in { + formatter = pkgs.alejandra; + devShells.default = pkgs.mkShell { + buildInputs = with pkgs; [ + file + ]; + nativeBuildInputs = with pkgs; [ + elixir + cmake + libxcrypt + ]; + }; + }); +} diff --git a/lib/mix/migrator.ex b/lib/mix/migrator.ex new file mode 100644 index 000000000..648b0166d --- /dev/null +++ 
b/lib/mix/migrator.ex
@@ -0,0 +1,166 @@
+defmodule Mix.Pleroma.Migrator do
+  @moduledoc "Common functions to be reused in migrator tasks"
+
+  import Mix.Pleroma
+  alias Pleroma.Activity
+  alias Pleroma.Object
+  alias Pleroma.Repo
+  alias Pleroma.Web.ActivityPub.Transmogrifier
+  alias Pleroma.Web.ActivityPub.ActivityPub
+
+  @doc """
+  Converts the string keys of `map` to atoms, leaving the values untouched.
+
+  NOTE: atoms are never garbage collected, so this should only be given
+  trusted dump lines with a small, fixed set of keys.
+  """
+  def keys_to_atoms(map) do
+    Map.new(map, fn {k, v} -> {String.to_atom(k), v} end)
+  end
+
+  @doc """
+  Applies `fun` to the value of each key of `fields` that is present in `map`.
+
+  Keys that are absent from `map` are skipped, not added.
+  """
+  def loop_fields(map, fields, fun) do
+    Enum.reduce(fields, map, fn key, acc ->
+      case acc do
+        %{^key => value} -> Map.put(acc, key, fun.(value))
+        _ -> acc
+      end
+    end)
+  end
+
+  @doc """
+  Parses an ISO 8601 timestamp into a `NaiveDateTime`, keeping microseconds.
+
+  Returns `nil` for `nil`, `""` and for unparsable input (the parse error is
+  printed).
+  """
+  def parse_timestamp_usec(nil), do: nil
+  def parse_timestamp_usec(""), do: nil
+
+  def parse_timestamp_usec(timestamp) do
+    case NaiveDateTime.from_iso8601(timestamp) do
+      {:ok, dt} ->
+        dt
+
+      {:error, reason} ->
+        # Yield nil instead of the `:ok` returned by IO.puts/1, so callers
+        # never receive a bogus timestamp value.
+        IO.puts("Error: #{reason}")
+        nil
+    end
+  end
+
+  @doc """
+  Same as `parse_timestamp_usec/1`, but truncated to whole seconds.
+
+  Returns `nil` when the timestamp is `nil`, `""` or cannot be parsed.
+  """
+  def parse_timestamp(timestamp) do
+    case parse_timestamp_usec(timestamp) do
+      %NaiveDateTime{} = dt -> NaiveDateTime.truncate(dt, :second)
+      _ -> nil
+    end
+  end
+
+  @doc """
+  Parses an ISO 8601 timestamp into a UTC `DateTime` truncated to seconds.
+
+  Returns `nil` when the timestamp is `nil`, `""` or cannot be parsed.
+  """
+  def parse_timestamp_utc(nil), do: nil
+  def parse_timestamp_utc(""), do: nil
+
+  def parse_timestamp_utc(timestamp) do
+    with {:ok, naive} <- NaiveDateTime.from_iso8601(timestamp),
+         {:ok, utc} <- DateTime.from_naive(naive, "Etc/UTC") do
+      DateTime.truncate(utc, :second)
+    else
+      _ -> nil
+    end
+  end
+
+  @doc "Converts a list of integer ids into their FlakeId string form."
+  def parse_id_list(id_list) do
+    Enum.map(id_list, fn id ->
+      id
+      |> FlakeId.from_integer()
+      |> FlakeId.to_string()
+    end)
+  end
+
+  @doc """
+  Returns at most the first `max_length` characters of `str`.
+
+  A `nil` string (e.g. a JSON `null` field) is passed through as `nil`.
+  """
+  def truncate(nil, _max_length), do: nil
+  def truncate(str, max_length), do: String.slice(str, 0, max_length)
+
+  @doc """
+  Inserts the object found under the `"object"` key of `params` and, if that
+  succeeded, persists `params` itself as a non-local activity.
+
+  Constraint and function clause errors raised by the inserts are logged and skipped.
+  """
+  def try_create_activity(params) do
+    {:ok, object} = try_create_object(params["object"])
+
+    if object do
+      try do
+        {:ok, _activity, _meta} = ActivityPub.persist(params, local: false)
+      rescue
+        Ecto.ConstraintError ->
+          shell_info("Activity already in database, skipping")
+
+        FunctionClauseError ->
+          shell_info("Unknown error occurred, skipping")
+      end
+    end
+  end
+
+  # Runs the object data through the Transmogrifier fixers and inserts it.
+  # Returns `{:ok, object}`, or `{:ok, nil}` when the insert was skipped
+  # because of a rescued error.
+  defp try_create_object(params) do
+    # The steps that are commented out are intentionally disabled: they either
+    # make network requests or would drop internal fields the import needs.
+    object_data =
+      params
+      # |> Transmogrifier.strip_internal_fields # We need internal fields for `likes` and `like_count`, etc
+      # |> Transmogrifier.fix_actor # Makes network requests
+      |> Transmogrifier.fix_url()
+      |> Transmogrifier.fix_attachments()
+      |> Transmogrifier.fix_context()
+      # |> Transmogrifier.fix_in_reply_to # Makes network requests
+      |> Transmogrifier.fix_emoji()
+      |> Transmogrifier.fix_tag()
+      |> Transmogrifier.fix_content_map()
+      # |> Transmogrifier.fix_addressing # Makes network requests
+      |> Transmogrifier.fix_summary()
+      # |> Transmogrifier.fix_type
+
+    object_params = %{
+      data: object_data,
+      inserted_at: params[:inserted_at],
+      updated_at: params[:updated_at]
+    }
+
+    try do
+      {:ok, object} = Repo.insert(struct(Object, object_params))
+      shell_info("Object created")
+      {:ok, object}
+    rescue
+      Ecto.ConstraintError ->
+        shell_info("Object already in database, skipping")
+        {:ok, nil}
+
+      FunctionClauseError ->
+        shell_info("Unknown error occurred, skipping")
+        {:ok, nil}
+    end
+  end
+end
diff --git a/lib/mix/tasks/migrator/fix.ex b/lib/mix/tasks/migrator/fix.ex
new file mode 100644
index 000000000..f5070b77d
--- /dev/null
+++ b/lib/mix/tasks/migrator/fix.ex
@@ -0,0 +1,43 @@
+defmodule Mix.Tasks.Pleroma.Migrator.Fix do
+  use Mix.Task
+  alias Pleroma.Object
+  alias Pleroma.Repo
+
+  require Logger
+  import Ecto.Changeset
+  import Ecto.Query
+  import Mix.Pleroma
+
+  @shortdoc "Fix stuff from old migrations. Run at your own risk."
+
+  # Apparently media can get migrated in reverse order.
+  # This reverses the order of media attachments in all migrated objects.
+  # https://gitlab.com/soapbox-pub/migrator/-/issues/27
+  def run(["reverse_media"]) do
+    start_pleroma()
+
+    # Objects flagged as created by the migrator that carry more than one attachment.
+    migrated_with_media =
+      from(o in Object,
+        where: fragment("?->'pleroma_internal'->>'migrator'='true'", o.data),
+        where: fragment("json_array_length((?->'attachment')::json) > 1", o.data)
+      )
+
+    # The stream is consumed inside the transaction; the timeout is disabled
+    # because the number of matching objects is not bounded.
+    Repo.transaction(
+      fn ->
+        migrated_with_media
+        |> Repo.stream()
+        |> Enum.each(&reverse_object_media/1)
+      end,
+      timeout: :infinity
+    )
+  end
+
+  # Logs the object id and stores the object with its attachments reversed.
+  # Anything that is not an object with an "id" in its data is left alone.
+  defp reverse_object_media(%Object{data: %{"id" => id} = data} = object) do
+    shell_info("Reversing media for #{id}")
+
+    object
+    |> change(data: do_reverse_media(data))
+    |> Repo.update!()
+  end
+
+  defp reverse_object_media(_other), do: :ok
+
+  # Reverses the "attachment" list; data without such a list is returned as is.
+  defp do_reverse_media(%{"attachment" => attachments} = data) when is_list(attachments),
+    do: %{data | "attachment" => Enum.reverse(attachments)}
+
+  defp do_reverse_media(data), do: data
+end
diff --git a/lib/mix/tasks/migrator/hello.ex b/lib/mix/tasks/migrator/hello.ex
new file mode 100644
index 000000000..201014aaa
--- /dev/null
+++ b/lib/mix/tasks/migrator/hello.ex
@@ -0,0 +1,8 @@
+defmodule Mix.Tasks.Pleroma.Migrator.Hello do
+  use Mix.Task
+
+  @shortdoc "Test that Pleroma can run this task."
+
+  # Smoke test: prints a greeting and ignores its arguments.
+  def run(_args), do: IO.puts("Hello, World!")
+end
diff --git a/lib/mix/tasks/migrator/import.ex b/lib/mix/tasks/migrator/import.ex
new file mode 100644
index 000000000..82d7e78e4
--- /dev/null
+++ b/lib/mix/tasks/migrator/import.ex
@@ -0,0 +1,20 @@
+defmodule Mix.Tasks.Pleroma.Migrator.Import do
+  use Mix.Task
+  alias Mix.Tasks.Pleroma.Migrator
+
+  @shortdoc "Import all dumps."
+  def run(_) do
+    # Runs every importer in turn, each with `nil` as its argument list.
+    Migrator.Import.Users.run(nil)
+    Migrator.Import.Follows.run(nil)
+    Migrator.Import.Blocks.run(nil)
+    Migrator.Import.Mutes.run(nil)
+    Migrator.Import.Lists.run(nil)
+    Migrator.Import.Filters.run(nil)
+    Migrator.Import.Apps.run(nil)
+    Migrator.Import.Tokens.run(nil)
+    Migrator.Import.Votes.run(nil)
+    Migrator.Import.ThreadMutes.run(nil)
+    Migrator.Import.Likes.run(nil)
+    Migrator.Import.Statuses.run(nil)
+  end
+end
diff --git a/lib/mix/tasks/migrator/import/apps.ex b/lib/mix/tasks/migrator/import/apps.ex
new file mode 100644
index 000000000..956d8af73
--- /dev/null
+++ b/lib/mix/tasks/migrator/import/apps.ex
@@ -0,0 +1,37 @@
+defmodule Mix.Tasks.Pleroma.Migrator.Import.Apps do
+  use Mix.Task
+  import Mix.Pleroma
+  import Mix.Pleroma.Migrator
+  alias Pleroma.Repo
+  alias Pleroma.Web.OAuth.App
+
+  @shortdoc "Import OAuth apps."
+  def run(_) do
+    start_pleroma()
+
+    "migrator/apps.txt"
+    |> File.stream!()
+    |> Enum.each(&handle_line/1)
+  end
+
+  # Decodes one JSON line of the dump and inserts it as an OAuth app.
+  defp handle_line(line) do
+    line
+    |> Jason.decode!()
+    |> keys_to_atoms()
+    |> loop_fields([:inserted_at, :updated_at], &parse_timestamp/1)
+    |> try_create_app()
+  end
+
+  # Inserts the app; a failed insert is logged and skipped so that one bad or
+  # already imported row does not abort the rest of the import.
+  defp try_create_app(params) do
+    case Repo.insert(struct(App, params)) do
+      {:ok, app} -> shell_info("App #{app.client_name} created")
+      _ -> shell_info("Could not create app")
+    end
+  rescue
+    # Same handling as the other importers for rows that already exist.
+    Ecto.ConstraintError -> shell_info("App already exists, skipping")
+  end
+end
diff --git a/lib/mix/tasks/migrator/import/blocks.ex b/lib/mix/tasks/migrator/import/blocks.ex
new file mode 100644
index 000000000..d0c11d409
--- /dev/null
+++ b/lib/mix/tasks/migrator/import/blocks.ex
@@ -0,0 +1,36 @@
+defmodule Mix.Tasks.Pleroma.Migrator.Import.Blocks do
+  use Mix.Task
+  import Mix.Pleroma
+  import Mix.Pleroma.Migrator
+  alias Pleroma.User
+  alias Pleroma.UserRelationship
+
+  @shortdoc "Import blocks."
+  def run(_) do
+    start_pleroma()
+
+    "migrator/blocks.txt"
+    |> File.stream!()
+    |> Enum.each(&handle_line/1)
+  end
+
+  # Decodes one JSON line of the dump, then records both the block activity
+  # and the block relationship it describes.
+  defp handle_line(line) do
+    params =
+      line
+      |> Jason.decode!()
+      |> keys_to_atoms()
+      |> loop_fields([:inserted_at, :updated_at], &parse_timestamp/1)
+
+    try_create_activity(params)
+    try_create_block(params)
+  end
+
+  # Creates the block between the users named by "actor" and "object".
+  # Success is only reported when both users are found and the relationship
+  # call did not return an error tuple; everything else is logged and skipped.
+  defp try_create_block(%{data: %{"actor" => actor, "object" => object}} = _params) do
+    with %User{} = source <- User.get_by_ap_id(actor),
+         %User{} = target <- User.get_by_ap_id(object) do
+      case UserRelationship.create_block(source, target) do
+        {:error, _reason} -> shell_info("Could not create block")
+        _created -> shell_info("Block created")
+      end
+    else
+      _ -> shell_info("Could not create block")
+    end
+  rescue
+    FunctionClauseError -> shell_info("Could not create block")
+  end
+
+  # Lines without an actor/object pair cannot describe a block.
+  defp try_create_block(_params), do: shell_info("Could not create block")
+end
diff --git a/lib/mix/tasks/migrator/import/filters.ex b/lib/mix/tasks/migrator/import/filters.ex
new file mode 100644
index 000000000..a50bda632
--- /dev/null
+++ b/lib/mix/tasks/migrator/import/filters.ex
@@ -0,0 +1,45 @@
+defmodule Mix.Tasks.Pleroma.Migrator.Import.Filters do
+  use Mix.Task
+  import Mix.Pleroma
+  import Mix.Pleroma.Migrator
+  alias Pleroma.User
+  alias Pleroma.Filter
+  alias Pleroma.Repo
+
+  @shortdoc "Import filters."
+  def run(_) do
+    start_pleroma()
+
+    "migrator/filters.txt"
+    |> File.stream!()
+    |> Enum.each(&handle_line/1)
+  end
+
+  # Decodes one JSON line of the dump and inserts it as a filter owned by the
+  # user referenced through `user_ap_id`.
+  defp handle_line(line) do
+    line
+    |> Jason.decode!()
+    |> keys_to_atoms()
+    |> loop_fields([:inserted_at, :updated_at], &parse_timestamp/1)
+    |> loop_fields([:expires_at], &parse_timestamp_utc/1)
+    |> fix_user_id()
+    |> try_create_filter()
+  end
+
+  # Swaps `user_ap_id` for the id of the matching user. Returns
+  # `{:ok, params}`, or `:error` when the AP id is missing or no user is found
+  # (previously this crashed the whole import on `creator.id`).
+  defp fix_user_id(params) do
+    with ap_id when is_binary(ap_id) <- params[:user_ap_id],
+         %User{id: user_id} <- User.get_by_ap_id(ap_id) do
+      {:ok, params |> Map.put(:user_id, user_id) |> Map.delete(:user_ap_id)}
+    else
+      _ -> :error
+    end
+  end
+
+  # Inserts the filter; known failures are logged and skipped.
+  defp try_create_filter({:ok, params}) do
+    {:ok, _filter} = Repo.insert(struct(Filter, params))
+    shell_info("Filter created")
+  rescue
+    Ecto.ConstraintError -> shell_info("Filter already exists, skipping")
+    MatchError -> shell_info("Could not create filter")
+    FunctionClauseError -> shell_info("Could not create filter")
+  end
+
+  # The owner could not be resolved, so there is nobody to attach the filter to.
+  defp try_create_filter(:error), do: shell_info("Could not create filter")
+end
diff --git a/lib/mix/tasks/migrator/import/follows.ex b/lib/mix/tasks/migrator/import/follows.ex
new file mode 100644
index 000000000..62b2cdab5
--- /dev/null
+++ b/lib/mix/tasks/migrator/import/follows.ex
@@ -0,0 +1,37 @@
+defmodule Mix.Tasks.Pleroma.Migrator.Import.Follows do
+  use Mix.Task
+  import Mix.Pleroma
+  import Mix.Pleroma.Migrator
+  alias Pleroma.User
+  alias Pleroma.FollowingRelationship
+
+  @shortdoc "Import follows."
+  def run(_) do
+    start_pleroma()
+
+    "migrator/follows.txt"
+    |> File.stream!()
+    |> Enum.each(&handle_line/1)
+  end
+
+  # Decodes one JSON line of the dump, then records both the follow activity
+  # and the follow relationship it describes.
+  defp handle_line(line) do
+    params =
+      line
+      |> Jason.decode!()
+      |> keys_to_atoms()
+      |> loop_fields([:inserted_at, :updated_at], &parse_timestamp/1)
+
+    shell_info("Importing follow...")
+    try_create_activity(params)
+    create_follow(params)
+  end
+
+  # Creates the follow between the users named by "actor" and "object".
+  # Success is only reported when both users are found and the relationship
+  # call did not return an error tuple; everything else is logged and skipped.
+  defp create_follow(%{data: %{"actor" => actor, "object" => object}} = _params) do
+    with %User{} = follower <- User.get_by_ap_id(actor),
+         %User{} = following <- User.get_by_ap_id(object) do
+      case FollowingRelationship.follow(follower, following) do
+        {:error, _reason} -> shell_info("Could not create follow")
+        _created -> shell_info("Follow relationship created")
+      end
+    else
+      _ -> shell_info("Could not create follow")
+    end
+  rescue
+    FunctionClauseError -> shell_info("Could not create follow")
+  end
+
+  # Lines without an actor/object pair cannot describe a follow.
+  defp create_follow(_params), do: shell_info("Could not create follow")
+end
diff --git a/lib/mix/tasks/migrator/import/likes.ex b/lib/mix/tasks/migrator/import/likes.ex
new file mode 100644
index 000000000..935e05cde
--- /dev/null
+++ b/lib/mix/tasks/migrator/import/likes.ex
@@ -0,0 +1,26 @@
+defmodule Mix.Tasks.Pleroma.Migrator.Import.Likes do
+  use Mix.Task
+  import Mix.Pleroma
+  import Mix.Pleroma.Migrator
+
+  @shortdoc "Import likes."
+  def run(_) do
+    start_pleroma()
+
+    "migrator/likes.txt"
+    |> File.stream!()
+    |> Enum.each(&handle_line/1)
+  end
+
+  # Decodes one JSON line of the dump and imports it as an activity.
+  defp handle_line(line) do
+    params =
+      line
+      |> Jason.decode!()
+      |> keys_to_atoms()
+      |> loop_fields([:inserted_at, :updated_at], &parse_timestamp/1)
+
+    shell_info("Importing like...")
+    try_create_activity(params)
+  end
+end
diff --git a/lib/mix/tasks/migrator/import/lists.ex b/lib/mix/tasks/migrator/import/lists.ex
new file mode 100644
index 000000000..2bbea62ef
--- /dev/null
+++ b/lib/mix/tasks/migrator/import/lists.ex
@@ -0,0 +1,44 @@
+defmodule Mix.Tasks.Pleroma.Migrator.Import.Lists do
+  use Mix.Task
+  import Mix.Pleroma
+  import Mix.Pleroma.Migrator
+  alias Pleroma.User
+  alias Pleroma.List
+  alias Pleroma.Repo
+
+  @shortdoc "Import lists."
+  def run(_) do
+    start_pleroma()
+
+    "migrator/lists.txt"
+    |> File.stream!()
+    |> Enum.each(&handle_line/1)
+  end
+
+  # Decodes one JSON line of the dump and inserts it as a list owned by the
+  # user referenced through `user_ap_id`.
+  defp handle_line(line) do
+    line
+    |> Jason.decode!()
+    |> keys_to_atoms()
+    |> loop_fields([:inserted_at, :updated_at], &parse_timestamp/1)
+    |> fix_user_id()
+    |> try_create_list()
+  end
+
+  # Swaps `user_ap_id` for the id of the matching user. Returns
+  # `{:ok, params}`, or `:error` when the AP id is missing or no user is found
+  # (previously this crashed the whole import on `creator.id`).
+  defp fix_user_id(params) do
+    with ap_id when is_binary(ap_id) <- params[:user_ap_id],
+         %User{id: user_id} <- User.get_by_ap_id(ap_id) do
+      {:ok, params |> Map.put(:user_id, user_id) |> Map.delete(:user_ap_id)}
+    else
+      _ -> :error
+    end
+  end
+
+  # Inserts the list; known failures are logged and skipped.
+  defp try_create_list({:ok, params}) do
+    {:ok, _list} = Repo.insert(struct(List, params))
+    shell_info("List created")
+  rescue
+    Ecto.ConstraintError -> shell_info("List already exists, skipping")
+    MatchError -> shell_info("Could not create list")
+    FunctionClauseError -> shell_info("Could not create list")
+  end
+
+  # The owner could not be resolved, so there is nobody to attach the list to.
+  defp try_create_list(:error), do: shell_info("Could not create list")
+end
diff --git a/lib/mix/tasks/migrator/import/mutes.ex b/lib/mix/tasks/migrator/import/mutes.ex
new file mode 100644
index 000000000..1b1e439e0
--- /dev/null
+++ b/lib/mix/tasks/migrator/import/mutes.ex
@@ -0,0 +1,35 @@
+defmodule Mix.Tasks.Pleroma.Migrator.Import.Mutes do
+  use Mix.Task
+  import Mix.Pleroma
+  import Mix.Pleroma.Migrator
+  alias Pleroma.User
+  alias Pleroma.UserRelationship
+
+  @shortdoc "Import mutes."
+  def run(_) do
+    start_pleroma()
+
+    "migrator/mutes.txt"
+    |> File.stream!()
+    |> Enum.each(&handle_line/1)
+  end
+
+  # Decodes one JSON line of the dump and creates the mute it describes.
+  defp handle_line(line) do
+    line
+    |> Jason.decode!()
+    |> keys_to_atoms()
+    |> loop_fields([:inserted_at, :updated_at], &parse_timestamp/1)
+    |> try_create_mute()
+  end
+
+  # Creates the mute between the users named by the two AP ids.
+  # Success is only reported when both users are found and the relationship
+  # call did not return an error tuple; everything else is logged and skipped.
+  defp try_create_mute(%{source_ap_id: source_ap_id, target_ap_id: target_ap_id} = _params) do
+    with %User{} = source <- User.get_by_ap_id(source_ap_id),
+         %User{} = target <- User.get_by_ap_id(target_ap_id) do
+      case UserRelationship.create_mute(source, target) do
+        {:error, _reason} -> shell_info("Could not create mute")
+        _created -> shell_info("Mute created")
+      end
+    else
+      _ -> shell_info("Could not create mute")
+    end
+  rescue
+    FunctionClauseError -> shell_info("Could not create mute")
+  end
+
+  # Lines without both AP ids cannot describe a mute.
+  defp try_create_mute(_params), do: shell_info("Could not create mute")
+end
diff --git a/lib/mix/tasks/migrator/import/statuses.ex b/lib/mix/tasks/migrator/import/statuses.ex
new file mode 100644
index 000000000..8dbe38d1d
--- /dev/null
+++ b/lib/mix/tasks/migrator/import/statuses.ex
@@ -0,0 +1,26 @@
+defmodule Mix.Tasks.Pleroma.Migrator.Import.Statuses do
+  use Mix.Task
+  import Mix.Pleroma
+  import Mix.Pleroma.Migrator
+
+  @shortdoc "Import statuses."
+  def run(_) do
+    start_pleroma()
+
+    "migrator/statuses.txt"
+    |> File.stream!()
+    |> Enum.each(&handle_line/1)
+  end
+
+  # Decodes one JSON line of the dump, drops its "id" and imports the rest as
+  # an activity.
+  defp handle_line(line) do
+    params =
+      line
+      |> Jason.decode!()
+      |> Map.delete("id")
+
+    shell_info("Importing status...")
+    try_create_activity(params)
+  end
+end
diff --git a/lib/mix/tasks/migrator/import/thread_mutes.ex b/lib/mix/tasks/migrator/import/thread_mutes.ex
new file mode 100644
index 000000000..a75e3af84
--- /dev/null
+++ b/lib/mix/tasks/migrator/import/thread_mutes.ex
@@ -0,0 +1,33 @@
+defmodule Mix.Tasks.Pleroma.Migrator.Import.ThreadMutes do
+  use Mix.Task
+  import Mix.Pleroma
+  import Mix.Pleroma.Migrator
+  alias Pleroma.User
+  alias Pleroma.ThreadMute
+
+  @shortdoc "Import thread mutes."
+  def run(_) do
+    start_pleroma()
+
+    "migrator/thread_mutes.txt"
+    |> File.stream!()
+    |> Enum.each(&handle_line/1)
+  end
+
+  # Decodes one JSON line of the dump and creates the thread mute it describes.
+  defp handle_line(line) do
+    line
+    |> Jason.decode!()
+    |> keys_to_atoms()
+    |> try_create_thread_mute()
+  end
+
+  # Mutes `context` for the user with the given AP id. Success is only
+  # reported when the user is found and the mute call did not return an error
+  # tuple; everything else is logged and skipped.
+  defp try_create_thread_mute(%{ap_id: ap_id, context: context} = _params) do
+    case User.get_by_ap_id(ap_id) do
+      %User{id: user_id} ->
+        case ThreadMute.add_mute(user_id, context) do
+          {:error, _reason} -> shell_info("Could not create thread mute")
+          _created -> shell_info("Thread mute created")
+        end
+
+      _ ->
+        shell_info("Could not create thread mute")
+    end
+  rescue
+    FunctionClauseError -> shell_info("Could not create thread mute")
+  end
+
+  # Lines without an AP id and a context cannot describe a thread mute.
+  defp try_create_thread_mute(_params), do: shell_info("Could not create thread mute")
+end
diff --git a/lib/mix/tasks/migrator/import/tokens.ex b/lib/mix/tasks/migrator/import/tokens.ex
new file mode 100644
index 000000000..486d48a62
--- /dev/null
+++ b/lib/mix/tasks/migrator/import/tokens.ex
@@ -0,0 +1,40 @@
+defmodule Mix.Tasks.Pleroma.Migrator.Import.Tokens do
+  use Mix.Task
+  import Mix.Pleroma
+  import Mix.Pleroma.Migrator
+  alias Pleroma.Repo
+  alias Pleroma.User
+  alias Pleroma.Web.OAuth.App
+  alias Pleroma.Web.OAuth.Token
+
+  @shortdoc "Import OAuth tokens."
+  def run(_) do
+    start_pleroma()
+
+    "migrator/tokens.txt"
+    |> File.stream!()
+    |> Enum.each(&handle_line/1)
+  end
+
+  # Decodes one JSON line of the dump and inserts it as an OAuth token.
+  defp handle_line(line) do
+    line
+    |> Jason.decode!()
+    |> keys_to_atoms()
+    |> loop_fields([:inserted_at, :updated_at], &parse_timestamp/1)
+    |> try_create_token()
+  end
+
+  # Resolves the owning user (by AP id) and app (by client id), then inserts
+  # the token pointing at both. Every failure is logged and skipped.
+  defp try_create_token(params) do
+    with %{user_ap_id: user_ap_id, app_client_id: app_client_id}
+         when is_binary(user_ap_id) and is_binary(app_client_id) <- params,
+         %User{id: user_id} <- User.get_by_ap_id(user_ap_id),
+         %App{id: app_id} <- Repo.get_by(App, client_id: app_client_id),
+         # struct/2 drops the keys that are not Token fields (`user_ap_id`,
+         # `app_client_id`), so no separate clean-up step is needed.
+         token = struct(Token, Map.merge(params, %{user_id: user_id, app_id: app_id})),
+         {:ok, token} <- Repo.insert(token) do
+      shell_info("Token #{token.id} created")
+    else
+      _ -> shell_info("Could not create token")
+    end
+  rescue
+    # Same handling as the other importers for rows that already exist.
+    Ecto.ConstraintError -> shell_info("Token already exists, skipping")
+  end
+end
diff --git a/lib/mix/tasks/migrator/import/users.ex b/lib/mix/tasks/migrator/import/users.ex
new file mode 100644
index 000000000..425cbc532
--- /dev/null
+++ b/lib/mix/tasks/migrator/import/users.ex
@@ -0,0 +1,42 @@
+defmodule Mix.Tasks.Pleroma.Migrator.Import.Users do
+  use Mix.Task
+  import Mix.Pleroma
+  import Mix.Pleroma.Migrator
+  alias Pleroma.User
+  alias Pleroma.Repo
+
+  @shortdoc "Import users."
+  def run(_) do
+    start_pleroma()
+
+    # The limits are the same for every line, so they are looked up only once.
+    limits = %{
+      bio: Pleroma.Config.get([:instance, :user_bio_length], 5000),
+      name: Pleroma.Config.get([:instance, :user_name_length], 100)
+    }
+
+    "migrator/users.txt"
+    |> File.stream!()
+    |> Enum.each(&handle_line(&1, limits))
+  end
+
+  # Decodes one JSON line of the dump, normalizes its fields and inserts it as
+  # a user. A constraint violation is reported as "already in database".
+  defp handle_line(line, limits) do
+    params =
+      line
+      |> Jason.decode!()
+      |> keys_to_atoms()
+      |> loop_fields([:inserted_at, :updated_at, :last_digest_emailed_at], &parse_timestamp/1)
+      |> loop_fields([:last_refreshed_at], &parse_timestamp_usec/1)
+      |> loop_fields([:notification_settings], &keys_to_atoms/1)
+      |> loop_fields([:name], &truncate_field(&1, limits.name))
+      |> loop_fields([:bio], &truncate_field(&1, limits.bio))
+
+    try do
+      {:ok, user} = Repo.insert(struct(User, params))
+      User.set_cache(user)
+      shell_info("User #{params.nickname} created")
+    rescue
+      Ecto.ConstraintError ->
+        shell_info("User #{params.nickname} already in database, skipping")
+    end
+  end
+
+  # Truncates a text field; a missing value (JSON null) stays nil instead of
+  # crashing the import.
+  defp truncate_field(nil, _limit), do: nil
+  defp truncate_field(text, limit), do: truncate(text, limit)
+end
diff --git a/lib/mix/tasks/migrator/import/votes.ex b/lib/mix/tasks/migrator/import/votes.ex
new file mode 100644
index 000000000..27c1a8c8a
--- /dev/null
+++ b/lib/mix/tasks/migrator/import/votes.ex
@@ -0,0 +1,22 @@
+defmodule Mix.Tasks.Pleroma.Migrator.Import.Votes do
+  use Mix.Task
+  import Mix.Pleroma
+  import Mix.Pleroma.Migrator
+
+  @shortdoc "Import poll votes."
+  def run(_) do
+    start_pleroma()
+
+    "migrator/votes.txt"
+    |> File.stream!()
+    |> Enum.each(&handle_line/1)
+  end
+
+  # Decodes one JSON line of the dump and imports it as an activity.
+  defp handle_line(line) do
+    params =
+      line
+      |> Jason.decode!()
+      |> keys_to_atoms()
+      |> loop_fields([:inserted_at, :updated_at], &parse_timestamp/1)
+
+    shell_info("Importing poll vote...")
+    try_create_activity(params)
+  end
+end
diff --git a/lib/mix/tasks/migrator/rebuild.ex b/lib/mix/tasks/migrator/rebuild.ex
new file mode 100644
index 000000000..f3894c8ae
--- /dev/null
+++ b/lib/mix/tasks/migrator/rebuild.ex
@@ -0,0 +1,60 @@
+defmodule Mix.Tasks.Pleroma.Migrator.Rebuild do
+  use Mix.Task
+  alias Pleroma.Object
+  alias Pleroma.User
+  alias Pleroma.Web.ActivityPub.ActivityPub
+  alias Pleroma.Web.ActivityPub.Transmogrifier
+
+  require Logger
+  import Ecto.Query
+  import Mix.Pleroma
+
+  @shortdoc "Rebuild remote data."
+
+  # `users`: calls ActivityPub.make_user_from_ap_id/1 for every non-local user
+  # whose `last_refreshed_at` is still unset.
+  def run(["users"]) do
+    start_pleroma()
+
+    from(u in User, where: u.local == false, where: is_nil(u.last_refreshed_at))
+    |> each_in_chunks(&refresh_user/1)
+  end
+
+  # `activities`: runs Transmogrifier.fix_object/1 on the data of every object
+  # flagged as created by the migrator.
+  def run(["activities"]) do
+    start_pleroma()
+
+    from(o in Object, where: fragment("?->'pleroma_internal'->>'migrator'='true'", o.data))
+    |> each_in_chunks(&refetch_object/1)
+  end
+
+  # Feeds the query through Pleroma.RepoStreamer.chunk_stream/2 (chunk size 500)
+  # and calls `fun` with every row of every chunk.
+  defp each_in_chunks(query, fun) do
+    query
+    |> Pleroma.RepoStreamer.chunk_stream(500)
+    |> Stream.each(fn chunk -> Enum.each(chunk, fun) end)
+    |> Stream.run()
+  end
+
+  # Any exception raised while refreshing a user is logged and skipped.
+  defp refresh_user(user) do
+    ActivityPub.make_user_from_ap_id(user.ap_id)
+    shell_info("Updating @#{user.nickname}")
+  rescue
+    _ -> shell_info("Couldn't update user. Skipping.")
+  end
+
+  defp refetch_object(object) do
+    shell_info("Transmogrifying #{object.data["id"]}")
+
+    # This doesn't write anything back to the database, it just
+    # fetches anything that's missing.
+    try do
+      Transmogrifier.fix_object(object.data)
+    rescue
+      _ -> shell_info("Couldn't transmogrify. Skipping.")
+    end
+  end
+end