Ensure that Webhook tasks are not double delivered (#21558)

When re-retrieving hook tasks from the DB double check if they have not
been delivered in the meantime. Further ensure that tasks are marked as
delivered when they are being delivered.

In addition:
* Improve the error reporting and make sure that the webhook task
population script runs in a separate goroutine.
* Only get hook task IDs out of the DB instead of the whole task when
repopulating the queue
* When repopulating the queue make the DB request paged

Ref #17940 

Signed-off-by: Andrew Thornton <art27@cantab.net>
Co-authored-by: delvh <dev.lh@web.de>
Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
This commit is contained in:
zeripath 2022-11-23 14:10:04 +00:00 committed by GitHub
parent 4c00d8f916
commit 787f6c3227
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 100 additions and 33 deletions

View file

@ -233,14 +233,30 @@ func ReplayHookTask(ctx context.Context, hookID int64, uuid string) (*HookTask,
return newTask, db.Insert(ctx, newTask) return newTask, db.Insert(ctx, newTask)
} }
// FindUndeliveredHookTasks represents find the undelivered hook tasks // FindUndeliveredHookTaskIDs will find the next 100 undelivered hook tasks with ID greater than the provided lowerID
func FindUndeliveredHookTasks(ctx context.Context) ([]*HookTask, error) { func FindUndeliveredHookTaskIDs(ctx context.Context, lowerID int64) ([]int64, error) {
tasks := make([]*HookTask, 0, 10) const batchSize = 100
tasks := make([]int64, 0, batchSize)
return tasks, db.GetEngine(ctx). return tasks, db.GetEngine(ctx).
Select("id").
Table(new(HookTask)).
Where("is_delivered=?", false). Where("is_delivered=?", false).
And("id > ?", lowerID).
Asc("id").
Limit(batchSize).
Find(&tasks) Find(&tasks)
} }
func MarkTaskDelivered(ctx context.Context, task *HookTask) (bool, error) {
count, err := db.GetEngine(ctx).ID(task.ID).Where("is_delivered = ?", false).Cols("is_delivered").Update(&HookTask{
ID: task.ID,
IsDelivered: true,
})
return count != 0, err
}
// CleanupHookTaskTable deletes rows from hook_task as needed. // CleanupHookTaskTable deletes rows from hook_task as needed.
func CleanupHookTaskTable(ctx context.Context, cleanupType HookTaskCleanupType, olderThan time.Duration, numberToKeep int) error { func CleanupHookTaskTable(ctx context.Context, cleanupType HookTaskCleanupType, olderThan time.Duration, numberToKeep int) error {
log.Trace("Doing: CleanupHookTaskTable") log.Trace("Doing: CleanupHookTaskTable")

View file

@ -23,6 +23,7 @@ import (
"code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/hostmatcher" "code.gitea.io/gitea/modules/hostmatcher"
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/proxy" "code.gitea.io/gitea/modules/proxy"
"code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
@ -43,7 +44,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
return return
} }
// There was a panic whilst delivering a hook... // There was a panic whilst delivering a hook...
log.Error("PANIC whilst trying to deliver webhook[%d] to %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2)) log.Error("PANIC whilst trying to deliver webhook task[%d] to webhook %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2))
}() }()
t.IsDelivered = true t.IsDelivered = true
@ -52,7 +53,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
switch w.HTTPMethod { switch w.HTTPMethod {
case "": case "":
log.Info("HTTP Method for webhook %d empty, setting to POST as default", t.ID) log.Info("HTTP Method for webhook %s empty, setting to POST as default", w.URL)
fallthrough fallthrough
case http.MethodPost: case http.MethodPost:
switch w.ContentType { switch w.ContentType {
@ -78,14 +79,14 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
case http.MethodGet: case http.MethodGet:
u, err := url.Parse(w.URL) u, err := url.Parse(w.URL)
if err != nil { if err != nil {
return err return fmt.Errorf("unable to deliver webhook task[%d] as cannot parse webhook url %s: %w", t.ID, w.URL, err)
} }
vals := u.Query() vals := u.Query()
vals["payload"] = []string{t.PayloadContent} vals["payload"] = []string{t.PayloadContent}
u.RawQuery = vals.Encode() u.RawQuery = vals.Encode()
req, err = http.NewRequest("GET", u.String(), nil) req, err = http.NewRequest("GET", u.String(), nil)
if err != nil { if err != nil {
return err return fmt.Errorf("unable to deliver webhook task[%d] as unable to create HTTP request for webhook url %s: %w", t.ID, w.URL, err)
} }
case http.MethodPut: case http.MethodPut:
switch w.Type { switch w.Type {
@ -97,13 +98,13 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
url := fmt.Sprintf("%s/%s", w.URL, url.PathEscape(txnID)) url := fmt.Sprintf("%s/%s", w.URL, url.PathEscape(txnID))
req, err = http.NewRequest("PUT", url, strings.NewReader(t.PayloadContent)) req, err = http.NewRequest("PUT", url, strings.NewReader(t.PayloadContent))
if err != nil { if err != nil {
return err return fmt.Errorf("unable to deliver webhook task[%d] as cannot create matrix request for webhook url %s: %w", t.ID, w.URL, err)
} }
default: default:
return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod) return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod)
} }
default: default:
return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod) return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod)
} }
var signatureSHA1 string var signatureSHA1 string
@ -159,6 +160,20 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
Headers: map[string]string{}, Headers: map[string]string{},
} }
// OK We're now ready to attempt to deliver the task - we must double check that it
// has not been delivered in the meantime
updated, err := webhook_model.MarkTaskDelivered(ctx, t)
if err != nil {
log.Error("MarkTaskDelivered[%d]: %v", t.ID, err)
return fmt.Errorf("unable to mark task[%d] delivered in the db: %w", t.ID, err)
}
if !updated {
// This webhook task has already been attempted to be delivered or is in the process of being delivered
log.Trace("Webhook Task[%d] already delivered", t.ID)
return nil
}
// All code from this point will update the hook task
defer func() { defer func() {
t.Delivered = time.Now().UnixNano() t.Delivered = time.Now().UnixNano()
if t.IsSucceed { if t.IsSucceed {
@ -190,13 +205,14 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
} }
if !w.IsActive { if !w.IsActive {
log.Trace("Webhook %s in Webhook Task[%d] is not active", w.URL, t.ID)
return nil return nil
} }
resp, err := webhookHTTPClient.Do(req.WithContext(ctx)) resp, err := webhookHTTPClient.Do(req.WithContext(ctx))
if err != nil { if err != nil {
t.ResponseInfo.Body = fmt.Sprintf("Delivery: %v", err) t.ResponseInfo.Body = fmt.Sprintf("Delivery: %v", err)
return err return fmt.Errorf("unable to deliver webhook task[%d] in %s due to error in http client: %w", t.ID, w.URL, err)
} }
defer resp.Body.Close() defer resp.Body.Close()
@ -210,7 +226,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
p, err := io.ReadAll(resp.Body) p, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
t.ResponseInfo.Body = fmt.Sprintf("read body: %s", err) t.ResponseInfo.Body = fmt.Sprintf("read body: %s", err)
return err return fmt.Errorf("unable to deliver webhook task[%d] in %s as unable to read response body: %w", t.ID, w.URL, err)
} }
t.ResponseInfo.Body = string(p) t.ResponseInfo.Body = string(p)
return nil return nil
@ -272,17 +288,37 @@ func Init() error {
} }
go graceful.GetManager().RunWithShutdownFns(hookQueue.Run) go graceful.GetManager().RunWithShutdownFns(hookQueue.Run)
tasks, err := webhook_model.FindUndeliveredHookTasks(graceful.GetManager().HammerContext()) go graceful.GetManager().RunWithShutdownContext(populateWebhookSendingQueue)
if err != nil {
log.Error("FindUndeliveredHookTasks failed: %v", err)
return err
}
for _, task := range tasks {
if err := enqueueHookTask(task); err != nil {
log.Error("enqueueHookTask failed: %v", err)
}
}
return nil return nil
} }
func populateWebhookSendingQueue(ctx context.Context) {
ctx, _, finished := process.GetManager().AddContext(ctx, "Webhook: Populate sending queue")
defer finished()
lowerID := int64(0)
for {
taskIDs, err := webhook_model.FindUndeliveredHookTaskIDs(ctx, lowerID)
if err != nil {
log.Error("Unable to populate webhook queue as FindUndeliveredHookTaskIDs failed: %v", err)
return
}
if len(taskIDs) == 0 {
return
}
lowerID = taskIDs[len(taskIDs)-1]
for _, taskID := range taskIDs {
select {
case <-ctx.Done():
log.Warn("Shutdown before Webhook Sending queue finishing being populated")
return
default:
}
if err := enqueueHookTask(taskID); err != nil {
log.Error("Unable to push HookTask[%d] to the Webhook Sending queue: %v", taskID, err)
}
}
}
}

