forgejo/modules/eventsource/manager_run.go
Gusted e4eb82b738
fix: use better code to group UID and stopwatches
- Instead of having code that relied on the result being sorted (which
wasn't specified in the query and therefore not safe to assume so). Use
a map where it doesn't care if the result that we get from the database
is sorted or not.
- Added unit test.
2024-11-16 15:59:02 +01:00

116 lines
2.9 KiB
Go

// Copyright 2020 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package eventsource
import (
"context"
"time"
activities_model "code.gitea.io/gitea/models/activities"
issues_model "code.gitea.io/gitea/models/issues"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/services/convert"
)
// Init starts this eventsource
func (m *Manager) Init() {
if setting.UI.Notification.EventSourceUpdateTime <= 0 {
return
}
go graceful.GetManager().RunWithShutdownContext(m.Run)
}
// Run runs the manager within a provided context
func (m *Manager) Run(ctx context.Context) {
ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: EventSource", process.SystemProcessType, true)
defer finished()
then := timeutil.TimeStampNow().Add(-2)
timer := time.NewTicker(setting.UI.Notification.EventSourceUpdateTime)
loop:
for {
select {
case <-ctx.Done():
timer.Stop()
break loop
case <-timer.C:
m.mutex.Lock()
connectionCount := len(m.messengers)
if connectionCount == 0 {
log.Trace("Event source has no listeners")
// empty the connection channel
select {
case <-m.connection:
default:
}
}
m.mutex.Unlock()
if connectionCount == 0 {
// No listeners so the source can be paused
log.Trace("Pausing the eventsource")
select {
case <-ctx.Done():
break loop
case <-m.connection:
log.Trace("Connection detected - restarting the eventsource")
// OK we're back so lets reset the timer and start again
// We won't change the "then" time because there could be concurrency issues
select {
case <-timer.C:
default:
}
continue
}
}
now := timeutil.TimeStampNow().Add(-2)
uidCounts, err := activities_model.GetUIDsAndNotificationCounts(ctx, then, now)
if err != nil {
log.Error("Unable to get UIDcounts: %v", err)
}
for _, uidCount := range uidCounts {
m.SendMessage(uidCount.UserID, &Event{
Name: "notification-count",
Data: uidCount,
})
}
then = now
if setting.Service.EnableTimetracking {
usersStopwatches, err := issues_model.GetUIDsAndStopwatch(ctx)
if err != nil {
log.Error("Unable to get GetUIDsAndStopwatch: %v", err)
return
}
for uid, stopwatches := range usersStopwatches {
apiSWs, err := convert.ToStopWatches(ctx, stopwatches)
if err != nil {
if !issues_model.IsErrIssueNotExist(err) {
log.Error("Unable to APIFormat stopwatches: %v", err)
}
continue
}
dataBs, err := json.Marshal(apiSWs)
if err != nil {
log.Error("Unable to marshal stopwatches: %v", err)
continue
}
m.SendMessage(uid, &Event{
Name: "stopwatches",
Data: string(dataBs),
})
}
}
}
}
m.UnregisterAll()
}