Reload notifications when accepted notifications are merged (streaming only) (#31419)

This commit is contained in:
Claire 2024-08-19 17:59:06 +02:00 committed by GitHub
parent d4f135bc6d
commit 53c183f899
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 192 additions and 24 deletions

View file

@ -1,8 +1,10 @@
# frozen_string_literal: true # frozen_string_literal: true
class Api::V1::Notifications::RequestsController < Api::BaseController class Api::V1::Notifications::RequestsController < Api::BaseController
before_action -> { doorkeeper_authorize! :read, :'read:notifications' }, only: :index include Redisable
before_action -> { doorkeeper_authorize! :write, :'write:notifications' }, except: :index
before_action -> { doorkeeper_authorize! :read, :'read:notifications' }, only: [:index, :show, :merged?]
before_action -> { doorkeeper_authorize! :write, :'write:notifications' }, except: [:index, :show, :merged?]
before_action :require_user! before_action :require_user!
before_action :set_request, only: [:show, :accept, :dismiss] before_action :set_request, only: [:show, :accept, :dismiss]
@ -19,6 +21,10 @@ class Api::V1::Notifications::RequestsController < Api::BaseController
render json: @requests, each_serializer: REST::NotificationRequestSerializer, relationships: @relationships render json: @requests, each_serializer: REST::NotificationRequestSerializer, relationships: @relationships
end end
def merged?
render json: { merged: redis.get("notification_unfilter_jobs:#{current_account.id}").to_i <= 0 }
end
def show def show
render json: @request, serializer: REST::NotificationRequestSerializer render json: @request, serializer: REST::NotificationRequestSerializer
end end

View file

@ -138,8 +138,18 @@ export const processNewNotificationForGroups = createAppAsyncThunk(
export const loadPending = createAction('notificationGroups/loadPending'); export const loadPending = createAction('notificationGroups/loadPending');
export const updateScrollPosition = createAction<{ top: boolean }>( export const updateScrollPosition = createAppAsyncThunk(
'notificationGroups/updateScrollPosition', 'notificationGroups/updateScrollPosition',
({ top }: { top: boolean }, { dispatch, getState }) => {
if (
top &&
getState().notificationGroups.mergedNotifications === 'needs-reload'
) {
void dispatch(fetchNotifications());
}
return { top };
},
); );
export const setNotificationsFilter = createAppAsyncThunk( export const setNotificationsFilter = createAppAsyncThunk(
@ -165,5 +175,34 @@ export const markNotificationsAsRead = createAction(
'notificationGroups/markAsRead', 'notificationGroups/markAsRead',
); );
export const mountNotifications = createAction('notificationGroups/mount'); export const mountNotifications = createAppAsyncThunk(
'notificationGroups/mount',
(_, { dispatch, getState }) => {
const state = getState();
if (
state.notificationGroups.mounted === 0 &&
state.notificationGroups.mergedNotifications === 'needs-reload'
) {
void dispatch(fetchNotifications());
}
},
);
export const unmountNotifications = createAction('notificationGroups/unmount'); export const unmountNotifications = createAction('notificationGroups/unmount');
export const refreshStaleNotificationGroups = createAppAsyncThunk<{
deferredRefresh: boolean;
}>('notificationGroups/refreshStale', (_, { dispatch, getState }) => {
const state = getState();
if (
state.notificationGroups.scrolledToTop ||
!state.notificationGroups.mounted
) {
void dispatch(fetchNotifications());
return { deferredRefresh: false };
}
return { deferredRefresh: true };
});

View file

@ -10,7 +10,7 @@ import {
deleteAnnouncement, deleteAnnouncement,
} from './announcements'; } from './announcements';
import { updateConversations } from './conversations'; import { updateConversations } from './conversations';
import { processNewNotificationForGroups } from './notification_groups'; import { processNewNotificationForGroups, refreshStaleNotificationGroups } from './notification_groups';
import { updateNotifications, expandNotifications } from './notifications'; import { updateNotifications, expandNotifications } from './notifications';
import { updateStatus } from './statuses'; import { updateStatus } from './statuses';
import { import {
@ -108,6 +108,14 @@ export const connectTimelineStream = (timelineId, channelName, params = {}, opti
} }
break; break;
} }
case 'notifications_merged':
const state = getState();
if (state.notifications.top || !state.notifications.mounted)
dispatch(expandNotifications({ forceLoad: true, maxId: undefined }));
if(state.settings.getIn(['notifications', 'groupingBeta'], false)) {
dispatch(refreshStaleNotificationGroups());
}
break;
case 'conversation': case 'conversation':
// @ts-expect-error // @ts-expect-error
dispatch(updateConversations(JSON.parse(data.payload))); dispatch(updateConversations(JSON.parse(data.payload)));

View file

@ -81,7 +81,11 @@ export const Notifications: React.FC<{
const anyPendingNotification = useAppSelector(selectAnyPendingNotification); const anyPendingNotification = useAppSelector(selectAnyPendingNotification);
const isUnread = unreadNotificationsCount > 0; const needsReload = useAppSelector(
(state) => state.notificationGroups.mergedNotifications === 'needs-reload',
);
const isUnread = unreadNotificationsCount > 0 || needsReload;
const canMarkAsRead = const canMarkAsRead =
useAppSelector(selectSettingsNotificationsShowUnread) && useAppSelector(selectSettingsNotificationsShowUnread) &&
@ -118,11 +122,11 @@ export const Notifications: React.FC<{
// Keep track of mounted components for unread notification handling // Keep track of mounted components for unread notification handling
useEffect(() => { useEffect(() => {
dispatch(mountNotifications()); void dispatch(mountNotifications());
return () => { return () => {
dispatch(unmountNotifications()); dispatch(unmountNotifications());
dispatch(updateScrollPosition({ top: false })); void dispatch(updateScrollPosition({ top: false }));
}; };
}, [dispatch]); }, [dispatch]);
@ -147,11 +151,11 @@ export const Notifications: React.FC<{
}, [dispatch]); }, [dispatch]);
const handleScrollToTop = useDebouncedCallback(() => { const handleScrollToTop = useDebouncedCallback(() => {
dispatch(updateScrollPosition({ top: true })); void dispatch(updateScrollPosition({ top: true }));
}, 100); }, 100);
const handleScroll = useDebouncedCallback(() => { const handleScroll = useDebouncedCallback(() => {
dispatch(updateScrollPosition({ top: false })); void dispatch(updateScrollPosition({ top: false }));
}, 100); }, 100);
useEffect(() => { useEffect(() => {

View file

@ -19,6 +19,7 @@ import {
markNotificationsAsRead, markNotificationsAsRead,
mountNotifications, mountNotifications,
unmountNotifications, unmountNotifications,
refreshStaleNotificationGroups,
} from 'mastodon/actions/notification_groups'; } from 'mastodon/actions/notification_groups';
import { import {
disconnectTimeline, disconnectTimeline,
@ -51,6 +52,7 @@ interface NotificationGroupsState {
readMarkerId: string; readMarkerId: string;
mounted: number; mounted: number;
isTabVisible: boolean; isTabVisible: boolean;
mergedNotifications: 'ok' | 'pending' | 'needs-reload';
} }
const initialState: NotificationGroupsState = { const initialState: NotificationGroupsState = {
@ -58,6 +60,8 @@ const initialState: NotificationGroupsState = {
pendingGroups: [], // holds pending groups in slow mode pendingGroups: [], // holds pending groups in slow mode
scrolledToTop: false, scrolledToTop: false,
isLoading: false, isLoading: false,
// this is used to track whether we need to refresh notifications after accepting requests
mergedNotifications: 'ok',
// The following properties are used to track unread notifications // The following properties are used to track unread notifications
lastReadId: '0', // used internally for unread notifications lastReadId: '0', // used internally for unread notifications
readMarkerId: '0', // user-facing and updated when focus changes readMarkerId: '0', // user-facing and updated when focus changes
@ -301,6 +305,7 @@ export const notificationGroupsReducer = createReducer<NotificationGroupsState>(
json.type === 'gap' ? json : createNotificationGroupFromJSON(json), json.type === 'gap' ? json : createNotificationGroupFromJSON(json),
); );
state.isLoading = false; state.isLoading = false;
state.mergedNotifications = 'ok';
updateLastReadId(state); updateLastReadId(state);
}) })
.addCase(fetchNotificationsGap.fulfilled, (state, action) => { .addCase(fetchNotificationsGap.fulfilled, (state, action) => {
@ -455,7 +460,7 @@ export const notificationGroupsReducer = createReducer<NotificationGroupsState>(
state.groups = state.pendingGroups.concat(state.groups); state.groups = state.pendingGroups.concat(state.groups);
state.pendingGroups = []; state.pendingGroups = [];
}) })
.addCase(updateScrollPosition, (state, action) => { .addCase(updateScrollPosition.fulfilled, (state, action) => {
state.scrolledToTop = action.payload.top; state.scrolledToTop = action.payload.top;
updateLastReadId(state); updateLastReadId(state);
trimNotifications(state); trimNotifications(state);
@ -482,7 +487,7 @@ export const notificationGroupsReducer = createReducer<NotificationGroupsState>(
action.payload.markers.notifications.last_read_id; action.payload.markers.notifications.last_read_id;
} }
}) })
.addCase(mountNotifications, (state) => { .addCase(mountNotifications.fulfilled, (state) => {
state.mounted += 1; state.mounted += 1;
commitLastReadId(state); commitLastReadId(state);
updateLastReadId(state); updateLastReadId(state);
@ -498,6 +503,10 @@ export const notificationGroupsReducer = createReducer<NotificationGroupsState>(
.addCase(unfocusApp, (state) => { .addCase(unfocusApp, (state) => {
state.isTabVisible = false; state.isTabVisible = false;
}) })
.addCase(refreshStaleNotificationGroups.fulfilled, (state, action) => {
if (action.payload.deferredRefresh)
state.mergedNotifications = 'needs-reload';
})
.addMatcher( .addMatcher(
isAnyOf(authorizeFollowRequestSuccess, rejectFollowRequestSuccess), isAnyOf(authorizeFollowRequestSuccess, rejectFollowRequestSuccess),
(state, action) => { (state, action) => {

View file

@ -1,9 +1,21 @@
# frozen_string_literal: true # frozen_string_literal: true
class AcceptNotificationRequestService < BaseService class AcceptNotificationRequestService < BaseService
include Redisable
def call(request) def call(request)
NotificationPermission.create!(account: request.account, from_account: request.from_account) NotificationPermission.create!(account: request.account, from_account: request.from_account)
increment_worker_count!(request)
UnfilterNotificationsWorker.perform_async(request.account_id, request.from_account_id) UnfilterNotificationsWorker.perform_async(request.account_id, request.from_account_id)
request.destroy! request.destroy!
end end
private
def increment_worker_count!(request)
with_redis do |redis|
redis.incr("notification_unfilter_jobs:#{request.account_id}")
redis.expire("notification_unfilter_jobs:#{request.account_id}", 30.minutes.to_i)
end
end
end end

View file

@ -2,6 +2,7 @@
class UnfilterNotificationsWorker class UnfilterNotificationsWorker
include Sidekiq::Worker include Sidekiq::Worker
include Redisable
# Earlier versions of the feature passed a `notification_request` ID # Earlier versions of the feature passed a `notification_request` ID
# If `to_account_id` is passed, the first argument is an account ID # If `to_account_id` is passed, the first argument is an account ID
@ -9,19 +10,20 @@ class UnfilterNotificationsWorker
def perform(notification_request_or_account_id, from_account_id = nil) def perform(notification_request_or_account_id, from_account_id = nil)
if from_account_id.present? if from_account_id.present?
@notification_request = nil @notification_request = nil
@from_account = Account.find(from_account_id) @from_account = Account.find_by(id: from_account_id)
@recipient = Account.find(notification_request_or_account_id) @recipient = Account.find_by(id: notification_request_or_account_id)
else else
@notification_request = NotificationRequest.find(notification_request_or_account_id) @notification_request = NotificationRequest.find_by(id: notification_request_or_account_id)
@from_account = @notification_request.from_account @from_account = @notification_request&.from_account
@recipient = @notification_request.account @recipient = @notification_request&.account
end end
return if @from_account.nil? || @recipient.nil?
push_to_conversations! push_to_conversations!
unfilter_notifications! unfilter_notifications!
remove_request! remove_request!
rescue ActiveRecord::RecordNotFound decrement_worker_count!
true
end end
private private
@ -45,4 +47,17 @@ class UnfilterNotificationsWorker
def notifications_with_private_mentions def notifications_with_private_mentions
filtered_notifications.where(type: :mention).joins(mention: :status).merge(Status.where(visibility: :direct)).includes(mention: :status) filtered_notifications.where(type: :mention).joins(mention: :status).merge(Status.where(visibility: :direct)).includes(mention: :status)
end end
def decrement_worker_count!
value = redis.decr("notification_unfilter_jobs:#{@recipient.id}")
push_streaming_event! if value <= 0 && subscribed_to_streaming_api?
end
def push_streaming_event!
redis.publish("timeline:#{@recipient.id}:notifications", Oj.dump(event: :notifications_merged, payload: '1'))
end
def subscribed_to_streaming_api?
redis.exists?("subscribed:timeline:#{@recipient.id}") || redis.exists?("subscribed:timeline:#{@recipient.id}:notifications")
end
end end

View file

@ -158,6 +158,7 @@ namespace :api, format: false do
collection do collection do
post :accept, to: 'requests#accept_bulk' post :accept, to: 'requests#accept_bulk'
post :dismiss, to: 'requests#dismiss_bulk' post :dismiss, to: 'requests#dismiss_bulk'
get :merged, to: 'requests#merged?'
end end
member do member do

View file

@ -120,4 +120,34 @@ RSpec.describe 'Requests' do
expect(response).to have_http_status(200) expect(response).to have_http_status(200)
end end
end end
describe 'GET /api/v1/notifications/requests/merged' do
subject do
get '/api/v1/notifications/requests/merged', headers: headers
end
it_behaves_like 'forbidden for wrong scope', 'write write:notifications'
context 'when the user has no accepted request pending merge' do
it 'returns http success and returns merged: true' do
subject
expect(response).to have_http_status(200)
expect(body_as_json).to eq({ merged: true })
end
end
context 'when the user has an accepted request pending merge' do
before do
redis.set("notification_unfilter_jobs:#{user.account_id}", 1)
end
it 'returns http success and returns merged: false' do
subject
expect(response).to have_http_status(200)
expect(body_as_json).to eq({ merged: false })
end
end
end
end end

View file

@ -8,10 +8,11 @@ RSpec.describe AcceptNotificationRequestService do
let(:notification_request) { Fabricate(:notification_request) } let(:notification_request) { Fabricate(:notification_request) }
describe '#call' do describe '#call' do
it 'destroys the notification request, creates a permission, and queues a worker' do it 'destroys the notification request, creates a permission, increases the jobs count and queues a worker' do
expect { subject.call(notification_request) } expect { subject.call(notification_request) }
.to change { NotificationRequest.exists?(notification_request.id) }.to(false) .to change { NotificationRequest.exists?(notification_request.id) }.to(false)
.and change { NotificationPermission.exists?(account_id: notification_request.account_id, from_account_id: notification_request.from_account_id) }.to(true) .and change { NotificationPermission.exists?(account_id: notification_request.account_id, from_account_id: notification_request.from_account_id) }.to(true)
.and change { redis.get("notification_unfilter_jobs:#{notification_request.account_id}").to_i }.by(1)
expect(UnfilterNotificationsWorker).to have_enqueued_sidekiq_job(notification_request.account_id, notification_request.from_account_id) expect(UnfilterNotificationsWorker).to have_enqueued_sidekiq_job(notification_request.account_id, notification_request.from_account_id)
end end

View file

@ -13,13 +13,56 @@ describe UnfilterNotificationsWorker do
Fabricate(:notification, filtered: true, from_account: sender, account: recipient, type: :mention, activity: mention) Fabricate(:notification, filtered: true, from_account: sender, account: recipient, type: :mention, activity: mention)
follow_request = sender.request_follow!(recipient) follow_request = sender.request_follow!(recipient)
Fabricate(:notification, filtered: true, from_account: sender, account: recipient, type: :follow_request, activity: follow_request) Fabricate(:notification, filtered: true, from_account: sender, account: recipient, type: :follow_request, activity: follow_request)
allow(redis).to receive(:publish)
allow(redis).to receive(:exists?).and_return(false)
end end
shared_examples 'shared behavior' do shared_examples 'shared behavior' do
it 'unfilters notifications and adds private messages to conversations' do context 'when this is the last pending merge job and the user is subscribed to streaming' do
before do
redis.set("notification_unfilter_jobs:#{recipient.id}", 1)
allow(redis).to receive(:exists?).with("subscribed:timeline:#{recipient.id}").and_return(true)
end
it 'unfilters notifications, adds private messages to conversations, and pushes to redis' do
expect { subject } expect { subject }
.to change { recipient.notifications.where(from_account_id: sender.id).pluck(:filtered) }.from([true, true]).to([false, false]) .to change { recipient.notifications.where(from_account_id: sender.id).pluck(:filtered) }.from([true, true]).to([false, false])
.and change { recipient.conversations.exists?(last_status_id: sender.statuses.first.id) }.to(true) .and change { recipient.conversations.exists?(last_status_id: sender.statuses.first.id) }.to(true)
.and change { redis.get("notification_unfilter_jobs:#{recipient.id}").to_i }.by(-1)
expect(redis).to have_received(:publish).with("timeline:#{recipient.id}:notifications", '{"event":"notifications_merged","payload":"1"}')
end
end
context 'when this is not last pending merge job and the user is subscribed to streaming' do
before do
redis.set("notification_unfilter_jobs:#{recipient.id}", 2)
allow(redis).to receive(:exists?).with("subscribed:timeline:#{recipient.id}").and_return(true)
end
it 'unfilters notifications, adds private messages to conversations, and does not push to redis' do
expect { subject }
.to change { recipient.notifications.where(from_account_id: sender.id).pluck(:filtered) }.from([true, true]).to([false, false])
.and change { recipient.conversations.exists?(last_status_id: sender.statuses.first.id) }.to(true)
.and change { redis.get("notification_unfilter_jobs:#{recipient.id}").to_i }.by(-1)
expect(redis).to_not have_received(:publish).with("timeline:#{recipient.id}:notifications", '{"event":"notifications_merged","payload":"1"}')
end
end
context 'when this is the last pending merge job and the user is not subscribed to streaming' do
before do
redis.set("notification_unfilter_jobs:#{recipient.id}", 1)
end
it 'unfilters notifications, adds private messages to conversations, and does not push to redis' do
expect { subject }
.to change { recipient.notifications.where(from_account_id: sender.id).pluck(:filtered) }.from([true, true]).to([false, false])
.and change { recipient.conversations.exists?(last_status_id: sender.statuses.first.id) }.to(true)
.and change { redis.get("notification_unfilter_jobs:#{recipient.id}").to_i }.by(-1)
expect(redis).to_not have_received(:publish).with("timeline:#{recipient.id}:notifications", '{"event":"notifications_merged","payload":"1"}')
end
end end
end end