View file

@ -16,6 +16,7 @@ import (
"code.gitea.io/gitea/models/unittest" "code.gitea.io/gitea/models/unittest"
webhook_model "code.gitea.io/gitea/models/webhook" webhook_model "code.gitea.io/gitea/models/webhook"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
api "code.gitea.io/gitea/modules/structs"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -67,8 +68,15 @@ func TestWebhookDeliverAuthorizationHeader(t *testing.T) {
err := hook.SetHeaderAuthorization("Bearer s3cr3t-t0ken") err := hook.SetHeaderAuthorization("Bearer s3cr3t-t0ken")
assert.NoError(t, err) assert.NoError(t, err)
assert.NoError(t, webhook_model.CreateWebhook(db.DefaultContext, hook)) assert.NoError(t, webhook_model.CreateWebhook(db.DefaultContext, hook))
db.GetEngine(db.DefaultContext).NoAutoTime().DB().Logger.ShowSQL(true)
hookTask := &webhook_model.HookTask{HookID: hook.ID, EventType: webhook_model.HookEventPush} hookTask := &webhook_model.HookTask{HookID: hook.ID, EventType: webhook_model.HookEventPush, Payloader: &api.PushPayload{}}
hookTask, err = webhook_model.CreateHookTask(db.DefaultContext, hookTask)
assert.NoError(t, err)
if !assert.NotNil(t, hookTask) {
return
}
assert.NoError(t, Deliver(context.Background(), hookTask)) assert.NoError(t, Deliver(context.Background(), hookTask))
select { select {

View file

@ -116,19 +116,26 @@ func handle(data ...queue.Data) []queue.Data {
for _, taskID := range data { for _, taskID := range data {
task, err := webhook_model.GetHookTaskByID(ctx, taskID.(int64)) task, err := webhook_model.GetHookTaskByID(ctx, taskID.(int64))
if err != nil { if err != nil {
log.Error("GetHookTaskByID failed: %v", err) log.Error("GetHookTaskByID[%d] failed: %v", taskID.(int64), err)
} else { continue
if err := Deliver(ctx, task); err != nil { }
log.Error("webhook.Deliver failed: %v", err)
} if task.IsDelivered {
// Already delivered in the meantime
log.Trace("Task[%d] has already been delivered", task.ID)
continue
}
if err := Deliver(ctx, task); err != nil {
log.Error("Unable to deliver webhook task[%d]: %v", task.ID, err)
} }
} }
return nil return nil
} }
func enqueueHookTask(task *webhook_model.HookTask) error { func enqueueHookTask(taskID int64) error {
err := hookQueue.PushFunc(task.ID, nil) err := hookQueue.Push(taskID)
if err != nil && err != queue.ErrAlreadyInQueue { if err != nil && err != queue.ErrAlreadyInQueue {
return err return err
} }
@ -205,7 +212,7 @@ func PrepareWebhook(ctx context.Context, w *webhook_model.Webhook, event webhook
return fmt.Errorf("CreateHookTask: %w", err) return fmt.Errorf("CreateHookTask: %w", err)
} }
return enqueueHookTask(task) return enqueueHookTask(task.ID)
} }
// PrepareWebhooks adds new webhooks to task queue for given payload. // PrepareWebhooks adds new webhooks to task queue for given payload.
@ -265,5 +272,5 @@ func ReplayHookTask(ctx context.Context, w *webhook_model.Webhook, uuid string)
return err return err
} }
return enqueueHookTask(task) return enqueueHookTask(task.ID)
} }