Skip to content

Reduce repo indexer disk usage #3452

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions models/issue_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func populateIssueIndexer() error {
return err
}
for _, issue := range issues {
if err := batch.Add(issue.update()); err != nil {
if err := issue.update().AddToFlushingBatch(batch); err != nil {
return err
}
}
Expand All @@ -78,7 +78,7 @@ func processIssueIndexerUpdateQueue() {
issue, err := GetIssueByID(issueID)
if err != nil {
log.Error(4, "GetIssueByID: %v", err)
} else if err = batch.Add(issue.update()); err != nil {
} else if err = issue.update().AddToFlushingBatch(batch); err != nil {
log.Error(4, "IssueIndexer: %v", err)
}
}
Expand Down
16 changes: 10 additions & 6 deletions models/repo_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"code.gitea.io/gitea/modules/indexer"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"

"github.com/ethantkoenig/rupture"
)

// RepoIndexerStatus status of a repo's entry in the repo indexer
Expand Down Expand Up @@ -187,7 +189,7 @@ func getRepoChanges(repo *Repository, revision string) (*repoChanges, error) {
return nonGenesisChanges(repo, revision)
}

func addUpdate(update fileUpdate, repo *Repository, batch *indexer.Batch) error {
func addUpdate(update fileUpdate, repo *Repository, batch rupture.FlushingBatch) error {
stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha).
RunInDir(repo.RepoPath())
if err != nil {
Expand All @@ -206,24 +208,26 @@ func addUpdate(update fileUpdate, repo *Repository, batch *indexer.Batch) error
} else if !base.IsTextFile(fileContents) {
return nil
}
return batch.Add(indexer.RepoIndexerUpdate{
indexerUpdate := indexer.RepoIndexerUpdate{
Filepath: update.Filename,
Op: indexer.RepoIndexerOpUpdate,
Data: &indexer.RepoIndexerData{
RepoID: repo.ID,
Content: string(fileContents),
},
})
}
return indexerUpdate.AddToFlushingBatch(batch)
}

func addDelete(filename string, repo *Repository, batch *indexer.Batch) error {
return batch.Add(indexer.RepoIndexerUpdate{
func addDelete(filename string, repo *Repository, batch rupture.FlushingBatch) error {
indexerUpdate := indexer.RepoIndexerUpdate{
Filepath: filename,
Op: indexer.RepoIndexerOpDelete,
Data: &indexer.RepoIndexerData{
RepoID: repo.ID,
},
})
}
return indexerUpdate.AddToFlushingBatch(batch)
}

// parseGitLsTreeOutput parses the output of a `git ls-tree -r --full-name` command
Expand Down
59 changes: 30 additions & 29 deletions modules/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ package indexer

import (
"fmt"
"os"
"strconv"

"code.gitea.io/gitea/modules/setting"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add empty line


"github.com/blevesearch/bleve"
"github.com/blevesearch/bleve/analysis/token/unicodenorm"
"github.com/blevesearch/bleve/index/upsidedown"
"github.com/blevesearch/bleve/mapping"
"github.com/blevesearch/bleve/search/query"
"github.com/ethantkoenig/rupture"
)

// indexerID a bleve-compatible unique identifier for an integer id
Expand Down Expand Up @@ -53,40 +58,36 @@ func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error {
})
}

// Update represents an update to an indexer
type Update interface {
addToBatch(batch *bleve.Batch) error
}

const maxBatchSize = 16

// Batch batch of indexer updates that automatically flushes once it
// reaches a certain size
type Batch struct {
batch *bleve.Batch
index bleve.Index
}

