akkoma/lib/pleroma/web/mastodon_api/websocket_handler.ex
Oneric 13e2a811ec Avoid accumulation of stale data in websockets
We’ve received reports of some specific instances slowly accumulating
more and more binary data over time up to OOMs and globally setting
ERL_FULLSWEEP_AFTER=0 has proven to be an effective countermeasure.
However, this incurs increased cpu perf costs everywhere and is
thus not suitable to apply out of the box.

Apparently long-lived Phoenix websocket processes are known to
often cause exactly this by getting into a state unfavourable
for the garbage collector.
Therefore it seems likely affected instances are using timeline
streaming and do so in just the right way to trigger this. We
can tune the garbage collector just for websocket processes
and use a more lenient value of 20 to keep the added perf cost
in check.

Testing on one affected instance appears to confirm this theory

Ref.:
  https://www.erlang.org/doc/man/erlang#ghlink-process_flag-2-idp226
  https://blog.guzman.codes/using-phoenix-channels-high-memory-usage-save-money-with-erlfullsweepafter
  https://git.pleroma.social/pleroma/pleroma/-/merge_requests/4060

Tested-by: bjo
2024-06-22 22:22:33 +02:00

187 lines
5.8 KiB
Elixir

# Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
require Logger
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.OAuth.Token
alias Pleroma.Web.Streamer
@behaviour :cowboy_websocket
# Client ping period.
@tick :timer.seconds(30)
# Cowboy timeout period.
@timeout :timer.seconds(60)
# Hibernate every X messages
@hibernate_every 100
# Tune garabge collect for long-lived websocket process
@fullsweep_after 20
def init(%{qs: qs} = req, state) do
with params <- Enum.into(:cow_qs.parse_qs(qs), %{}),
sec_websocket <- :cowboy_req.header("sec-websocket-protocol", req, nil),
access_token <- Map.get(params, "access_token"),
{:ok, user, oauth_token} <- authenticate_request(access_token, sec_websocket),
{:ok, topic} <- Streamer.get_topic(params["stream"], user, oauth_token, params) do
req =
if sec_websocket do
:cowboy_req.set_resp_header("sec-websocket-protocol", sec_websocket, req)
else
req
end
{:cowboy_websocket, req,
%{
user: user,
topic: topic,
count: 0,
timer: nil,
subscriptions: [],
oauth_token: oauth_token
}, %{idle_timeout: @timeout}}
else
{:error, :bad_topic} ->
Logger.debug("#{__MODULE__} bad topic #{inspect(req)}")
req = :cowboy_req.reply(404, req)
{:ok, req, state}
{:error, :unauthorized} ->
Logger.debug("#{__MODULE__} authentication error: #{inspect(req)}")
req = :cowboy_req.reply(401, req)
{:ok, req, state}
end
end
def websocket_init(state) do
Logger.debug(
"#{__MODULE__} accepted websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topic #{state.topic}"
)
# process is long-lived and can sometimes accumulate stale data in such a way it's
# not freed by young garbage cycles, thus make full collection sweeps more frequent
:erlang.process_flag(:fullsweep_after, @fullsweep_after)
Streamer.add_socket(state.topic, state.oauth_token)
{:ok, %{state | timer: timer()}}
end
# Client's Pong frame.
def websocket_handle(:pong, state) do
if state.timer, do: Process.cancel_timer(state.timer)
{:ok, %{state | timer: timer()}}
end
# We only receive pings for now
def websocket_handle(:ping, state), do: {:ok, state}
def websocket_handle({:text, ping}, state) when ping in ~w[ping PING] do
if state.timer, do: Process.cancel_timer(state.timer)
{:reply, {:text, "pong"}, %{state | timer: timer()}}
end
def websocket_handle({:text, text}, state) do
with {:ok, json} <- Jason.decode(text) do
websocket_handle({:json, json}, state)
else
_ ->
Logger.error("#{__MODULE__} received text frame: #{text}")
{:ok, state}
end
end
def websocket_handle(
{:json, %{"type" => "subscribe", "stream" => stream_name}},
%{user: user, oauth_token: token} = state
) do
with {:ok, topic} <- Streamer.get_topic(stream_name, user, token, %{}) do
new_subscriptions =
[topic | Map.get(state, :subscriptions, [])]
|> Enum.uniq()
{:ok, _topic} = Streamer.add_socket(topic, user)
{:ok, Map.put(state, :subscriptions, new_subscriptions)}
else
_ ->
Logger.error("#{__MODULE__} received invalid topic: #{stream_name}")
{:ok, state}
end
end
def websocket_handle(frame, state) do
Logger.error("#{__MODULE__} received frame: #{inspect(frame)}")
{:ok, state}
end
def websocket_info({:render_with_user, view, template, item, topic}, state) do
user = %User{} = User.get_cached_by_ap_id(state.user.ap_id)
unless Streamer.filtered_by_user?(user, item) do
websocket_info({:text, view.render(template, item, user, topic)}, %{state | user: user})
else
{:ok, state}
end
end
def websocket_info({:text, message}, state) do
# If the websocket processed X messages, force an hibernate/GC.
# We don't hibernate at every message to balance CPU usage/latency with RAM usage.
if state.count > @hibernate_every do
{:reply, {:text, message}, %{state | count: 0}, :hibernate}
else
{:reply, {:text, message}, %{state | count: state.count + 1}}
end
end
# Ping tick. We don't re-queue a timer there, it is instead queued when :pong is received.
# As we hibernate there, reset the count to 0.
# If the client misses :pong, Cowboy will automatically timeout the connection after
# `@idle_timeout`.
def websocket_info(:tick, state) do
{:reply, :ping, %{state | timer: nil, count: 0}, :hibernate}
end
def websocket_info(:close, state) do
{:stop, state}
end
# State can be `[]` only in case we terminate before switching to websocket,
# we already log errors for these cases in `init/1`, so just do nothing here
def terminate(_reason, _req, []), do: :ok
def terminate(reason, _req, state) do
Logger.debug(
"#{__MODULE__} terminating websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topic #{state.topic || "?"}: #{inspect(reason)}"
)
Streamer.remove_socket(state.topic)
:ok
end
# Public streams without authentication.
defp authenticate_request(nil, nil) do
{:ok, nil, nil}
end
# Authenticated streams.
defp authenticate_request(access_token, sec_websocket) do
token = access_token || sec_websocket
with true <- is_bitstring(token),
oauth_token = %Token{user_id: user_id} <- Repo.get_by(Token, token: token),
user = %User{} <- User.get_cached_by_id(user_id) do
{:ok, user, oauth_token}
else
_ -> {:error, :unauthorized}
end
end
defp timer do
Process.send_after(self(), :tick, @tick)
end
end