Skip to content
This repository was archived by the owner on Jan 28, 2021. It is now read-only.

Pilosa index driver as library #308

Merged
merged 6 commits into from
Aug 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ before_install:
- docker ps -a

install:
- go get -u github.com/pilosa/go-pilosa
- cd "$GOPATH/src/github.com/pilosa/go-pilosa" && git checkout v0.9.0 && cd "$TRAVIS_BUILD_DIR"
- go get -u github.com/golang/dep/cmd/dep
- touch Gopkg.toml
- dep ensure -v -add "github.com/pilosa/[email protected]" "github.com/pilosa/[email protected]"
- make dependencies

script:
Expand Down
6 changes: 1 addition & 5 deletions server/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,7 @@ func (s *SessionManager) NewContext(conn *mysql.Conn) (*sql.Context, DoneFunc, e
sess := s.sessions[conn.ConnectionID]
s.mu.Unlock()
context := sql.NewContext(ctx, sql.WithSession(sess), sql.WithTracer(s.tracer))
id, err := uuid.NewV4()
if err != nil {
cancel()
return nil, nil, err
}
id := uuid.NewV4()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it complaint because the newest version doesn't return an error


s.mu.Lock()
s.sessionContexts[conn.ConnectionID] = append(s.sessionContexts[conn.ConnectionID], id)
Expand Down
39 changes: 13 additions & 26 deletions sql/index/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,10 @@ import (
"io"
"io/ioutil"
"os"
"path/filepath"

yaml "gopkg.in/yaml.v2"
)

const (
// ConfigFileName is the name of an index config file.
ConfigFileName = "config.yml"
// ProcessingFileName is the name of the processing index file.
ProcessingFileName = ".processing"
)

// Config represents index configuration
type Config struct {
DB string
Expand Down Expand Up @@ -60,9 +52,8 @@ func WriteConfig(w io.Writer, cfg *Config) error {
return err
}

// WriteConfigFile writes the configuration to dir/config.yml file.
func WriteConfigFile(dir string, cfg *Config) error {
path := filepath.Join(dir, ConfigFileName)
// WriteConfigFile writes the configuration to file.
func WriteConfigFile(path string, cfg *Config) error {
f, err := os.Create(path)
if err != nil {
return err
Expand All @@ -84,9 +75,8 @@ func ReadConfig(r io.Reader) (*Config, error) {
return &cfg, err
}

// ReadConfigFile reads an configuration from dir/config.yml file.
func ReadConfigFile(dir string) (*Config, error) {
path := filepath.Join(dir, ConfigFileName)
// ReadConfigFile reads an configuration from file.
func ReadConfigFile(path string) (*Config, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
Expand All @@ -96,10 +86,9 @@ func ReadConfigFile(dir string) (*Config, error) {
return ReadConfig(f)
}

// CreateProcessingFile creates a file inside the directory saying whether
// the index is being created.
func CreateProcessingFile(dir string) error {
f, err := os.Create(filepath.Join(dir, ProcessingFileName))
// CreateProcessingFile creates a file saying whether the index is being created.
func CreateProcessingFile(path string) error {
f, err := os.Create(path)
if err != nil {
return err
}
Expand All @@ -109,16 +98,14 @@ func CreateProcessingFile(dir string) error {
return nil
}

// RemoveProcessingFile removes the file that says whether the index is still
// being created.
func RemoveProcessingFile(dir string) error {
return os.Remove(filepath.Join(dir, ProcessingFileName))
// RemoveProcessingFile removes the file that says whether the index is still being created.
func RemoveProcessingFile(path string) error {
return os.Remove(path)
}

// ExistsProcessingFile returns whether the processing file exists inside an
// index directory.
func ExistsProcessingFile(dir string) (bool, error) {
_, err := os.Stat(filepath.Join(dir, ProcessingFileName))
// ExistsProcessingFile returns whether the processing file exists.
func ExistsProcessingFile(path string) (bool, error) {
_, err := os.Stat(path)
if err != nil {
if os.IsNotExist(err) {
return false, nil
Expand Down
35 changes: 19 additions & 16 deletions sql/index/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ import (

func TestConfig(t *testing.T) {
require := require.New(t)
tmpDir, err := ioutil.TempDir("", "index")
require.NoError(err)
defer func() { require.NoError(os.RemoveAll(tmpDir)) }()

driver := "driver"
db, table, id := "db_name", "table_name", "index_id"
path := filepath.Join(os.TempDir(), db, table, id)
err := os.MkdirAll(path, 0750)

dir := filepath.Join(tmpDir, driver)
subdir := filepath.Join(dir, db, table)
err = os.MkdirAll(subdir, 0750)
require.NoError(err)
defer os.RemoveAll(path)
file := filepath.Join(subdir, id+".cfg")

cfg1 := NewConfig(
db,
Expand All @@ -31,36 +35,35 @@ func TestConfig(t *testing.T) {
},
)

err = WriteConfigFile(path, cfg1)
err = WriteConfigFile(file, cfg1)
require.NoError(err)

cfg2, err := ReadConfigFile(path)
cfg2, err := ReadConfigFile(file)
require.NoError(err)
require.Equal(cfg1, cfg2)
}

func TestProcessingFile(t *testing.T) {
require := require.New(t)

dir, err := ioutil.TempDir(os.TempDir(), "processing-file")
tmpDir, err := ioutil.TempDir("", "index")
require.NoError(err)
defer func() {
require.NoError(os.RemoveAll(dir))
}()
defer func() { require.NoError(os.RemoveAll(tmpDir)) }()

file := filepath.Join(tmpDir, ".processing")

ok, err := ExistsProcessingFile(dir)
ok, err := ExistsProcessingFile(file)
require.NoError(err)
require.False(ok)

require.NoError(CreateProcessingFile(dir))
require.NoError(CreateProcessingFile(file))

ok, err = ExistsProcessingFile(dir)
ok, err = ExistsProcessingFile(file)
require.NoError(err)
require.True(ok)

require.NoError(RemoveProcessingFile(dir))
require.NoError(RemoveProcessingFile(file))

ok, err = ExistsProcessingFile(dir)
ok, err = ExistsProcessingFile(file)
require.NoError(err)
require.False(ok)
}
110 changes: 68 additions & 42 deletions sql/index/pilosa/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/sha1"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
Expand All @@ -24,6 +25,15 @@ const (
IndexNamePrefix = "idx"
// FrameNamePrefix the pilosa's frames prefix
FrameNamePrefix = "frm"

// ConfigFileName is the name of an index config file.
ConfigFileName = "config.yml"

// ProcessingFileName is the name of the lock/processing index file.
ProcessingFileName = ".processing"

// MappingFileName is the name of the mapping file.
MappingFileName = "mapping.db"
)

var (
Expand Down Expand Up @@ -65,7 +75,7 @@ func (*Driver) ID() string {

// Create a new index.
func (d *Driver) Create(db, table, id string, expressions []sql.Expression, config map[string]string) (sql.Index, error) {
path, err := mkdir(d.root, db, table, id)
_, err := mkdir(d.root, db, table, id)
if err != nil {
return nil, err
}
Expand All @@ -76,79 +86,88 @@ func (d *Driver) Create(db, table, id string, expressions []sql.Expression, conf
}

cfg := index.NewConfig(db, table, id, exprs, d.ID(), config)
err = index.WriteConfigFile(path, cfg)
err = index.WriteConfigFile(d.configFilePath(db, table, id), cfg)
if err != nil {
return nil, err
}

return newPilosaIndex(path, d.client, cfg), nil
return newPilosaIndex(d.mappingFilePath(db, table, id), d.client, cfg), nil
}

// LoadAll loads all indexes for given db and table
func (d *Driver) LoadAll(db, table string) ([]sql.Index, error) {
root := filepath.Join(d.root, db, table)

var (
indexes []sql.Index
errors []string
err error
root = filepath.Join(d.root, db, table)
)
filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
if path != root || !os.IsNotExist(err) {
errors = append(errors, err.Error())
}
return filepath.SkipDir
}

if info.IsDir() && path != root && info.Name() != "." && info.Name() != ".." {
idx, err := d.loadIndex(path)
dirs, err := ioutil.ReadDir(root)
if err != nil {
if os.IsNotExist(err) {
return indexes, nil
}
return nil, err
}
for _, info := range dirs {
if info.IsDir() && !strings.HasPrefix(info.Name(), ".") {
idx, err := d.loadIndex(db, table, info.Name())
if err != nil {
if !errCorruptedIndex.Is(err) {
errors = append(errors, err.Error())
}

return filepath.SkipDir
continue
}

indexes = append(indexes, idx)
}

return nil
})
}

if len(errors) > 0 {
err = fmt.Errorf(strings.Join(errors, "\n"))
return nil, fmt.Errorf(strings.Join(errors, "\n"))
}
return indexes, err

return indexes, nil
}

func (d *Driver) loadIndex(path string) (sql.Index, error) {
ok, err := index.ExistsProcessingFile(path)
func (d *Driver) loadIndex(db, table, id string) (sql.Index, error) {
dir := filepath.Join(d.root, db, table, id)
config := d.configFilePath(db, table, id)
if _, err := os.Stat(config); err != nil {
return nil, errCorruptedIndex.New(dir)
}

mapping := d.mappingFilePath(db, table, id)
processing := d.processingFilePath(db, table, id)
ok, err := index.ExistsProcessingFile(processing)
if err != nil {
return nil, err
}

if ok {
log := logrus.WithFields(logrus.Fields{
"err": err,
"path": path,
"err": err,
"db": db,
"table": table,
"id": id,
"dir": dir,
})
log.Warn("could not read index file, index is corrupt and will be deleted")

if err := os.RemoveAll(path); err != nil {
log.Warn("unable to remove folder of corrupted index")
if err := os.RemoveAll(dir); err != nil {
log.Warn("unable to remove corrupted index: " + dir)
}

return nil, errCorruptedIndex.New(path)
return nil, errCorruptedIndex.New(dir)
}

cfg, err := index.ReadConfigFile(path)
cfg, err := index.ReadConfigFile(config)
if err != nil {
return nil, err
}
if cfg.Driver(DriverID) == nil {
return nil, errCorruptedIndex.New(dir)
}

idx := newPilosaIndex(path, d.client, cfg)
idx := newPilosaIndex(mapping, d.client, cfg)
return idx, nil
}

Expand All @@ -166,12 +185,8 @@ func (d *Driver) Save(
return errInvalidIndexType.New(i)
}

path, err := mkdir(d.root, i.Database(), i.Table(), i.ID())
if err != nil {
return err
}

if err = index.CreateProcessingFile(path); err != nil {
processingFile := d.processingFilePath(idx.Database(), idx.Table(), idx.ID())
if err = index.CreateProcessingFile(processingFile); err != nil {
return err
}

Expand Down Expand Up @@ -285,13 +300,12 @@ func (d *Driver) Save(
"id": i.ID(),
}).Debugf("finished pilosa indexing")

return index.RemoveProcessingFile(path)
return index.RemoveProcessingFile(processingFile)
}

// Delete the index with the given path.
func (d *Driver) Delete(idx sql.Index) error {
path := filepath.Join(d.root, idx.Database(), idx.Table(), idx.ID())
if err := os.RemoveAll(path); err != nil {
if err := os.RemoveAll(filepath.Join(d.root, idx.Database(), idx.Table(), idx.ID())); err != nil {
return err
}

Expand Down Expand Up @@ -426,3 +440,15 @@ func mkdir(elem ...string) (string, error) {
path := filepath.Join(elem...)
return path, os.MkdirAll(path, 0750)
}

func (d *Driver) configFilePath(db, table, id string) string {
return filepath.Join(d.root, db, table, id, ConfigFileName)
}

func (d *Driver) processingFilePath(db, table, id string) string {
return filepath.Join(d.root, db, table, id, ProcessingFileName)
}

func (d *Driver) mappingFilePath(db, table, id string) string {
return filepath.Join(d.root, db, table, id, MappingFileName)
}
Loading