mirror of
https://github.com/mastodon/mastodon.git
synced 2024-11-21 21:57:19 +00:00
Change hashtag trends to be stored in the database instead of redis
This commit is contained in:
parent
f5f6273d2b
commit
1010f3749c
|
@ -27,7 +27,9 @@ class Api::V1::Trends::TagsController < Api::BaseController
|
||||||
end
|
end
|
||||||
|
|
||||||
def tags_from_trends
|
def tags_from_trends
|
||||||
Trends.tags.query.allowed
|
scope = Trends.tags.query.allowed.in_locale(content_locale)
|
||||||
|
scope = scope.filtered_for(current_account) if user_signed_in?
|
||||||
|
scope
|
||||||
end
|
end
|
||||||
|
|
||||||
def next_path
|
def next_path
|
||||||
|
|
|
@ -32,6 +32,8 @@ class Tag < ApplicationRecord
|
||||||
has_many :featured_tags, dependent: :destroy, inverse_of: :tag
|
has_many :featured_tags, dependent: :destroy, inverse_of: :tag
|
||||||
has_many :followers, through: :passive_relationships, source: :account
|
has_many :followers, through: :passive_relationships, source: :account
|
||||||
|
|
||||||
|
has_one :trend, class_name: 'TagTrend', inverse_of: :tag, dependent: :destroy
|
||||||
|
|
||||||
HASHTAG_SEPARATORS = "_\u00B7\u30FB\u200c"
|
HASHTAG_SEPARATORS = "_\u00B7\u30FB\u200c"
|
||||||
HASHTAG_FIRST_SEQUENCE_CHUNK_ONE = "[[:word:]_][[:word:]#{HASHTAG_SEPARATORS}]*[[:alpha:]#{HASHTAG_SEPARATORS}]"
|
HASHTAG_FIRST_SEQUENCE_CHUNK_ONE = "[[:word:]_][[:word:]#{HASHTAG_SEPARATORS}]*[[:alpha:]#{HASHTAG_SEPARATORS}]"
|
||||||
HASHTAG_FIRST_SEQUENCE_CHUNK_TWO = "[[:word:]#{HASHTAG_SEPARATORS}]*[[:word:]_]"
|
HASHTAG_FIRST_SEQUENCE_CHUNK_TWO = "[[:word:]#{HASHTAG_SEPARATORS}]*[[:word:]_]"
|
||||||
|
@ -101,8 +103,12 @@ class Tag < ApplicationRecord
|
||||||
max_score_at && max_score_at >= Trends.tags.options[:max_score_cooldown].ago && max_score_at < 1.day.ago
|
max_score_at && max_score_at >= Trends.tags.options[:max_score_cooldown].ago && max_score_at < 1.day.ago
|
||||||
end
|
end
|
||||||
|
|
||||||
def history
|
def history(language = nil)
|
||||||
@history ||= Trends::History.new('tags', id)
|
if language.nil?
|
||||||
|
@history ||= Trends::History.new('tags', id)
|
||||||
|
else
|
||||||
|
Trends::History.new("tags:#{language}", id)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
class << self
|
class << self
|
||||||
|
|
21
app/models/tag_trend.rb
Normal file
21
app/models/tag_trend.rb
Normal file
|
@ -0,0 +1,21 @@
|
||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
# == Schema Information
|
||||||
|
#
|
||||||
|
# Table name: tag_trends
|
||||||
|
#
|
||||||
|
# id :bigint(8) not null, primary key
|
||||||
|
# tag_id :bigint(8) not null
|
||||||
|
# score :float default(0.0), not null
|
||||||
|
# rank :integer default(0), not null
|
||||||
|
# allowed :boolean default(FALSE), not null
|
||||||
|
# language :string
|
||||||
|
#
|
||||||
|
class TagTrend < ApplicationRecord
|
||||||
|
include RankedTrend
|
||||||
|
|
||||||
|
belongs_to :tag
|
||||||
|
|
||||||
|
scope :allowed, -> { where(allowed: true) }
|
||||||
|
scope :not_allowed, -> { where(allowed: false) }
|
||||||
|
end
|
|
@ -37,18 +37,6 @@ class Trends::Base
|
||||||
Trends::Query.new(key_prefix, klass)
|
Trends::Query.new(key_prefix, klass)
|
||||||
end
|
end
|
||||||
|
|
||||||
def score(id, locale: nil)
|
|
||||||
redis.zscore([key_prefix, 'all', locale].compact.join(':'), id) || 0
|
|
||||||
end
|
|
||||||
|
|
||||||
def rank(id, locale: nil)
|
|
||||||
redis.zrevrank([key_prefix, 'allowed', locale].compact.join(':'), id)
|
|
||||||
end
|
|
||||||
|
|
||||||
def currently_trending_ids(allowed, limit)
|
|
||||||
redis.zrevrange(allowed ? "#{key_prefix}:allowed" : "#{key_prefix}:all", 0, limit.positive? ? limit - 1 : limit).map(&:to_i)
|
|
||||||
end
|
|
||||||
|
|
||||||
protected
|
protected
|
||||||
|
|
||||||
def key_prefix
|
def key_prefix
|
||||||
|
@ -64,42 +52,9 @@ class Trends::Base
|
||||||
redis.expire(used_key(at_time), 1.day.seconds)
|
redis.expire(used_key(at_time), 1.day.seconds)
|
||||||
end
|
end
|
||||||
|
|
||||||
def score_at_rank(rank)
|
|
||||||
redis.zrevrange("#{key_prefix}:allowed", 0, rank, with_scores: true).last&.last || 0
|
|
||||||
end
|
|
||||||
|
|
||||||
def replace_items(suffix, items)
|
|
||||||
tmp_prefix = "#{key_prefix}:tmp:#{SecureRandom.alphanumeric(6)}#{suffix}"
|
|
||||||
allowed_items = filter_for_allowed_items(items)
|
|
||||||
|
|
||||||
redis.pipelined do |pipeline|
|
|
||||||
items.each { |item| pipeline.zadd("#{tmp_prefix}:all", item[:score], item[:item].id) }
|
|
||||||
allowed_items.each { |item| pipeline.zadd("#{tmp_prefix}:allowed", item[:score], item[:item].id) }
|
|
||||||
|
|
||||||
rename_set(pipeline, "#{tmp_prefix}:all", "#{key_prefix}:all#{suffix}", items)
|
|
||||||
rename_set(pipeline, "#{tmp_prefix}:allowed", "#{key_prefix}:allowed#{suffix}", allowed_items)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def filter_for_allowed_items(items)
|
|
||||||
raise NotImplementedError
|
|
||||||
end
|
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
def used_key(at_time)
|
def used_key(at_time)
|
||||||
"#{key_prefix}:used:#{at_time.beginning_of_day.to_i}"
|
"#{key_prefix}:used:#{at_time.beginning_of_day.to_i}"
|
||||||
end
|
end
|
||||||
|
|
||||||
def rename_set(pipeline, from_key, to_key, set_items)
|
|
||||||
if set_items.empty?
|
|
||||||
pipeline.del(to_key)
|
|
||||||
else
|
|
||||||
pipeline.rename(from_key, to_key)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def skip_review?
|
|
||||||
Setting.trendable_by_default
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
|
@ -1,15 +1,13 @@
|
||||||
# frozen_string_literal: true
|
# frozen_string_literal: true
|
||||||
|
|
||||||
class Trends::Query
|
class Trends::Query
|
||||||
include Redisable
|
|
||||||
include Enumerable
|
include Enumerable
|
||||||
|
|
||||||
attr_reader :prefix, :klass, :loaded
|
attr_reader :klass, :loaded
|
||||||
|
|
||||||
alias loaded? loaded
|
alias loaded? loaded
|
||||||
|
|
||||||
def initialize(prefix, klass)
|
def initialize(_prefix, klass)
|
||||||
@prefix = prefix
|
|
||||||
@klass = klass
|
@klass = klass
|
||||||
@records = []
|
@records = []
|
||||||
@loaded = false
|
@loaded = false
|
||||||
|
@ -68,22 +66,11 @@ class Trends::Query
|
||||||
alias to_a to_ary
|
alias to_a to_ary
|
||||||
|
|
||||||
def to_arel
|
def to_arel
|
||||||
if ids_for_key.empty?
|
raise NotImplementedError
|
||||||
klass.none
|
|
||||||
else
|
|
||||||
scope = klass.joins(sanitized_join_sql).reorder('x.ordering')
|
|
||||||
scope = scope.offset(@offset) if @offset.present?
|
|
||||||
scope = scope.limit(@limit) if @limit.present?
|
|
||||||
scope
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
def key
|
|
||||||
[@prefix, @allowed ? 'allowed' : 'all', @locale].compact.join(':')
|
|
||||||
end
|
|
||||||
|
|
||||||
def load
|
def load
|
||||||
unless loaded?
|
unless loaded?
|
||||||
@records = perform_queries
|
@records = perform_queries
|
||||||
|
@ -93,29 +80,7 @@ class Trends::Query
|
||||||
self
|
self
|
||||||
end
|
end
|
||||||
|
|
||||||
def ids_for_key
|
|
||||||
@ids_for_key ||= redis.zrevrange(key, 0, -1).map(&:to_i)
|
|
||||||
end
|
|
||||||
|
|
||||||
def sanitized_join_sql
|
|
||||||
ActiveRecord::Base.sanitize_sql_array(join_sql_array)
|
|
||||||
end
|
|
||||||
|
|
||||||
def join_sql_array
|
|
||||||
[join_sql_query, ids_for_key]
|
|
||||||
end
|
|
||||||
|
|
||||||
def join_sql_query
|
|
||||||
<<~SQL.squish
|
|
||||||
JOIN unnest(array[?]) WITH ordinality AS x (id, ordering) ON #{klass.table_name}.id = x.id
|
|
||||||
SQL
|
|
||||||
end
|
|
||||||
|
|
||||||
def perform_queries
|
def perform_queries
|
||||||
apply_scopes(to_arel).to_a
|
to_arel.to_a
|
||||||
end
|
|
||||||
|
|
||||||
def apply_scopes(scope)
|
|
||||||
scope
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -11,32 +11,91 @@ class Trends::Tags < Trends::Base
|
||||||
decay_threshold: 1,
|
decay_threshold: 1,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class Query < Trends::Query
|
||||||
|
def filtered_for!(account)
|
||||||
|
@account = account
|
||||||
|
self
|
||||||
|
end
|
||||||
|
|
||||||
|
def filtered_for(account)
|
||||||
|
clone.filtered_for!(account)
|
||||||
|
end
|
||||||
|
|
||||||
|
def to_arel
|
||||||
|
scope = Tag.joins(:trend).reorder(language_order_clause.desc, score: :desc)
|
||||||
|
scope = scope.merge(TagTrend.allowed) if @allowed
|
||||||
|
scope = scope.offset(@offset) if @offset.present?
|
||||||
|
scope = scope.limit(@limit) if @limit.present?
|
||||||
|
scope
|
||||||
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def language_order_clause
|
||||||
|
Arel::Nodes::Case.new.when(TagTrend.arel_table[:language].in(preferred_languages)).then(1).else(0)
|
||||||
|
end
|
||||||
|
|
||||||
|
def preferred_languages
|
||||||
|
if @account&.chosen_languages.present?
|
||||||
|
@account.chosen_languages
|
||||||
|
else
|
||||||
|
@locale
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
def register(status, at_time = Time.now.utc)
|
def register(status, at_time = Time.now.utc)
|
||||||
return unless !status.reblog? && status.public_visibility? && !status.account.silenced?
|
return unless !status.reblog? && status.public_visibility? && !status.account.silenced?
|
||||||
|
|
||||||
status.tags.each do |tag|
|
status.tags.each do |tag|
|
||||||
add(tag, status.account_id, at_time) if tag.usable?
|
add(tag, status.account_id, status.language, at_time) if tag.usable?
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def add(tag, account_id, at_time = Time.now.utc)
|
def add(tag, account_id, language, at_time = Time.now.utc)
|
||||||
|
# This history is used for display purposes
|
||||||
tag.history.add(account_id, at_time)
|
tag.history.add(account_id, at_time)
|
||||||
record_used_id(tag.id, at_time)
|
|
||||||
|
# This history is used for calculating the score
|
||||||
|
tag.history(language).add(account_id, at_time)
|
||||||
|
|
||||||
|
# We need to save the language along with the hashtag so we know which languages to calculate for later
|
||||||
|
record_used_id("#{tag.id}:#{language}", at_time)
|
||||||
|
end
|
||||||
|
|
||||||
|
def query
|
||||||
|
Query.new(key_prefix, klass)
|
||||||
end
|
end
|
||||||
|
|
||||||
def refresh(at_time = Time.now.utc)
|
def refresh(at_time = Time.now.utc)
|
||||||
tags = Tag.where(id: (recently_used_ids(at_time) + currently_trending_ids(false, -1)).uniq)
|
# First, recalculate scores for links that were trending previously. We split the queries
|
||||||
calculate_scores(tags, at_time)
|
# to avoid having to load all of the IDs into Ruby just to send them back into Postgres
|
||||||
|
Tag.where(id: TagTrend.select(:tag_id)).find_in_batches(batch_size: BATCH_SIZE) do |tags|
|
||||||
|
calculate_scores(tags, at_time)
|
||||||
|
end
|
||||||
|
|
||||||
|
# Then, calculate scores for links that were used today. There are potentially some
|
||||||
|
# duplicate items here that we might process one more time, but that should be fine
|
||||||
|
Tag.where(id: recently_used_ids(at_time).map { |str| str.split(':').first }.uniq).find_in_batches(batch_size: BATCH_SIZE) do |tags|
|
||||||
|
calculate_scores(tags, at_time)
|
||||||
|
end
|
||||||
|
|
||||||
|
# Now that all trends have up-to-date scores, and all the ones below the threshold have
|
||||||
|
# been removed, we can recalculate their positions
|
||||||
|
TagTrend.recalculate_ordered_rank
|
||||||
end
|
end
|
||||||
|
|
||||||
def request_review
|
def request_review
|
||||||
tags = Tag.where(id: currently_trending_ids(false, -1))
|
score_at_threshold = TagTrend.allowed.by_rank.ranked_below(options[:review_threshold]).first&.score || 0
|
||||||
|
preview_card_trends = TagTrend.not_allowed.includes(:tag)
|
||||||
|
|
||||||
tags.filter_map do |tag|
|
preview_card_trends.filter_map do |trend|
|
||||||
next unless would_be_trending?(tag.id) && !tag.trendable? && tag.requires_review_notification?
|
tag = trend.tag
|
||||||
|
|
||||||
tag.touch(:requested_review_at)
|
if trend.score > score_at_threshold && !tag.trendable? && tag.requires_review_notification?
|
||||||
tag
|
tag.touch(:requested_review_at)
|
||||||
|
tag
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -53,9 +112,7 @@ class Trends::Tags < Trends::Base
|
||||||
private
|
private
|
||||||
|
|
||||||
def calculate_scores(tags, at_time)
|
def calculate_scores(tags, at_time)
|
||||||
items = []
|
items = tags.map do |tag|
|
||||||
|
|
||||||
tags.each do |tag|
|
|
||||||
expected = tag.history.get(at_time - 1.day).accounts.to_f
|
expected = tag.history.get(at_time - 1.day).accounts.to_f
|
||||||
expected = 1.0 if expected.zero?
|
expected = 1.0 if expected.zero?
|
||||||
observed = tag.history.get(at_time).accounts.to_f
|
observed = tag.history.get(at_time).accounts.to_f
|
||||||
|
@ -79,19 +136,13 @@ class Trends::Tags < Trends::Base
|
||||||
|
|
||||||
decaying_score = max_score * (0.5**((at_time.to_f - max_time.to_f) / options[:max_score_halflife].to_f))
|
decaying_score = max_score * (0.5**((at_time.to_f - max_time.to_f) / options[:max_score_halflife].to_f))
|
||||||
|
|
||||||
next unless decaying_score >= options[:decay_threshold]
|
[decaying_score, tag]
|
||||||
|
|
||||||
items << { score: decaying_score, item: tag }
|
|
||||||
end
|
end
|
||||||
|
|
||||||
replace_items('', items)
|
to_insert = items.filter { |(score, _)| score >= options[:decay_threshold] }
|
||||||
end
|
to_delete = items.filter { |(score, _)| score < options[:decay_threshold] }
|
||||||
|
|
||||||
def filter_for_allowed_items(items)
|
TagTrend.upsert_all(to_insert.map { |(score, tag)| { tag_id: tag.id, score: score, languages: [], allowed: tag.trendable? || false } }, unique_by: :tag_id) if to_insert.any?
|
||||||
items.select { |item| item[:item].trendable? }
|
TagTrend.where(tag_id: to_delete.map { |(_, tag)| tag.id }).delete_all if to_delete.any?
|
||||||
end
|
|
||||||
|
|
||||||
def would_be_trending?(id)
|
|
||||||
score(id) > score_at_rank(options[:review_threshold] - 1)
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
15
db/migrate/20241111141355_create_tag_trends.rb
Normal file
15
db/migrate/20241111141355_create_tag_trends.rb
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
class CreateTagTrends < ActiveRecord::Migration[7.2]
|
||||||
|
def change
|
||||||
|
create_table :tag_trends do |t| # rubocop:disable Rails/CreateTableWithTimestamps
|
||||||
|
t.references :tag, null: false, foreign_key: { on_delete: :cascade }, index: false
|
||||||
|
t.float :score, null: false, default: 0
|
||||||
|
t.integer :rank, null: false, default: 0
|
||||||
|
t.boolean :allowed, null: false, default: false
|
||||||
|
t.string :language
|
||||||
|
end
|
||||||
|
|
||||||
|
add_index :tag_trends, [:tag_id, :language], unique: true
|
||||||
|
end
|
||||||
|
end
|
12
db/schema.rb
12
db/schema.rb
|
@ -10,7 +10,7 @@
|
||||||
#
|
#
|
||||||
# It's strongly recommended that you check this file into your version control system.
|
# It's strongly recommended that you check this file into your version control system.
|
||||||
|
|
||||||
ActiveRecord::Schema[7.2].define(version: 2024_11_04_082851) do
|
ActiveRecord::Schema[7.1].define(version: 2024_11_11_141355) do
|
||||||
# These are extensions that must be enabled in order to support this database
|
# These are extensions that must be enabled in order to support this database
|
||||||
enable_extension "plpgsql"
|
enable_extension "plpgsql"
|
||||||
|
|
||||||
|
@ -1080,6 +1080,15 @@ ActiveRecord::Schema[7.2].define(version: 2024_11_04_082851) do
|
||||||
t.index ["tag_id"], name: "index_tag_follows_on_tag_id"
|
t.index ["tag_id"], name: "index_tag_follows_on_tag_id"
|
||||||
end
|
end
|
||||||
|
|
||||||
|
create_table "tag_trends", force: :cascade do |t|
|
||||||
|
t.bigint "tag_id", null: false
|
||||||
|
t.float "score", default: 0.0, null: false
|
||||||
|
t.integer "rank", default: 0, null: false
|
||||||
|
t.boolean "allowed", default: false, null: false
|
||||||
|
t.string "language"
|
||||||
|
t.index ["tag_id", "language"], name: "index_tag_trends_on_tag_id_and_language", unique: true
|
||||||
|
end
|
||||||
|
|
||||||
create_table "tags", force: :cascade do |t|
|
create_table "tags", force: :cascade do |t|
|
||||||
t.string "name", default: "", null: false
|
t.string "name", default: "", null: false
|
||||||
t.datetime "created_at", precision: nil, null: false
|
t.datetime "created_at", precision: nil, null: false
|
||||||
|
@ -1343,6 +1352,7 @@ ActiveRecord::Schema[7.2].define(version: 2024_11_04_082851) do
|
||||||
add_foreign_key "statuses_tags", "tags", name: "fk_3081861e21", on_delete: :cascade
|
add_foreign_key "statuses_tags", "tags", name: "fk_3081861e21", on_delete: :cascade
|
||||||
add_foreign_key "tag_follows", "accounts", on_delete: :cascade
|
add_foreign_key "tag_follows", "accounts", on_delete: :cascade
|
||||||
add_foreign_key "tag_follows", "tags", on_delete: :cascade
|
add_foreign_key "tag_follows", "tags", on_delete: :cascade
|
||||||
|
add_foreign_key "tag_trends", "tags", on_delete: :cascade
|
||||||
add_foreign_key "tombstones", "accounts", on_delete: :cascade
|
add_foreign_key "tombstones", "accounts", on_delete: :cascade
|
||||||
add_foreign_key "user_invite_requests", "users", on_delete: :cascade
|
add_foreign_key "user_invite_requests", "users", on_delete: :cascade
|
||||||
add_foreign_key "users", "accounts", name: "fk_50500f500d", on_delete: :cascade
|
add_foreign_key "users", "accounts", name: "fk_50500f500d", on_delete: :cascade
|
||||||
|
|
9
spec/fabricators/tag_trend_fabricator.rb
Normal file
9
spec/fabricators/tag_trend_fabricator.rb
Normal file
|
@ -0,0 +1,9 @@
|
||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
Fabricator(:tag_trend) do
|
||||||
|
tag_id ''
|
||||||
|
score 1.5
|
||||||
|
rank 1
|
||||||
|
allowed false
|
||||||
|
language 'MyString'
|
||||||
|
end
|
7
spec/models/tag_trend_spec.rb
Normal file
7
spec/models/tag_trend_spec.rb
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
require 'rails_helper'
|
||||||
|
|
||||||
|
RSpec.describe TagTrend do
|
||||||
|
pending "add some examples to (or delete) #{__FILE__}"
|
||||||
|
end
|
Loading…
Reference in a new issue