// Add add update to batch, possibly flushing
func (batch *Batch) Add(update Update) error {
if err := update.addToBatch(batch.batch); err != nil {
return err
// openIndexer open the index at the specified path, checking for metadata
// updates and bleve version updates. If index needs to be created (or
// re-created), returns (nil, nil)
func openIndexer(path string, latestVersion int) (bleve.Index, error) {
_, err := os.Stat(setting.Indexer.IssuePath)
if err != nil && os.IsNotExist(err) {
return nil, nil
} else if err != nil {
return nil, err
}
return batch.flushIfFull()
}

func (batch *Batch) flushIfFull() error {
if batch.batch.Size() >= maxBatchSize {
return batch.Flush()
metadata, err := rupture.ReadIndexMetadata(path)
if err != nil {
return nil, err
}
if metadata.Version < latestVersion {
// the indexer is using a previous version, so we should delete it and
// re-populate
return nil, os.RemoveAll(path)
}
return nil
}

// Flush manually flush the batch, regardless of its size
func (batch *Batch) Flush() error {
if err := batch.index.Batch(batch.batch); err != nil {
return err
index, err := bleve.Open(path)
if err != nil && err == upsidedown.IncompatibleVersion {
// the indexer was built with a previous version of bleve, so we should
// delete it and re-populate
return nil, os.RemoveAll(path)
} else if err != nil {
return nil, err
}
batch.batch.Reset()
return nil
return index, nil
}
59 changes: 32 additions & 27 deletions modules/indexer/issue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,26 @@
package indexer

import (
"os"

"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"

"github.com/blevesearch/bleve"
"github.com/blevesearch/bleve/analysis/analyzer/custom"
"github.com/blevesearch/bleve/analysis/token/lowercase"
"github.com/blevesearch/bleve/analysis/tokenizer/unicode"
"github.com/blevesearch/bleve/index/upsidedown"
"github.com/ethantkoenig/rupture"
)

// issueIndexer (thread-safe) index for searching issues
var issueIndexer bleve.Index

const (
issueIndexerAnalyzer = "issueIndexer"
issueIndexerDocType = "issueIndexerDocType"

issueIndexerLatestVersion = 1
)

// IssueIndexerData data stored in the issue indexer
type IssueIndexerData struct {
RepoID int64
Expand All @@ -28,35 +33,33 @@ type IssueIndexerData struct {
Comments []string
}

// Type returns the document type, for bleve's mapping.Classifier interface.
func (i *IssueIndexerData) Type() string {
return issueIndexerDocType
}

// IssueIndexerUpdate an update to the issue indexer
type IssueIndexerUpdate struct {
IssueID int64
Data *IssueIndexerData
}

func (update IssueIndexerUpdate) addToBatch(batch *bleve.Batch) error {
return batch.Index(indexerID(update.IssueID), update.Data)
// AddToFlushingBatch adds the update to the given flushing batch.
func (i IssueIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) error {
return batch.Index(indexerID(i.IssueID), i.Data)
}

const issueIndexerAnalyzer = "issueIndexer"

// InitIssueIndexer initialize issue indexer
func InitIssueIndexer(populateIndexer func() error) {
_, err := os.Stat(setting.Indexer.IssuePath)
if err != nil && !os.IsNotExist(err) {
var err error
issueIndexer, err = openIndexer(setting.Indexer.IssuePath, issueIndexerLatestVersion)
if err != nil {
log.Fatal(4, "InitIssueIndexer: %v", err)
} else if err == nil {
issueIndexer, err = bleve.Open(setting.Indexer.IssuePath)
if err == nil {
return
} else if err != upsidedown.IncompatibleVersion {
log.Fatal(4, "InitIssueIndexer, open index: %v", err)
}
log.Warn("Incompatible bleve version, deleting and recreating issue indexer")
if err = os.RemoveAll(setting.Indexer.IssuePath); err != nil {
log.Fatal(4, "InitIssueIndexer: remove index, %v", err)
}
}
if issueIndexer != nil {
return
}

if err = createIssueIndexer(); err != nil {
log.Fatal(4, "InitIssuesIndexer: create index, %v", err)
}
Expand All @@ -70,9 +73,13 @@ func createIssueIndexer() error {
mapping := bleve.NewIndexMapping()
docMapping := bleve.NewDocumentMapping()

docMapping.AddFieldMappingsAt("RepoID", bleve.NewNumericFieldMapping())
numericFieldMapping := bleve.NewNumericFieldMapping()
numericFieldMapping.IncludeInAll = false
docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping)

textFieldMapping := bleve.NewTextFieldMapping()
textFieldMapping.Store = false
textFieldMapping.IncludeInAll = false
docMapping.AddFieldMappingsAt("Title", textFieldMapping)
docMapping.AddFieldMappingsAt("Content", textFieldMapping)
docMapping.AddFieldMappingsAt("Comments", textFieldMapping)
Expand All @@ -89,19 +96,17 @@ func createIssueIndexer() error {
}

mapping.DefaultAnalyzer = issueIndexerAnalyzer
mapping.AddDocumentMapping("issues", docMapping)
mapping.AddDocumentMapping(issueIndexerDocType, docMapping)
mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping())

var err error
issueIndexer, err = bleve.New(setting.Indexer.IssuePath, mapping)
return err
}

// IssueIndexerBatch batch to add updates to
func IssueIndexerBatch() *Batch {
return &Batch{
batch: issueIndexer.NewBatch(),
index: issueIndexer,
}
func IssueIndexerBatch() rupture.FlushingBatch {
return rupture.NewFlushingBatch(issueIndexer, maxBatchSize)
}

// SearchIssuesByKeyword searches for issues by given conditions.
Expand Down
Loading