From 6c4c84b161947cb11ad0451a39e26b25be4c93d5 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Tue, 8 Mar 2016 20:16:11 +0100 Subject: [PATCH] Distrubute statuses as a fan-out-on-write system, with optional precomputing --- Gemfile | 1 + Gemfile.lock | 2 + app/controllers/api/statuses_controller.rb | 6 ++- app/controllers/home_controller.rb | 3 +- app/models/feed.rb | 27 +++++++++++++ app/models/status.rb | 1 + app/services/fan_out_on_write_service.rb | 46 ++++++++++++++++++++++ app/services/precompute_feed_service.rb | 35 ++++++++++++++++ config/initializers/redis.rb | 1 + 9 files changed, 119 insertions(+), 3 deletions(-) create mode 100644 app/models/feed.rb create mode 100644 app/services/fan_out_on_write_service.rb create mode 100644 app/services/precompute_feed_service.rb create mode 100644 config/initializers/redis.rb diff --git a/Gemfile b/Gemfile index 7b37ec29c5..49ec1888e6 100644 --- a/Gemfile +++ b/Gemfile @@ -30,6 +30,7 @@ gem 'rails_autolink' gem 'doorkeeper' gem 'rabl' gem 'oj' +gem 'redis', '~>3.2' group :development, :test do gem 'rspec-rails' diff --git a/Gemfile.lock b/Gemfile.lock index a05fad7f07..6550ff101d 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -208,6 +208,7 @@ GEM rake (10.5.0) rdoc (4.2.2) json (~> 1.4) + redis (3.2.2) ref (2.0.0) responders (2.1.1) railties (>= 4.2.0, < 5.1) @@ -328,6 +329,7 @@ DEPENDENCIES rails (= 4.2.5.1) rails_12factor rails_autolink + redis (~> 3.2) rspec-rails rubocop sass-rails (~> 5.0) diff --git a/app/controllers/api/statuses_controller.rb b/app/controllers/api/statuses_controller.rb index 82334a32f2..04128537aa 100644 --- a/app/controllers/api/statuses_controller.rb +++ b/app/controllers/api/statuses_controller.rb @@ -22,10 +22,12 @@ class Api::StatusesController < ApiController end def home - @statuses = Status.where(account: [current_user.account] + current_user.account.following).order('created_at desc') + feed = Feed.new(:home, current_user.account) + @statuses = feed.get(20, (params[:offset] || 0).to_i) end def mentions - @statuses = Status.where(id: Mention.where(account: current_user.account).pluck(:status_id)).order('created_at desc') + feed = Feed.new(:mentions, current_user.account) + @statuses = feed.get(20, (params[:offset] || 0).to_i) end end diff --git a/app/controllers/home_controller.rb b/app/controllers/home_controller.rb index 3a3d0ade40..294749a22f 100644 --- a/app/controllers/home_controller.rb +++ b/app/controllers/home_controller.rb @@ -2,6 +2,7 @@ class HomeController < ApplicationController before_action :authenticate_user! def index - @statuses = Status.where(account: ([current_user.account] + current_user.account.following)).where('reblog_of_id IS NULL OR account_id != ?', current_user.account.id).order('created_at desc') + feed = Feed.new(:home, current_user.account) + @statuses = feed.get(20, (params[:offset] || 0).to_i) end end diff --git a/app/models/feed.rb b/app/models/feed.rb new file mode 100644 index 0000000000..a063ad05b7 --- /dev/null +++ b/app/models/feed.rb @@ -0,0 +1,27 @@ +class Feed + def initialize(type, account) + @type = type + @account = account + end + + def get(limit, offset = 0) + unhydrated = redis.zrevrange(key, offset, limit) + status_map = Hash.new + + # If we're after most recent items and none are there, we need to precompute the feed + return PrecomputeFeedService.new.(@type, @account).take(limit) if unhydrated.empty? && offset == 0 + + Status.where(id: unhydrated).each { |status| status_map[status.id.to_s] = status } + return unhydrated.map { |id| status_map[id] } + end + + private + + def key + "feed:#{@type}:#{@account.id}" + end + + def redis + $redis + end +end diff --git a/app/models/status.rb b/app/models/status.rb index a346ac9b07..7e0c334ec1 100644 --- a/app/models/status.rb +++ b/app/models/status.rb @@ -67,5 +67,6 @@ class Status < ActiveRecord::Base after_create do self.account.stream_entries.create!(activity: self) + FanOutOnWriteService.new.(self) end end diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb new file mode 100644 index 0000000000..87a7c55ac8 --- /dev/null +++ b/app/services/fan_out_on_write_service.rb @@ -0,0 +1,46 @@ +class FanOutOnWriteService < BaseService + MAX_FEED_SIZE = 800 + + # Push a status into home and mentions feeds + # @param [Status] status + def call(status) + replied_to_user = status.reply? ? status.thread.account : nil + + # Deliver to local self + push(:home, status.account.id, status) if status.account.local? + + # Deliver to local followers + status.account.followers.each do |follower| + next if (status.reply? && !follower.following?(replied_to_user)) || !follower.local? + push(:home, follower.id, status) + end + + # Deliver to local mentioned + status.mentions.each do |mentioned_account| + next unless mentioned_account.local? + push(:mentions, mentioned_account.id, status) + end + end + + private + + def push(type, receiver_id, status) + redis.zadd(key(type, receiver_id), status.created_at.to_i, status.id) + trim(type, receiver_id) + end + + def trim(type, receiver_id) + return unless redis.zcard(key(type, receiver_id)) > MAX_FEED_SIZE + + last = redis.zrevrange(key(type, receiver_id), MAX_FEED_SIZE - 1, MAX_FEED_SIZE - 1) + redis.zremrangebyscore(key(type, receiver_id), '-inf', "(#{last.last}") + end + + def key(type, id) + "feed:#{type}:#{id}" + end + + def redis + $redis + end +end diff --git a/app/services/precompute_feed_service.rb b/app/services/precompute_feed_service.rb new file mode 100644 index 0000000000..89b0344040 --- /dev/null +++ b/app/services/precompute_feed_service.rb @@ -0,0 +1,35 @@ +class PrecomputeFeedService < BaseService + MAX_FEED_SIZE = 800 + + # Fill up a user's home/mentions feed from DB and return it + # @param [Symbol] type :home or :mentions + # @param [Account] account + # @return [Array] + def call(type, account) + statuses = send(type.to_s, account).order('created_at desc').limit(MAX_FEED_SIZE) + statuses.each { |status| push(type, account.id, status) } + statuses + end + + private + + def push(type, receiver_id, status) + redis.zadd(key(type, receiver_id), status.created_at.to_i, status.id) + end + + def home(account) + Status.where(account: [account] + account.following) + end + + def mentions(account) + Status.where(id: Mention.where(account: account).pluck(:status_id)) + end + + def key(type, id) + "feed:#{type}:#{id}" + end + + def redis + $redis + end +end diff --git a/config/initializers/redis.rb b/config/initializers/redis.rb new file mode 100644 index 0000000000..14f3778c41 --- /dev/null +++ b/config/initializers/redis.rb @@ -0,0 +1 @@ +$redis = Redis.new(host: ENV['REDIS_HOST'] || 'localhost', port: ENV['REDIS_PORT'] || 6379)