mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2024-12-28 18:40:21 +00:00
Support elastic search for code search (#10273)
* Support elastic search for code search * Finished elastic search implementation and add some tests * Enable test on drone and added docs * Add new fields to elastic search * Fix bug * remove unused changes * Use indexer alias to keep the gitea indexer version * Improve codes * Some code improvements * The real indexer name changed to xxx.v1 Co-authored-by: zeripath <art27@cantab.net>
This commit is contained in:
parent
d257485bc0
commit
9bc69ff26e
|
@ -209,6 +209,7 @@ steps:
|
||||||
TAGS: bindata
|
TAGS: bindata
|
||||||
TEST_LDAP: 1
|
TEST_LDAP: 1
|
||||||
USE_REPO_TEST_DIR: 1
|
USE_REPO_TEST_DIR: 1
|
||||||
|
TEST_INDEXER_CODE_ES_URL: "http://elastic:changeme@elasticsearch:9200"
|
||||||
depends_on:
|
depends_on:
|
||||||
- build
|
- build
|
||||||
|
|
||||||
|
|
|
@ -428,7 +428,15 @@ STARTUP_TIMEOUT=30s
|
||||||
|
|
||||||
; repo indexer by default disabled, since it uses a lot of disk space
|
; repo indexer by default disabled, since it uses a lot of disk space
|
||||||
REPO_INDEXER_ENABLED = false
|
REPO_INDEXER_ENABLED = false
|
||||||
|
; Code search engine type, could be `bleve` or `elasticsearch`.
|
||||||
|
REPO_INDEXER_TYPE = bleve
|
||||||
|
; Index file used for code search.
|
||||||
REPO_INDEXER_PATH = indexers/repos.bleve
|
REPO_INDEXER_PATH = indexers/repos.bleve
|
||||||
|
; Code indexer connection string, available when `REPO_INDEXER_TYPE` is elasticsearch. i.e. http://elastic:changeme@localhost:9200
|
||||||
|
REPO_INDEXER_CONN_STR =
|
||||||
|
; Code indexer name, available when `REPO_INDEXER_TYPE` is elasticsearch
|
||||||
|
REPO_INDEXER_NAME = gitea_codes
|
||||||
|
|
||||||
UPDATE_BUFFER_LEN = 20
|
UPDATE_BUFFER_LEN = 20
|
||||||
MAX_FILE_SIZE = 1048576
|
MAX_FILE_SIZE = 1048576
|
||||||
; A comma separated list of glob patterns (see https://github.com/gobwas/glob) to include
|
; A comma separated list of glob patterns (see https://github.com/gobwas/glob) to include
|
||||||
|
|
|
@ -270,7 +270,11 @@ relation to port exhaustion.
|
||||||
- `ISSUE_INDEXER_QUEUE_BATCH_NUMBER`: **20**: Batch queue number.
|
- `ISSUE_INDEXER_QUEUE_BATCH_NUMBER`: **20**: Batch queue number.
|
||||||
|
|
||||||
- `REPO_INDEXER_ENABLED`: **false**: Enables code search (uses a lot of disk space, about 6 times more than the repository size).
|
- `REPO_INDEXER_ENABLED`: **false**: Enables code search (uses a lot of disk space, about 6 times more than the repository size).
|
||||||
|
- `REPO_INDEXER_TYPE`: **bleve**: Code search engine type, could be `bleve` or `elasticsearch`.
|
||||||
- `REPO_INDEXER_PATH`: **indexers/repos.bleve**: Index file used for code search.
|
- `REPO_INDEXER_PATH`: **indexers/repos.bleve**: Index file used for code search.
|
||||||
|
- `REPO_INDEXER_CONN_STR`: ****: Code indexer connection string, available when `REPO_INDEXER_TYPE` is elasticsearch. i.e. http://elastic:changeme@localhost:9200
|
||||||
|
- `REPO_INDEXER_NAME`: **gitea_codes**: Code indexer name, available when `REPO_INDEXER_TYPE` is elasticsearch
|
||||||
|
|
||||||
- `REPO_INDEXER_INCLUDE`: **empty**: A comma separated list of glob patterns (see https://github.com/gobwas/glob) to **include** in the index. Use `**.txt` to match any files with .txt extension. An empty list means include all files.
|
- `REPO_INDEXER_INCLUDE`: **empty**: A comma separated list of glob patterns (see https://github.com/gobwas/glob) to **include** in the index. Use `**.txt` to match any files with .txt extension. An empty list means include all files.
|
||||||
- `REPO_INDEXER_EXCLUDE`: **empty**: A comma separated list of glob patterns (see https://github.com/gobwas/glob) to **exclude** from the index. Files that match this list will not be indexed, even if they match in `REPO_INDEXER_INCLUDE`.
|
- `REPO_INDEXER_EXCLUDE`: **empty**: A comma separated list of glob patterns (see https://github.com/gobwas/glob) to **exclude** from the index. Files that match this list will not be indexed, even if they match in `REPO_INDEXER_INCLUDE`.
|
||||||
- `REPO_INDEXER_EXCLUDE_VENDORED`: **true**: Exclude vendored files from index.
|
- `REPO_INDEXER_EXCLUDE_VENDORED`: **true**: Exclude vendored files from index.
|
||||||
|
|
|
@ -98,8 +98,12 @@ menu:
|
||||||
- `ISSUE_INDEXER_QUEUE_CONN_STR`: **addrs=127.0.0.1:6379 db=0**: 当 `ISSUE_INDEXER_QUEUE_TYPE` 为 `redis` 时,保存Redis队列的连接字符串。
|
- `ISSUE_INDEXER_QUEUE_CONN_STR`: **addrs=127.0.0.1:6379 db=0**: 当 `ISSUE_INDEXER_QUEUE_TYPE` 为 `redis` 时,保存Redis队列的连接字符串。
|
||||||
- `ISSUE_INDEXER_QUEUE_BATCH_NUMBER`: **20**: 队列处理中批量提交数量。
|
- `ISSUE_INDEXER_QUEUE_BATCH_NUMBER`: **20**: 队列处理中批量提交数量。
|
||||||
|
|
||||||
- `REPO_INDEXER_ENABLED`: **false**: 是否启用代码搜索(启用后会占用比较大的磁盘空间)。
|
- `REPO_INDEXER_ENABLED`: **false**: 是否启用代码搜索(启用后会占用比较大的磁盘空间,如果是bleve可能需要占用约6倍存储空间)。
|
||||||
|
- `REPO_INDEXER_TYPE`: **bleve**: 代码搜索引擎类型,可以为 `bleve` 或者 `elasticsearch`。
|
||||||
- `REPO_INDEXER_PATH`: **indexers/repos.bleve**: 用于代码搜索的索引文件路径。
|
- `REPO_INDEXER_PATH`: **indexers/repos.bleve**: 用于代码搜索的索引文件路径。
|
||||||
|
- `REPO_INDEXER_CONN_STR`: ****: 代码搜索引擎连接字符串,当 `REPO_INDEXER_TYPE` 为 `elasticsearch` 时有效。例如: http://elastic:changeme@localhost:9200
|
||||||
|
- `REPO_INDEXER_NAME`: **gitea_codes**: 代码搜索引擎的名字,当 `REPO_INDEXER_TYPE` 为 `elasticsearch` 时有效。
|
||||||
|
|
||||||
- `UPDATE_BUFFER_LEN`: **20**: 代码索引请求的缓冲区长度。
|
- `UPDATE_BUFFER_LEN`: **20**: 代码索引请求的缓冲区长度。
|
||||||
- `MAX_FILE_SIZE`: **1048576**: 进行解析的源代码文件的最大长度,小于该值时才会索引。
|
- `MAX_FILE_SIZE`: **1048576**: 进行解析的源代码文件的最大长度,小于该值时才会索引。
|
||||||
|
|
||||||
|
|
|
@ -58,10 +58,10 @@ func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// openIndexer open the index at the specified path, checking for metadata
|
// openBleveIndexer open the index at the specified path, checking for metadata
|
||||||
// updates and bleve version updates. If index needs to be created (or
|
// updates and bleve version updates. If index needs to be created (or
|
||||||
// re-created), returns (nil, nil)
|
// re-created), returns (nil, nil)
|
||||||
func openIndexer(path string, latestVersion int) (bleve.Index, error) {
|
func openBleveIndexer(path string, latestVersion int) (bleve.Index, error) {
|
||||||
_, err := os.Stat(path)
|
_, err := os.Stat(path)
|
||||||
if err != nil && os.IsNotExist(err) {
|
if err != nil && os.IsNotExist(err) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
@ -104,54 +104,14 @@ func (d *RepoIndexerData) Type() string {
|
||||||
return repoIndexerDocType
|
return repoIndexerDocType
|
||||||
}
|
}
|
||||||
|
|
||||||
func addUpdate(commitSha string, update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error {
|
|
||||||
// Ignore vendored files in code search
|
|
||||||
if setting.Indexer.ExcludeVendored && enry.IsVendor(update.Filename) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha).
|
|
||||||
RunInDir(repo.RepoPath())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if size, err := strconv.Atoi(strings.TrimSpace(stdout)); err != nil {
|
|
||||||
return fmt.Errorf("Misformatted git cat-file output: %v", err)
|
|
||||||
} else if int64(size) > setting.Indexer.MaxIndexerFileSize {
|
|
||||||
return addDelete(update.Filename, repo, batch)
|
|
||||||
}
|
|
||||||
|
|
||||||
fileContents, err := git.NewCommand("cat-file", "blob", update.BlobSha).
|
|
||||||
RunInDirBytes(repo.RepoPath())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
} else if !base.IsTextFile(fileContents) {
|
|
||||||
// FIXME: UTF-16 files will probably fail here
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
id := filenameIndexerID(repo.ID, update.Filename)
|
|
||||||
return batch.Index(id, &RepoIndexerData{
|
|
||||||
RepoID: repo.ID,
|
|
||||||
CommitID: commitSha,
|
|
||||||
Content: string(charset.ToUTF8DropErrors(fileContents)),
|
|
||||||
Language: analyze.GetCodeLanguage(update.Filename, fileContents),
|
|
||||||
UpdatedAt: time.Now().UTC(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func addDelete(filename string, repo *models.Repository, batch rupture.FlushingBatch) error {
|
|
||||||
id := filenameIndexerID(repo.ID, filename)
|
|
||||||
return batch.Delete(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
repoIndexerAnalyzer = "repoIndexerAnalyzer"
|
repoIndexerAnalyzer = "repoIndexerAnalyzer"
|
||||||
repoIndexerDocType = "repoIndexerDocType"
|
repoIndexerDocType = "repoIndexerDocType"
|
||||||
repoIndexerLatestVersion = 5
|
repoIndexerLatestVersion = 5
|
||||||
)
|
)
|
||||||
|
|
||||||
// createRepoIndexer create a repo indexer if one does not already exist
|
// createBleveIndexer create a bleve repo indexer if one does not already exist
|
||||||
func createRepoIndexer(path string, latestVersion int) (bleve.Index, error) {
|
func createBleveIndexer(path string, latestVersion int) (bleve.Index, error) {
|
||||||
docMapping := bleve.NewDocumentMapping()
|
docMapping := bleve.NewDocumentMapping()
|
||||||
numericFieldMapping := bleve.NewNumericFieldMapping()
|
numericFieldMapping := bleve.NewNumericFieldMapping()
|
||||||
numericFieldMapping.IncludeInAll = false
|
numericFieldMapping.IncludeInAll = false
|
||||||
|
@ -199,18 +159,6 @@ func createRepoIndexer(path string, latestVersion int) (bleve.Index, error) {
|
||||||
return indexer, nil
|
return indexer, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func filenameIndexerID(repoID int64, filename string) string {
|
|
||||||
return indexerID(repoID) + "_" + filename
|
|
||||||
}
|
|
||||||
|
|
||||||
func filenameOfIndexerID(indexerID string) string {
|
|
||||||
index := strings.IndexByte(indexerID, '_')
|
|
||||||
if index == -1 {
|
|
||||||
log.Error("Unexpected ID in repo indexer: %s", indexerID)
|
|
||||||
}
|
|
||||||
return indexerID[index+1:]
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
_ Indexer = &BleveIndexer{}
|
_ Indexer = &BleveIndexer{}
|
||||||
)
|
)
|
||||||
|
@ -230,10 +178,51 @@ func NewBleveIndexer(indexDir string) (*BleveIndexer, bool, error) {
|
||||||
return indexer, created, err
|
return indexer, created, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *BleveIndexer) addUpdate(commitSha string, update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error {
|
||||||
|
// Ignore vendored files in code search
|
||||||
|
if setting.Indexer.ExcludeVendored && enry.IsVendor(update.Filename) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha).
|
||||||
|
RunInDir(repo.RepoPath())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if size, err := strconv.Atoi(strings.TrimSpace(stdout)); err != nil {
|
||||||
|
return fmt.Errorf("Misformatted git cat-file output: %v", err)
|
||||||
|
} else if int64(size) > setting.Indexer.MaxIndexerFileSize {
|
||||||
|
return b.addDelete(update.Filename, repo, batch)
|
||||||
|
}
|
||||||
|
|
||||||
|
fileContents, err := git.NewCommand("cat-file", "blob", update.BlobSha).
|
||||||
|
RunInDirBytes(repo.RepoPath())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
} else if !base.IsTextFile(fileContents) {
|
||||||
|
// FIXME: UTF-16 files will probably fail here
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
id := filenameIndexerID(repo.ID, update.Filename)
|
||||||
|
return batch.Index(id, &RepoIndexerData{
|
||||||
|
RepoID: repo.ID,
|
||||||
|
CommitID: commitSha,
|
||||||
|
Content: string(charset.ToUTF8DropErrors(fileContents)),
|
||||||
|
Language: analyze.GetCodeLanguage(update.Filename, fileContents),
|
||||||
|
UpdatedAt: time.Now().UTC(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BleveIndexer) addDelete(filename string, repo *models.Repository, batch rupture.FlushingBatch) error {
|
||||||
|
id := filenameIndexerID(repo.ID, filename)
|
||||||
|
return batch.Delete(id)
|
||||||
|
}
|
||||||
|
|
||||||
// init init the indexer
|
// init init the indexer
|
||||||
func (b *BleveIndexer) init() (bool, error) {
|
func (b *BleveIndexer) init() (bool, error) {
|
||||||
var err error
|
var err error
|
||||||
b.indexer, err = openIndexer(b.indexDir, repoIndexerLatestVersion)
|
b.indexer, err = openBleveIndexer(b.indexDir, repoIndexerLatestVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
@ -241,7 +230,7 @@ func (b *BleveIndexer) init() (bool, error) {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
b.indexer, err = createRepoIndexer(b.indexDir, repoIndexerLatestVersion)
|
b.indexer, err = createBleveIndexer(b.indexDir, repoIndexerLatestVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
@ -262,38 +251,19 @@ func (b *BleveIndexer) Close() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Index indexes the data
|
// Index indexes the data
|
||||||
func (b *BleveIndexer) Index(repoID int64) error {
|
func (b *BleveIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error {
|
||||||
repo, err := models.GetRepositoryByID(repoID)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
sha, err := getDefaultBranchSha(repo)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
changes, err := getRepoChanges(repo, sha)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
} else if changes == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
|
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
|
||||||
for _, update := range changes.Updates {
|
for _, update := range changes.Updates {
|
||||||
if err := addUpdate(sha, update, repo, batch); err != nil {
|
if err := b.addUpdate(sha, update, repo, batch); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, filename := range changes.RemovedFilenames {
|
for _, filename := range changes.RemovedFilenames {
|
||||||
if err := addDelete(filename, repo, batch); err != nil {
|
if err := b.addDelete(filename, repo, batch); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err = batch.Flush(); err != nil {
|
return batch.Flush()
|
||||||
return err
|
|
||||||
}
|
|
||||||
return repo.UpdateIndexerStatus(models.RepoIndexerTypeCode, sha)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete deletes indexes by ids
|
// Delete deletes indexes by ids
|
||||||
|
|
|
@ -6,21 +6,15 @@ package code
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"path/filepath"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"code.gitea.io/gitea/models"
|
"code.gitea.io/gitea/models"
|
||||||
"code.gitea.io/gitea/modules/setting"
|
|
||||||
"code.gitea.io/gitea/modules/util"
|
"code.gitea.io/gitea/modules/util"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMain(m *testing.M) {
|
func TestBleveIndexAndSearch(t *testing.T) {
|
||||||
models.MainTest(m, filepath.Join("..", "..", ".."))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestIndexAndSearch(t *testing.T) {
|
|
||||||
models.PrepareTestEnv(t)
|
models.PrepareTestEnv(t)
|
||||||
|
|
||||||
dir, err := ioutil.TempDir("", "bleve.index")
|
dir, err := ioutil.TempDir("", "bleve.index")
|
||||||
|
@ -31,10 +25,9 @@ func TestIndexAndSearch(t *testing.T) {
|
||||||
}
|
}
|
||||||
defer util.RemoveAll(dir)
|
defer util.RemoveAll(dir)
|
||||||
|
|
||||||
setting.Indexer.RepoIndexerEnabled = true
|
|
||||||
idx, _, err := NewBleveIndexer(dir)
|
idx, _, err := NewBleveIndexer(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
assert.Fail(t, "Unable to create indexer Error: %v", err)
|
assert.Fail(t, "Unable to create bleve indexer Error: %v", err)
|
||||||
if idx != nil {
|
if idx != nil {
|
||||||
idx.Close()
|
idx.Close()
|
||||||
}
|
}
|
||||||
|
@ -42,45 +35,5 @@ func TestIndexAndSearch(t *testing.T) {
|
||||||
}
|
}
|
||||||
defer idx.Close()
|
defer idx.Close()
|
||||||
|
|
||||||
err = idx.Index(1)
|
testIndexer("beleve", t, idx)
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
var (
|
|
||||||
keywords = []struct {
|
|
||||||
Keyword string
|
|
||||||
IDs []int64
|
|
||||||
Langs int
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
Keyword: "Description",
|
|
||||||
IDs: []int64{1},
|
|
||||||
Langs: 1,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Keyword: "repo1",
|
|
||||||
IDs: []int64{1},
|
|
||||||
Langs: 1,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Keyword: "non-exist",
|
|
||||||
IDs: []int64{},
|
|
||||||
Langs: 0,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
for _, kw := range keywords {
|
|
||||||
total, res, langs, err := idx.Search(nil, "", kw.Keyword, 1, 10)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, len(kw.IDs), total)
|
|
||||||
|
|
||||||
assert.NotNil(t, langs)
|
|
||||||
assert.Len(t, langs, kw.Langs)
|
|
||||||
|
|
||||||
var ids = make([]int64, 0, len(res))
|
|
||||||
for _, hit := range res {
|
|
||||||
ids = append(ids, hit.RepoID)
|
|
||||||
}
|
|
||||||
assert.EqualValues(t, kw.IDs, ids)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
385
modules/indexer/code/elastic_search.go
Normal file
385
modules/indexer/code/elastic_search.go
Normal file
|
@ -0,0 +1,385 @@
|
||||||
|
// Copyright 2020 The Gitea Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package code
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"code.gitea.io/gitea/models"
|
||||||
|
"code.gitea.io/gitea/modules/analyze"
|
||||||
|
"code.gitea.io/gitea/modules/base"
|
||||||
|
"code.gitea.io/gitea/modules/charset"
|
||||||
|
"code.gitea.io/gitea/modules/git"
|
||||||
|
"code.gitea.io/gitea/modules/log"
|
||||||
|
"code.gitea.io/gitea/modules/setting"
|
||||||
|
"code.gitea.io/gitea/modules/timeutil"
|
||||||
|
|
||||||
|
"github.com/go-enry/go-enry/v2"
|
||||||
|
"github.com/olivere/elastic/v7"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
esRepoIndexerLatestVersion = 1
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
_ Indexer = &ElasticSearchIndexer{}
|
||||||
|
)
|
||||||
|
|
||||||
|
// ElasticSearchIndexer implements Indexer interface
|
||||||
|
type ElasticSearchIndexer struct {
|
||||||
|
client *elastic.Client
|
||||||
|
indexerAliasName string
|
||||||
|
}
|
||||||
|
|
||||||
|
type elasticLogger struct {
|
||||||
|
*log.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l elasticLogger) Printf(format string, args ...interface{}) {
|
||||||
|
_ = l.Logger.Log(2, l.Logger.GetLevel(), format, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewElasticSearchIndexer creates a new elasticsearch indexer
|
||||||
|
func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, bool, error) {
|
||||||
|
opts := []elastic.ClientOptionFunc{
|
||||||
|
elastic.SetURL(url),
|
||||||
|
elastic.SetSniff(false),
|
||||||
|
elastic.SetHealthcheckInterval(10 * time.Second),
|
||||||
|
elastic.SetGzip(false),
|
||||||
|
}
|
||||||
|
|
||||||
|
logger := elasticLogger{log.GetLogger(log.DEFAULT)}
|
||||||
|
|
||||||
|
if logger.GetLevel() == log.TRACE || logger.GetLevel() == log.DEBUG {
|
||||||
|
opts = append(opts, elastic.SetTraceLog(logger))
|
||||||
|
} else if logger.GetLevel() == log.ERROR || logger.GetLevel() == log.CRITICAL || logger.GetLevel() == log.FATAL {
|
||||||
|
opts = append(opts, elastic.SetErrorLog(logger))
|
||||||
|
} else if logger.GetLevel() == log.INFO || logger.GetLevel() == log.WARN {
|
||||||
|
opts = append(opts, elastic.SetInfoLog(logger))
|
||||||
|
}
|
||||||
|
|
||||||
|
client, err := elastic.NewClient(opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
indexer := &ElasticSearchIndexer{
|
||||||
|
client: client,
|
||||||
|
indexerAliasName: indexerName,
|
||||||
|
}
|
||||||
|
exists, err := indexer.init()
|
||||||
|
|
||||||
|
return indexer, !exists, err
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultMapping = `{
|
||||||
|
"mappings": {
|
||||||
|
"properties": {
|
||||||
|
"repo_id": {
|
||||||
|
"type": "long",
|
||||||
|
"index": true
|
||||||
|
},
|
||||||
|
"content": {
|
||||||
|
"type": "text",
|
||||||
|
"index": true
|
||||||
|
},
|
||||||
|
"commit_id": {
|
||||||
|
"type": "keyword",
|
||||||
|
"index": true
|
||||||
|
},
|
||||||
|
"language": {
|
||||||
|
"type": "keyword",
|
||||||
|
"index": true
|
||||||
|
},
|
||||||
|
"updated_at": {
|
||||||
|
"type": "long",
|
||||||
|
"index": true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}`
|
||||||
|
)
|
||||||
|
|
||||||
|
func (b *ElasticSearchIndexer) realIndexerName() string {
|
||||||
|
return fmt.Sprintf("%s.v%d", b.indexerAliasName, esRepoIndexerLatestVersion)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Init will initialize the indexer
|
||||||
|
func (b *ElasticSearchIndexer) init() (bool, error) {
|
||||||
|
ctx := context.Background()
|
||||||
|
exists, err := b.client.IndexExists(b.realIndexerName()).Do(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if !exists {
|
||||||
|
var mapping = defaultMapping
|
||||||
|
|
||||||
|
createIndex, err := b.client.CreateIndex(b.realIndexerName()).BodyString(mapping).Do(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if !createIndex.Acknowledged {
|
||||||
|
return false, fmt.Errorf("create index %s with %s failed", b.realIndexerName(), mapping)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check version
|
||||||
|
r, err := b.client.Aliases().Do(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
realIndexerNames := r.IndicesByAlias(b.indexerAliasName)
|
||||||
|
if len(realIndexerNames) < 1 {
|
||||||
|
res, err := b.client.Alias().
|
||||||
|
Add(b.realIndexerName(), b.indexerAliasName).
|
||||||
|
Do(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if !res.Acknowledged {
|
||||||
|
return false, fmt.Errorf("")
|
||||||
|
}
|
||||||
|
} else if len(realIndexerNames) >= 1 && realIndexerNames[0] < b.realIndexerName() {
|
||||||
|
log.Warn("Found older gitea indexer named %s, but we will create a new one %s and keep the old NOT DELETED. You can delete the old version after the upgrade succeed.",
|
||||||
|
realIndexerNames[0], b.realIndexerName())
|
||||||
|
res, err := b.client.Alias().
|
||||||
|
Remove(realIndexerNames[0], b.indexerAliasName).
|
||||||
|
Add(b.realIndexerName(), b.indexerAliasName).
|
||||||
|
Do(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if !res.Acknowledged {
|
||||||
|
return false, fmt.Errorf("")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return exists, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *ElasticSearchIndexer) addUpdate(sha string, update fileUpdate, repo *models.Repository) ([]elastic.BulkableRequest, error) {
|
||||||
|
stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha).
|
||||||
|
RunInDir(repo.RepoPath())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if size, err := strconv.Atoi(strings.TrimSpace(stdout)); err != nil {
|
||||||
|
return nil, fmt.Errorf("Misformatted git cat-file output: %v", err)
|
||||||
|
} else if int64(size) > setting.Indexer.MaxIndexerFileSize {
|
||||||
|
return []elastic.BulkableRequest{b.addDelete(update.Filename, repo)}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
fileContents, err := git.NewCommand("cat-file", "blob", update.BlobSha).
|
||||||
|
RunInDirBytes(repo.RepoPath())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else if !base.IsTextFile(fileContents) {
|
||||||
|
// FIXME: UTF-16 files will probably fail here
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
id := filenameIndexerID(repo.ID, update.Filename)
|
||||||
|
|
||||||
|
return []elastic.BulkableRequest{
|
||||||
|
elastic.NewBulkIndexRequest().
|
||||||
|
Index(b.indexerAliasName).
|
||||||
|
Id(id).
|
||||||
|
Doc(map[string]interface{}{
|
||||||
|
"repo_id": repo.ID,
|
||||||
|
"content": string(charset.ToUTF8DropErrors(fileContents)),
|
||||||
|
"commit_id": sha,
|
||||||
|
"language": analyze.GetCodeLanguage(update.Filename, fileContents),
|
||||||
|
"updated_at": timeutil.TimeStampNow(),
|
||||||
|
}),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *ElasticSearchIndexer) addDelete(filename string, repo *models.Repository) elastic.BulkableRequest {
|
||||||
|
id := filenameIndexerID(repo.ID, filename)
|
||||||
|
return elastic.NewBulkDeleteRequest().
|
||||||
|
Index(b.indexerAliasName).
|
||||||
|
Id(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Index will save the index data
|
||||||
|
func (b *ElasticSearchIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error {
|
||||||
|
reqs := make([]elastic.BulkableRequest, 0)
|
||||||
|
for _, update := range changes.Updates {
|
||||||
|
updateReqs, err := b.addUpdate(sha, update, repo)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(updateReqs) > 0 {
|
||||||
|
reqs = append(reqs, updateReqs...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, filename := range changes.RemovedFilenames {
|
||||||
|
reqs = append(reqs, b.addDelete(filename, repo))
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(reqs) > 0 {
|
||||||
|
_, err := b.client.Bulk().
|
||||||
|
Index(b.indexerAliasName).
|
||||||
|
Add(reqs...).
|
||||||
|
Do(context.Background())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete deletes indexes by ids
|
||||||
|
func (b *ElasticSearchIndexer) Delete(repoID int64) error {
|
||||||
|
_, err := b.client.DeleteByQuery(b.indexerAliasName).
|
||||||
|
Query(elastic.NewTermsQuery("repo_id", repoID)).
|
||||||
|
Do(context.Background())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func convertResult(searchResult *elastic.SearchResult, kw string, pageSize int) (int64, []*SearchResult, []*SearchResultLanguages, error) {
|
||||||
|
hits := make([]*SearchResult, 0, pageSize)
|
||||||
|
for _, hit := range searchResult.Hits.Hits {
|
||||||
|
// FIXME: There is no way to get the position the keyword on the content currently on the same request.
|
||||||
|
// So we get it from content, this may made the query slower. See
|
||||||
|
// https://discuss.elastic.co/t/fetching-position-of-keyword-in-matched-document/94291
|
||||||
|
var startIndex, endIndex int = -1, -1
|
||||||
|
c, ok := hit.Highlight["content"]
|
||||||
|
if ok && len(c) > 0 {
|
||||||
|
var subStr = make([]rune, 0, len(kw))
|
||||||
|
startIndex = strings.IndexFunc(c[0], func(r rune) bool {
|
||||||
|
if len(subStr) >= len(kw) {
|
||||||
|
subStr = subStr[1:]
|
||||||
|
}
|
||||||
|
subStr = append(subStr, r)
|
||||||
|
return strings.EqualFold(kw, string(subStr))
|
||||||
|
})
|
||||||
|
if startIndex > -1 {
|
||||||
|
endIndex = startIndex + len(kw)
|
||||||
|
} else {
|
||||||
|
panic(fmt.Sprintf("1===%#v", hit.Highlight))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
panic(fmt.Sprintf("2===%#v", hit.Highlight))
|
||||||
|
}
|
||||||
|
|
||||||
|
repoID, fileName := parseIndexerID(hit.Id)
|
||||||
|
var res = make(map[string]interface{})
|
||||||
|
if err := json.Unmarshal(hit.Source, &res); err != nil {
|
||||||
|
return 0, nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
language := res["language"].(string)
|
||||||
|
|
||||||
|
hits = append(hits, &SearchResult{
|
||||||
|
RepoID: repoID,
|
||||||
|
Filename: fileName,
|
||||||
|
CommitID: res["commit_id"].(string),
|
||||||
|
Content: res["content"].(string),
|
||||||
|
UpdatedUnix: timeutil.TimeStamp(res["updated_at"].(float64)),
|
||||||
|
Language: language,
|
||||||
|
StartIndex: startIndex,
|
||||||
|
EndIndex: endIndex,
|
||||||
|
Color: enry.GetColor(language),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return searchResult.TotalHits(), hits, extractAggs(searchResult), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func extractAggs(searchResult *elastic.SearchResult) []*SearchResultLanguages {
|
||||||
|
var searchResultLanguages []*SearchResultLanguages
|
||||||
|
agg, found := searchResult.Aggregations.Terms("language")
|
||||||
|
if found {
|
||||||
|
searchResultLanguages = make([]*SearchResultLanguages, 0, 10)
|
||||||
|
|
||||||
|
for _, bucket := range agg.Buckets {
|
||||||
|
searchResultLanguages = append(searchResultLanguages, &SearchResultLanguages{
|
||||||
|
Language: bucket.Key.(string),
|
||||||
|
Color: enry.GetColor(bucket.Key.(string)),
|
||||||
|
Count: int(bucket.DocCount),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return searchResultLanguages
|
||||||
|
}
|
||||||
|
|
||||||
|
// Search searches for codes and language stats by given conditions.
|
||||||
|
func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string, page, pageSize int) (int64, []*SearchResult, []*SearchResultLanguages, error) {
|
||||||
|
kwQuery := elastic.NewMultiMatchQuery(keyword, "content")
|
||||||
|
query := elastic.NewBoolQuery()
|
||||||
|
query = query.Must(kwQuery)
|
||||||
|
if len(repoIDs) > 0 {
|
||||||
|
var repoStrs = make([]interface{}, 0, len(repoIDs))
|
||||||
|
for _, repoID := range repoIDs {
|
||||||
|
repoStrs = append(repoStrs, repoID)
|
||||||
|
}
|
||||||
|
repoQuery := elastic.NewTermsQuery("repo_id", repoStrs...)
|
||||||
|
query = query.Must(repoQuery)
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
start int
|
||||||
|
kw = "<em>" + keyword + "</em>"
|
||||||
|
aggregation = elastic.NewTermsAggregation().Field("language").Size(10).OrderByCountDesc()
|
||||||
|
)
|
||||||
|
|
||||||
|
if page > 0 {
|
||||||
|
start = (page - 1) * pageSize
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(language) == 0 {
|
||||||
|
searchResult, err := b.client.Search().
|
||||||
|
Index(b.indexerAliasName).
|
||||||
|
Aggregation("language", aggregation).
|
||||||
|
Query(query).
|
||||||
|
Highlight(elastic.NewHighlight().Field("content")).
|
||||||
|
Sort("repo_id", true).
|
||||||
|
From(start).Size(pageSize).
|
||||||
|
Do(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return convertResult(searchResult, kw, pageSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
langQuery := elastic.NewMatchQuery("language", language)
|
||||||
|
countResult, err := b.client.Search().
|
||||||
|
Index(b.indexerAliasName).
|
||||||
|
Aggregation("language", aggregation).
|
||||||
|
Query(query).
|
||||||
|
Size(0). // We only needs stats information
|
||||||
|
Do(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
query = query.Must(langQuery)
|
||||||
|
searchResult, err := b.client.Search().
|
||||||
|
Index(b.indexerAliasName).
|
||||||
|
Query(query).
|
||||||
|
Highlight(elastic.NewHighlight().Field("content")).
|
||||||
|
Sort("repo_id", true).
|
||||||
|
From(start).Size(pageSize).
|
||||||
|
Do(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
total, hits, _, err := convertResult(searchResult, kw, pageSize)
|
||||||
|
|
||||||
|
return total, hits, extractAggs(countResult), err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close implements indexer
|
||||||
|
func (b *ElasticSearchIndexer) Close() {}
|
36
modules/indexer/code/elastic_search_test.go
Normal file
36
modules/indexer/code/elastic_search_test.go
Normal file
|
@ -0,0 +1,36 @@
|
||||||
|
// Copyright 2020 The Gitea Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package code
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"code.gitea.io/gitea/models"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestESIndexAndSearch(t *testing.T) {
|
||||||
|
models.PrepareTestEnv(t)
|
||||||
|
|
||||||
|
u := os.Getenv("TEST_INDEXER_CODE_ES_URL")
|
||||||
|
if u == "" {
|
||||||
|
t.SkipNow()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
indexer, _, err := NewElasticSearchIndexer(u, "gitea_codes")
|
||||||
|
if err != nil {
|
||||||
|
assert.Fail(t, "Unable to create ES indexer Error: %v", err)
|
||||||
|
if indexer != nil {
|
||||||
|
indexer.Close()
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer indexer.Close()
|
||||||
|
|
||||||
|
testIndexer("elastic_search", t, indexer)
|
||||||
|
}
|
|
@ -7,8 +7,11 @@ package code
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"code.gitea.io/gitea/models"
|
||||||
"code.gitea.io/gitea/modules/graceful"
|
"code.gitea.io/gitea/modules/graceful"
|
||||||
"code.gitea.io/gitea/modules/log"
|
"code.gitea.io/gitea/modules/log"
|
||||||
"code.gitea.io/gitea/modules/setting"
|
"code.gitea.io/gitea/modules/setting"
|
||||||
|
@ -37,12 +40,33 @@ type SearchResultLanguages struct {
|
||||||
|
|
||||||
// Indexer defines an interface to indexer issues contents
|
// Indexer defines an interface to indexer issues contents
|
||||||
type Indexer interface {
|
type Indexer interface {
|
||||||
Index(repoID int64) error
|
Index(repo *models.Repository, sha string, changes *repoChanges) error
|
||||||
Delete(repoID int64) error
|
Delete(repoID int64) error
|
||||||
Search(repoIDs []int64, language, keyword string, page, pageSize int) (int64, []*SearchResult, []*SearchResultLanguages, error)
|
Search(repoIDs []int64, language, keyword string, page, pageSize int) (int64, []*SearchResult, []*SearchResultLanguages, error)
|
||||||
Close()
|
Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func filenameIndexerID(repoID int64, filename string) string {
|
||||||
|
return indexerID(repoID) + "_" + filename
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseIndexerID(indexerID string) (int64, string) {
|
||||||
|
index := strings.IndexByte(indexerID, '_')
|
||||||
|
if index == -1 {
|
||||||
|
log.Error("Unexpected ID in repo indexer: %s", indexerID)
|
||||||
|
}
|
||||||
|
repoID, _ := strconv.ParseInt(indexerID[:index], 10, 64)
|
||||||
|
return repoID, indexerID[index+1:]
|
||||||
|
}
|
||||||
|
|
||||||
|
func filenameOfIndexerID(indexerID string) string {
|
||||||
|
index := strings.IndexByte(indexerID, '_')
|
||||||
|
if index == -1 {
|
||||||
|
log.Error("Unexpected ID in repo indexer: %s", indexerID)
|
||||||
|
}
|
||||||
|
return indexerID[index+1:]
|
||||||
|
}
|
||||||
|
|
||||||
// Init initialize the repo indexer
|
// Init initialize the repo indexer
|
||||||
func Init() {
|
func Init() {
|
||||||
if !setting.Indexer.RepoIndexerEnabled {
|
if !setting.Indexer.RepoIndexerEnabled {
|
||||||
|
@ -63,33 +87,61 @@ func Init() {
|
||||||
waitChannel := make(chan time.Duration)
|
waitChannel := make(chan time.Duration)
|
||||||
go func() {
|
go func() {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
var (
|
||||||
|
rIndexer Indexer
|
||||||
|
populate bool
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
switch setting.Indexer.RepoType {
|
||||||
|
case "bleve":
|
||||||
log.Info("PID: %d Initializing Repository Indexer at: %s", os.Getpid(), setting.Indexer.RepoPath)
|
log.Info("PID: %d Initializing Repository Indexer at: %s", os.Getpid(), setting.Indexer.RepoPath)
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := recover(); err != nil {
|
if err := recover(); err != nil {
|
||||||
log.Error("PANIC whilst initializing repository indexer: %v\nStacktrace: %s", err, log.Stack(2))
|
log.Error("PANIC whilst initializing repository indexer: %v\nStacktrace: %s", err, log.Stack(2))
|
||||||
log.Error("The indexer files are likely corrupted and may need to be deleted")
|
log.Error("The indexer files are likely corrupted and may need to be deleted")
|
||||||
log.Error("You can completely remove the %q directory to make Gitea recreate the indexes", setting.Indexer.RepoPath)
|
log.Error("You can completely remove the \"%s\" directory to make Gitea recreate the indexes", setting.Indexer.RepoPath)
|
||||||
cancel()
|
|
||||||
indexer.Close()
|
|
||||||
close(waitChannel)
|
|
||||||
log.Fatal("PID: %d Unable to initialize the Repository Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.RepoPath, err)
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
bleveIndexer, created, err := NewBleveIndexer(setting.Indexer.RepoPath)
|
|
||||||
|
rIndexer, populate, err = NewBleveIndexer(setting.Indexer.RepoPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if bleveIndexer != nil {
|
if rIndexer != nil {
|
||||||
bleveIndexer.Close()
|
rIndexer.Close()
|
||||||
}
|
}
|
||||||
cancel()
|
cancel()
|
||||||
indexer.Close()
|
indexer.Close()
|
||||||
close(waitChannel)
|
close(waitChannel)
|
||||||
log.Fatal("PID: %d Unable to initialize the Repository Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.RepoPath, err)
|
log.Fatal("PID: %d Unable to initialize the bleve Repository Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.RepoPath, err)
|
||||||
}
|
}
|
||||||
indexer.set(bleveIndexer)
|
case "elasticsearch":
|
||||||
|
log.Info("PID: %d Initializing Repository Indexer at: %s", os.Getpid(), setting.Indexer.RepoConnStr)
|
||||||
|
defer func() {
|
||||||
|
if err := recover(); err != nil {
|
||||||
|
log.Error("PANIC whilst initializing repository indexer: %v\nStacktrace: %s", err, log.Stack(2))
|
||||||
|
log.Error("The indexer files are likely corrupted and may need to be deleted")
|
||||||
|
log.Error("You can completely remove the \"%s\" index to make Gitea recreate the indexes", setting.Indexer.RepoConnStr)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
rIndexer, populate, err = NewElasticSearchIndexer(setting.Indexer.RepoConnStr, setting.Indexer.RepoIndexerName)
|
||||||
|
if err != nil {
|
||||||
|
if rIndexer != nil {
|
||||||
|
rIndexer.Close()
|
||||||
|
}
|
||||||
|
cancel()
|
||||||
|
indexer.Close()
|
||||||
|
close(waitChannel)
|
||||||
|
log.Fatal("PID: %d Unable to initialize the elasticsearch Repository Indexer connstr: %s Error: %v", os.Getpid(), setting.Indexer.RepoConnStr, err)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
log.Fatal("PID: %d Unknown Indexer type: %s", os.Getpid(), setting.Indexer.RepoType)
|
||||||
|
}
|
||||||
|
|
||||||
|
indexer.set(rIndexer)
|
||||||
|
|
||||||
go processRepoIndexerOperationQueue(indexer)
|
go processRepoIndexerOperationQueue(indexer)
|
||||||
|
|
||||||
if created {
|
if populate {
|
||||||
go populateRepoIndexer()
|
go populateRepoIndexer()
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
|
|
83
modules/indexer/code/indexer_test.go
Normal file
83
modules/indexer/code/indexer_test.go
Normal file
|
@ -0,0 +1,83 @@
|
||||||
|
// Copyright 2020 The Gitea Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package code
|
||||||
|
|
||||||
|
import (
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"code.gitea.io/gitea/models"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
models.MainTest(m, filepath.Join("..", "..", ".."))
|
||||||
|
}
|
||||||
|
|
||||||
|
func testIndexer(name string, t *testing.T, indexer Indexer) {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
var repoID int64 = 1
|
||||||
|
err := index(indexer, repoID)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
var (
|
||||||
|
keywords = []struct {
|
||||||
|
RepoIDs []int64
|
||||||
|
Keyword string
|
||||||
|
IDs []int64
|
||||||
|
Langs int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
RepoIDs: nil,
|
||||||
|
Keyword: "Description",
|
||||||
|
IDs: []int64{repoID},
|
||||||
|
Langs: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
RepoIDs: []int64{2},
|
||||||
|
Keyword: "Description",
|
||||||
|
IDs: []int64{},
|
||||||
|
Langs: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
RepoIDs: nil,
|
||||||
|
Keyword: "repo1",
|
||||||
|
IDs: []int64{repoID},
|
||||||
|
Langs: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
RepoIDs: []int64{2},
|
||||||
|
Keyword: "repo1",
|
||||||
|
IDs: []int64{},
|
||||||
|
Langs: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
RepoIDs: nil,
|
||||||
|
Keyword: "non-exist",
|
||||||
|
IDs: []int64{},
|
||||||
|
Langs: 0,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
for _, kw := range keywords {
|
||||||
|
t.Run(kw.Keyword, func(t *testing.T) {
|
||||||
|
total, res, langs, err := indexer.Search(kw.RepoIDs, "", kw.Keyword, 1, 10)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.EqualValues(t, len(kw.IDs), total)
|
||||||
|
assert.EqualValues(t, kw.Langs, len(langs))
|
||||||
|
|
||||||
|
var ids = make([]int64, 0, len(res))
|
||||||
|
for _, hit := range res {
|
||||||
|
ids = append(ids, hit.RepoID)
|
||||||
|
assert.EqualValues(t, "# repo1\n\nDescription for repo1", hit.Content)
|
||||||
|
}
|
||||||
|
assert.EqualValues(t, kw.IDs, ids)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.NoError(t, indexer.Delete(repoID))
|
||||||
|
})
|
||||||
|
}
|
|
@ -10,7 +10,6 @@ import (
|
||||||
"code.gitea.io/gitea/models"
|
"code.gitea.io/gitea/models"
|
||||||
"code.gitea.io/gitea/modules/graceful"
|
"code.gitea.io/gitea/modules/graceful"
|
||||||
"code.gitea.io/gitea/modules/log"
|
"code.gitea.io/gitea/modules/log"
|
||||||
"code.gitea.io/gitea/modules/setting"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type repoIndexerOperation struct {
|
type repoIndexerOperation struct {
|
||||||
|
@ -25,6 +24,30 @@ func initQueue(queueLength int) {
|
||||||
repoIndexerOperationQueue = make(chan repoIndexerOperation, queueLength)
|
repoIndexerOperationQueue = make(chan repoIndexerOperation, queueLength)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func index(indexer Indexer, repoID int64) error {
|
||||||
|
repo, err := models.GetRepositoryByID(repoID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
sha, err := getDefaultBranchSha(repo)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
changes, err := getRepoChanges(repo, sha)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
} else if changes == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := indexer.Index(repo, sha, changes); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return repo.UpdateIndexerStatus(models.RepoIndexerTypeCode, sha)
|
||||||
|
}
|
||||||
|
|
||||||
func processRepoIndexerOperationQueue(indexer Indexer) {
|
func processRepoIndexerOperationQueue(indexer Indexer) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -35,7 +58,7 @@ func processRepoIndexerOperationQueue(indexer Indexer) {
|
||||||
log.Error("indexer.Delete: %v", err)
|
log.Error("indexer.Delete: %v", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if err = indexer.Index(op.repoID); err != nil {
|
if err = index(indexer, op.repoID); err != nil {
|
||||||
log.Error("indexer.Index: %v", err)
|
log.Error("indexer.Index: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -60,9 +83,6 @@ func UpdateRepoIndexer(repo *models.Repository, watchers ...chan<- error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func addOperationToQueue(op repoIndexerOperation) {
|
func addOperationToQueue(op repoIndexerOperation) {
|
||||||
if !setting.Indexer.RepoIndexerEnabled {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
select {
|
select {
|
||||||
case repoIndexerOperationQueue <- op:
|
case repoIndexerOperationQueue <- op:
|
||||||
break
|
break
|
||||||
|
|
|
@ -7,6 +7,8 @@ package code
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"code.gitea.io/gitea/models"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -55,12 +57,12 @@ func (w *wrappedIndexer) get() (Indexer, error) {
|
||||||
return w.internal, nil
|
return w.internal, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *wrappedIndexer) Index(repoID int64) error {
|
func (w *wrappedIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error {
|
||||||
indexer, err := w.get()
|
indexer, err := w.get()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return indexer.Index(repoID)
|
return indexer.Index(repo, sha, changes)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *wrappedIndexer) Delete(repoID int64) error {
|
func (w *wrappedIndexer) Delete(repoID int64) error {
|
||||||
|
|
|
@ -36,7 +36,10 @@ var (
|
||||||
StartupTimeout time.Duration
|
StartupTimeout time.Duration
|
||||||
|
|
||||||
RepoIndexerEnabled bool
|
RepoIndexerEnabled bool
|
||||||
|
RepoType string
|
||||||
RepoPath string
|
RepoPath string
|
||||||
|
RepoConnStr string
|
||||||
|
RepoIndexerName string
|
||||||
UpdateQueueLength int
|
UpdateQueueLength int
|
||||||
MaxIndexerFileSize int64
|
MaxIndexerFileSize int64
|
||||||
IncludePatterns []glob.Glob
|
IncludePatterns []glob.Glob
|
||||||
|
@ -52,6 +55,11 @@ var (
|
||||||
IssueQueueConnStr: "",
|
IssueQueueConnStr: "",
|
||||||
IssueQueueBatchNumber: 20,
|
IssueQueueBatchNumber: 20,
|
||||||
|
|
||||||
|
RepoIndexerEnabled: false,
|
||||||
|
RepoType: "bleve",
|
||||||
|
RepoPath: "indexers/repos.bleve",
|
||||||
|
RepoConnStr: "",
|
||||||
|
RepoIndexerName: "gitea_codes",
|
||||||
MaxIndexerFileSize: 1024 * 1024,
|
MaxIndexerFileSize: 1024 * 1024,
|
||||||
ExcludeVendored: true,
|
ExcludeVendored: true,
|
||||||
}
|
}
|
||||||
|
@ -73,10 +81,14 @@ func newIndexerService() {
|
||||||
Indexer.IssueQueueBatchNumber = sec.Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(20)
|
Indexer.IssueQueueBatchNumber = sec.Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(20)
|
||||||
|
|
||||||
Indexer.RepoIndexerEnabled = sec.Key("REPO_INDEXER_ENABLED").MustBool(false)
|
Indexer.RepoIndexerEnabled = sec.Key("REPO_INDEXER_ENABLED").MustBool(false)
|
||||||
|
Indexer.RepoType = sec.Key("REPO_INDEXER_TYPE").MustString("bleve")
|
||||||
Indexer.RepoPath = sec.Key("REPO_INDEXER_PATH").MustString(path.Join(AppDataPath, "indexers/repos.bleve"))
|
Indexer.RepoPath = sec.Key("REPO_INDEXER_PATH").MustString(path.Join(AppDataPath, "indexers/repos.bleve"))
|
||||||
if !filepath.IsAbs(Indexer.RepoPath) {
|
if !filepath.IsAbs(Indexer.RepoPath) {
|
||||||
Indexer.RepoPath = path.Join(AppWorkPath, Indexer.RepoPath)
|
Indexer.RepoPath = path.Join(AppWorkPath, Indexer.RepoPath)
|
||||||
}
|
}
|
||||||
|
Indexer.RepoConnStr = sec.Key("REPO_INDEXER_CONN_STR").MustString("")
|
||||||
|
Indexer.RepoIndexerName = sec.Key("REPO_INDEXER_NAME").MustString("gitea_codes")
|
||||||
|
|
||||||
Indexer.IncludePatterns = IndexerGlobFromString(sec.Key("REPO_INDEXER_INCLUDE").MustString(""))
|
Indexer.IncludePatterns = IndexerGlobFromString(sec.Key("REPO_INDEXER_INCLUDE").MustString(""))
|
||||||
Indexer.ExcludePatterns = IndexerGlobFromString(sec.Key("REPO_INDEXER_EXCLUDE").MustString(""))
|
Indexer.ExcludePatterns = IndexerGlobFromString(sec.Key("REPO_INDEXER_EXCLUDE").MustString(""))
|
||||||
Indexer.ExcludeVendored = sec.Key("REPO_INDEXER_EXCLUDE_VENDORED").MustBool(true)
|
Indexer.ExcludeVendored = sec.Key("REPO_INDEXER_EXCLUDE_VENDORED").MustBool(true)
|
||||||
|
|
Loading…
Reference in a new issue