diff --git a/app/controllers/api/v1/trends/tags_controller.rb b/app/controllers/api/v1/trends/tags_controller.rb index b15dd50131..10a3442344 100644 --- a/app/controllers/api/v1/trends/tags_controller.rb +++ b/app/controllers/api/v1/trends/tags_controller.rb @@ -27,7 +27,9 @@ class Api::V1::Trends::TagsController < Api::BaseController end def tags_from_trends - Trends.tags.query.allowed + scope = Trends.tags.query.allowed.in_locale(content_locale) + scope = scope.filtered_for(current_account) if user_signed_in? + scope end def next_path diff --git a/app/models/tag.rb b/app/models/tag.rb index 67fa9e5d3a..c9115b905b 100644 --- a/app/models/tag.rb +++ b/app/models/tag.rb @@ -32,6 +32,8 @@ class Tag < ApplicationRecord has_many :featured_tags, dependent: :destroy, inverse_of: :tag has_many :followers, through: :passive_relationships, source: :account + has_one :trend, class_name: 'TagTrend', inverse_of: :tag, dependent: :destroy + HASHTAG_SEPARATORS = "_\u00B7\u30FB\u200c" HASHTAG_FIRST_SEQUENCE_CHUNK_ONE = "[[:word:]_][[:word:]#{HASHTAG_SEPARATORS}]*[[:alpha:]#{HASHTAG_SEPARATORS}]" HASHTAG_FIRST_SEQUENCE_CHUNK_TWO = "[[:word:]#{HASHTAG_SEPARATORS}]*[[:word:]_]" diff --git a/app/models/tag_trend.rb b/app/models/tag_trend.rb new file mode 100644 index 0000000000..85a9028f21 --- /dev/null +++ b/app/models/tag_trend.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +# == Schema Information +# +# Table name: tag_trends +# +# id :bigint(8) not null, primary key +# tag_id :bigint(8) not null +# score :float default(0.0), not null +# rank :integer default(0), not null +# allowed :boolean default(FALSE), not null +# language :string +# +class TagTrend < ApplicationRecord + include RankedTrend + + belongs_to :tag + + scope :allowed, -> { where(allowed: true) } + scope :not_allowed, -> { where(allowed: false) } +end diff --git a/app/models/trends/base.rb b/app/models/trends/base.rb index a189f11f23..47958aa824 100644 --- a/app/models/trends/base.rb +++ b/app/models/trends/base.rb @@ -37,18 +37,6 @@ class Trends::Base Trends::Query.new(key_prefix, klass) end - def score(id, locale: nil) - redis.zscore([key_prefix, 'all', locale].compact.join(':'), id) || 0 - end - - def rank(id, locale: nil) - redis.zrevrank([key_prefix, 'allowed', locale].compact.join(':'), id) - end - - def currently_trending_ids(allowed, limit) - redis.zrevrange(allowed ? "#{key_prefix}:allowed" : "#{key_prefix}:all", 0, limit.positive? ? limit - 1 : limit).map(&:to_i) - end - protected def key_prefix @@ -64,42 +52,9 @@ class Trends::Base redis.expire(used_key(at_time), 1.day.seconds) end - def score_at_rank(rank) - redis.zrevrange("#{key_prefix}:allowed", 0, rank, with_scores: true).last&.last || 0 - end - - def replace_items(suffix, items) - tmp_prefix = "#{key_prefix}:tmp:#{SecureRandom.alphanumeric(6)}#{suffix}" - allowed_items = filter_for_allowed_items(items) - - redis.pipelined do |pipeline| - items.each { |item| pipeline.zadd("#{tmp_prefix}:all", item[:score], item[:item].id) } - allowed_items.each { |item| pipeline.zadd("#{tmp_prefix}:allowed", item[:score], item[:item].id) } - - rename_set(pipeline, "#{tmp_prefix}:all", "#{key_prefix}:all#{suffix}", items) - rename_set(pipeline, "#{tmp_prefix}:allowed", "#{key_prefix}:allowed#{suffix}", allowed_items) - end - end - - def filter_for_allowed_items(items) - raise NotImplementedError - end - private def used_key(at_time) "#{key_prefix}:used:#{at_time.beginning_of_day.to_i}" end - - def rename_set(pipeline, from_key, to_key, set_items) - if set_items.empty? - pipeline.del(to_key) - else - pipeline.rename(from_key, to_key) - end - end - - def skip_review? - Setting.trendable_by_default - end end diff --git a/app/models/trends/query.rb b/app/models/trends/query.rb index c4edbba6b8..fe79205bf6 100644 --- a/app/models/trends/query.rb +++ b/app/models/trends/query.rb @@ -1,15 +1,13 @@ # frozen_string_literal: true class Trends::Query - include Redisable include Enumerable - attr_reader :prefix, :klass, :loaded + attr_reader :klass, :loaded alias loaded? loaded - def initialize(prefix, klass) - @prefix = prefix + def initialize(_prefix, klass) @klass = klass @records = [] @loaded = false @@ -68,22 +66,11 @@ class Trends::Query alias to_a to_ary def to_arel - if ids_for_key.empty? - klass.none - else - scope = klass.joins(sanitized_join_sql).reorder('x.ordering') - scope = scope.offset(@offset) if @offset.present? - scope = scope.limit(@limit) if @limit.present? - scope - end + raise NotImplementedError end private - def key - [@prefix, @allowed ? 'allowed' : 'all', @locale].compact.join(':') - end - def load unless loaded? @records = perform_queries @@ -93,29 +80,7 @@ class Trends::Query self end - def ids_for_key - @ids_for_key ||= redis.zrevrange(key, 0, -1).map(&:to_i) - end - - def sanitized_join_sql - ActiveRecord::Base.sanitize_sql_array(join_sql_array) - end - - def join_sql_array - [join_sql_query, ids_for_key] - end - - def join_sql_query - <<~SQL.squish - JOIN unnest(array[?]) WITH ordinality AS x (id, ordering) ON #{klass.table_name}.id = x.id - SQL - end - def perform_queries - apply_scopes(to_arel).to_a - end - - def apply_scopes(scope) - scope + to_arel.to_a end end diff --git a/app/models/trends/tag_filter.rb b/app/models/trends/tag_filter.rb index d6f88a9486..02d558ac25 100644 --- a/app/models/trends/tag_filter.rb +++ b/app/models/trends/tag_filter.rb @@ -6,6 +6,8 @@ class Trends::TagFilter status ).freeze + IGNORED_PARAMS = %w(page).freeze + attr_reader :params def initialize(params) @@ -13,14 +15,10 @@ class Trends::TagFilter end def results - scope = if params[:status] == 'pending_review' - Tag.unscoped.order(id: :desc) - else - trending_scope - end + scope = initial_scope params.each do |key, value| - next if key.to_s == 'page' + next if IGNORED_PARAMS.include?(key.to_s) scope.merge!(scope_for(key, value.to_s.strip)) if value.present? end @@ -30,19 +28,24 @@ class Trends::TagFilter private + def initial_scope + Tag.select(Tag.arel_table[Arel.star]) + .joins(:trend) + .eager_load(:trend) + .reorder(score: :desc) + end + def scope_for(key, value) case key.to_s when 'status' status_scope(value) + when 'trending' + trending_scope(value) else - raise "Unknown filter: #{key}" + raise Mastodon::InvalidParameterError, "Unknown filter: #{key}" end end - def trending_scope - Trends.tags.query.to_arel - end - def status_scope(value) case value.to_s when 'approved' @@ -52,7 +55,16 @@ class Trends::TagFilter when 'pending_review' Tag.pending_review else - raise "Unknown status: #{value}" + raise Mastodon::InvalidParameterError, "Unknown status: #{value}" + end + end + + def trending_scope(value) + case value + when 'allowed' + TagTrend.allowed + else + TagTrend.all end end end diff --git a/app/models/trends/tags.rb b/app/models/trends/tags.rb index 9315329906..5b872a3103 100644 --- a/app/models/trends/tags.rb +++ b/app/models/trends/tags.rb @@ -3,6 +3,8 @@ class Trends::Tags < Trends::Base PREFIX = 'trending_tags' + BATCH_SIZE = 100 + self.default_options = { threshold: 5, review_threshold: 3, @@ -11,6 +13,39 @@ class Trends::Tags < Trends::Base decay_threshold: 1, } + class Query < Trends::Query + def filtered_for!(account) + @account = account + self + end + + def filtered_for(account) + clone.filtered_for!(account) + end + + def to_arel + scope = Tag.joins(:trend).reorder(language_order_clause.desc, score: :desc) + scope = scope.merge(TagTrend.allowed) if @allowed + scope = scope.offset(@offset) if @offset.present? + scope = scope.limit(@limit) if @limit.present? + scope + end + + private + + def language_order_clause + Arel::Nodes::Case.new.when(TagTrend.arel_table[:language].in(preferred_languages)).then(1).else(0) + end + + def preferred_languages + if @account&.chosen_languages.present? + @account.chosen_languages + else + @locale + end + end + end + def register(status, at_time = Time.now.utc) return unless !status.reblog? && status.public_visibility? && !status.account.silenced? @@ -24,19 +59,39 @@ class Trends::Tags < Trends::Base record_used_id(tag.id, at_time) end + def query + Query.new(key_prefix, klass) + end + def refresh(at_time = Time.now.utc) - tags = Tag.where(id: (recently_used_ids(at_time) + currently_trending_ids(false, -1)).uniq) - calculate_scores(tags, at_time) + # First, recalculate scores for links that were trending previously. We split the queries + # to avoid having to load all of the IDs into Ruby just to send them back into Postgres + Tag.where(id: TagTrend.select(:tag_id)).find_in_batches(batch_size: BATCH_SIZE) do |tags| + calculate_scores(tags, at_time) + end + + # Then, calculate scores for links that were used today. There are potentially some + # duplicate items here that we might process one more time, but that should be fine + Tag.where(id: recently_used_ids(at_time)).find_in_batches(batch_size: BATCH_SIZE) do |tags| + calculate_scores(tags, at_time) + end + + # Now that all trends have up-to-date scores, and all the ones below the threshold have + # been removed, we can recalculate their positions + TagTrend.recalculate_ordered_rank end def request_review - tags = Tag.where(id: currently_trending_ids(false, -1)) + score_at_threshold = TagTrend.allowed.by_rank.ranked_below(options[:review_threshold]).first&.score || 0 + tag_trends = TagTrend.not_allowed.includes(:tag) - tags.filter_map do |tag| - next unless would_be_trending?(tag.id) && !tag.trendable? && tag.requires_review_notification? + tag_trends.filter_map do |trend| + tag = trend.tag - tag.touch(:requested_review_at) - tag + if trend.score > score_at_threshold && !tag.trendable? && tag.requires_review_notification? + tag.touch(:requested_review_at) + tag + end end end @@ -53,9 +108,7 @@ class Trends::Tags < Trends::Base private def calculate_scores(tags, at_time) - items = [] - - tags.each do |tag| + items = tags.map do |tag| expected = tag.history.get(at_time - 1.day).accounts.to_f expected = 1.0 if expected.zero? observed = tag.history.get(at_time).accounts.to_f @@ -79,19 +132,13 @@ class Trends::Tags < Trends::Base decaying_score = max_score * (0.5**((at_time.to_f - max_time.to_f) / options[:max_score_halflife].to_f)) - next unless decaying_score >= options[:decay_threshold] - - items << { score: decaying_score, item: tag } + [decaying_score, tag] end - replace_items('', items) - end + to_insert = items.filter { |(score, _)| score >= options[:decay_threshold] } + to_delete = items.filter { |(score, _)| score < options[:decay_threshold] } - def filter_for_allowed_items(items) - items.select { |item| item[:item].trendable? } - end - - def would_be_trending?(id) - score(id) > score_at_rank(options[:review_threshold] - 1) + TagTrend.upsert_all(to_insert.map { |(score, tag)| { tag_id: tag.id, score: score, language: '', allowed: tag.trendable? || false } }, unique_by: %w(tag_id language)) if to_insert.any? + TagTrend.where(tag_id: to_delete.map { |(_, tag)| tag.id }).delete_all if to_delete.any? end end diff --git a/app/views/admin/trends/tags/_tag.html.haml b/app/views/admin/trends/tags/_tag.html.haml index b1e714a912..e0f9f39e47 100644 --- a/app/views/admin/trends/tags/_tag.html.haml +++ b/app/views/admin/trends/tags/_tag.html.haml @@ -11,9 +11,9 @@ = link_to tag_path(tag), target: '_blank', rel: 'noopener noreferrer' do = t('admin.trends.tags.used_by_over_week', count: tag.history.reduce(0) { |sum, day| sum + day.accounts }) - - if tag.trendable? && (rank = Trends.tags.rank(tag.id)) + - if tag.trendable? · - %abbr{ title: t('admin.trends.tags.current_score', score: Trends.tags.score(tag.id)) }= t('admin.trends.tags.trending_rank', rank: rank + 1) + %abbr{ title: t('admin.trends.tags.current_score', score: tag.trend.score) }= t('admin.trends.tags.trending_rank', rank: tag.trend.rank + 1) - if tag.decaying? · diff --git a/app/views/admin_mailer/_new_trending_tags.text.erb b/app/views/admin_mailer/_new_trending_tags.text.erb index f738caaf3d..c9bd1bc717 100644 --- a/app/views/admin_mailer/_new_trending_tags.text.erb +++ b/app/views/admin_mailer/_new_trending_tags.text.erb @@ -2,7 +2,7 @@ <% new_trending_tags.each do |tag| %> - #<%= tag.display_name %> - <%= raw t('admin.trends.tags.usage_comparison', today: tag.history.get(Time.now.utc).accounts, yesterday: tag.history.get(Time.now.utc - 1.day).accounts) %> · <%= t('admin.trends.tags.current_score', score: Trends.tags.score(tag.id).round(2)) %> + <%= raw t('admin.trends.tags.usage_comparison', today: tag.history.get(Time.now.utc).accounts, yesterday: tag.history.get(Time.now.utc - 1.day).accounts) %> · <%= t('admin.trends.tags.current_score', score: tag.trend.score.round(2)) %> <% end %> <%= raw t('application_mailer.view')%> <%= admin_trends_tags_url(status: 'pending_review') %> diff --git a/db/migrate/20241111141355_create_tag_trends.rb b/db/migrate/20241111141355_create_tag_trends.rb new file mode 100644 index 0000000000..c4c7d13d19 --- /dev/null +++ b/db/migrate/20241111141355_create_tag_trends.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +class CreateTagTrends < ActiveRecord::Migration[7.2] + def change + create_table :tag_trends do |t| # rubocop:disable Rails/CreateTableWithTimestamps + t.references :tag, null: false, foreign_key: { on_delete: :cascade }, index: false + t.float :score, null: false, default: 0 + t.integer :rank, null: false, default: 0 + t.boolean :allowed, null: false, default: false + t.string :language, null: false, default: '' + end + + add_index :tag_trends, [:tag_id, :language], unique: true + end +end diff --git a/db/post_migrate/20241123160722_move_tag_trends_to_table.rb b/db/post_migrate/20241123160722_move_tag_trends_to_table.rb new file mode 100644 index 0000000000..3e1f1b5440 --- /dev/null +++ b/db/post_migrate/20241123160722_move_tag_trends_to_table.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +class MoveTagTrendsToTable < ActiveRecord::Migration[7.2] + include Redisable + + disable_ddl_transaction! + + def up + redis.zrange('trending_tags:all', 0, -1, with_scores: true).each do |(tag_id, score)| + TagTrend.create( + tag_id: tag_id, + score: score, + allowed: redis.zscore('trending_tags:allowed', tag_id).present? + ) + end + + TagTrend.recalculate_ordered_rank + + redis.del('trending_tags:allowed', 'trending_tags:all') + end + + def down + raise ActiveRecord::IrreversibleMigration + end +end diff --git a/db/schema.rb b/db/schema.rb index 30b16a8a82..fd4eb6ca38 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.2].define(version: 2024_11_04_082851) do +ActiveRecord::Schema[7.1].define(version: 2024_11_23_160722) do # These are extensions that must be enabled in order to support this database enable_extension "plpgsql" @@ -1080,6 +1080,15 @@ ActiveRecord::Schema[7.2].define(version: 2024_11_04_082851) do t.index ["tag_id"], name: "index_tag_follows_on_tag_id" end + create_table "tag_trends", force: :cascade do |t| + t.bigint "tag_id", null: false + t.float "score", default: 0.0, null: false + t.integer "rank", default: 0, null: false + t.boolean "allowed", default: false, null: false + t.string "language" + t.index ["tag_id", "language"], name: "index_tag_trends_on_tag_id_and_language", unique: true + end + create_table "tags", force: :cascade do |t| t.string "name", default: "", null: false t.datetime "created_at", precision: nil, null: false @@ -1343,6 +1352,7 @@ ActiveRecord::Schema[7.2].define(version: 2024_11_04_082851) do add_foreign_key "statuses_tags", "tags", name: "fk_3081861e21", on_delete: :cascade add_foreign_key "tag_follows", "accounts", on_delete: :cascade add_foreign_key "tag_follows", "tags", on_delete: :cascade + add_foreign_key "tag_trends", "tags", on_delete: :cascade add_foreign_key "tombstones", "accounts", on_delete: :cascade add_foreign_key "user_invite_requests", "users", on_delete: :cascade add_foreign_key "users", "accounts", name: "fk_50500f500d", on_delete: :cascade diff --git a/spec/fabricators/tag_trend_fabricator.rb b/spec/fabricators/tag_trend_fabricator.rb new file mode 100644 index 0000000000..ddf9b9bf40 --- /dev/null +++ b/spec/fabricators/tag_trend_fabricator.rb @@ -0,0 +1,5 @@ +# frozen_string_literal: true + +Fabricator(:tag_trend) do + tag +end diff --git a/spec/mailers/admin_mailer_spec.rb b/spec/mailers/admin_mailer_spec.rb index cd1ab3311c..a4bf9dd667 100644 --- a/spec/mailers/admin_mailer_spec.rb +++ b/spec/mailers/admin_mailer_spec.rb @@ -71,6 +71,7 @@ RSpec.describe AdminMailer do before do PreviewCardTrend.create!(preview_card: link) StatusTrend.create!(status: status, account: Fabricate(:account)) + TagTrend.create!(tag: tag) recipient.user.update(locale: :en) end diff --git a/spec/models/trends/tags_spec.rb b/spec/models/trends/tags_spec.rb index f2818fca87..936b441d92 100644 --- a/spec/models/trends/tags_spec.rb +++ b/spec/models/trends/tags_spec.rb @@ -61,10 +61,10 @@ RSpec.describe Trends::Tags do it 'decays scores' do subject.refresh(yesterday + 12.hours) - original_score = subject.score(tag_ocs.id) + original_score = TagTrend.find_by(tag: tag_ocs).score expect(original_score).to eq 144.0 subject.refresh(yesterday + 12.hours + subject.options[:max_score_halflife]) - decayed_score = subject.score(tag_ocs.id) + decayed_score = TagTrend.find_by(tag: tag_ocs).score expect(decayed_score).to be <= original_score / 2 end end