Skip to content

kvdb: refactor as preparation for DB migration command in lndinit #5561

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Oct 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions channeldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,9 +436,14 @@ func (d *DB) Wipe() error {
// the database are created.
func initChannelDB(db kvdb.Backend) error {
err := kvdb.Update(db, func(tx kvdb.RwTx) error {
// Check if DB was marked as inactive with a tomb stone.
if err := EnsureNoTombstone(tx); err != nil {
return err
}

meta := &Meta{}
// Check if DB is already initialized.
err := fetchMeta(meta, tx)
err := FetchMeta(meta, tx)
if err == nil {
return nil
}
Expand Down Expand Up @@ -1417,7 +1422,7 @@ func (c *ChannelStateDB) DeleteChannelOpeningState(outPoint []byte) error {
// applies migration functions to the current database and recovers the
// previous state of db if at least one error/panic appeared during migration.
func (d *DB) syncVersions(versions []mandatoryVersion) error {
meta, err := d.FetchMeta(nil)
meta, err := d.FetchMeta()
if err != nil {
if err == ErrMetaNotFound {
meta = &Meta{}
Expand Down Expand Up @@ -1561,6 +1566,12 @@ func (d *DB) ChannelStateDB() *ChannelStateDB {
return d.channelStateDB
}

// LatestDBVersion returns the number of the latest database version currently
// known to the channel DB.
func LatestDBVersion() uint32 {
return getLatestDBVersion(dbVersions)
}

func getLatestDBVersion(versions []mandatoryVersion) uint32 {
return versions[len(versions)-1].number
}
Expand Down
85 changes: 76 additions & 9 deletions channeldb/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package channeldb

import (
"bytes"
"errors"
"fmt"

"github.com/lightningnetwork/lnd/kvdb"
Expand All @@ -20,6 +21,15 @@ var (
// dbVersionKey is a boltdb key and it's used for storing/retrieving
// a list of optional migrations that have been applied.
optionalVersionKey = []byte("ovk")

// TombstoneKey is the key under which we add a tag in the source DB
// after we've successfully and completely migrated it to the target/
// destination DB.
TombstoneKey = []byte("data-migration-tombstone")

// ErrMarkerNotPresent is the error that is returned if the queried
// marker is not present in the given database.
ErrMarkerNotPresent = errors.New("marker not present")
)

// Meta structure holds the database meta information.
Expand All @@ -28,13 +38,12 @@ type Meta struct {
DbVersionNumber uint32
}

// FetchMeta fetches the meta data from boltdb and returns filled meta
// structure.
func (d *DB) FetchMeta(tx kvdb.RTx) (*Meta, error) {
// FetchMeta fetches the metadata from boltdb and returns filled meta structure.
func (d *DB) FetchMeta() (*Meta, error) {
var meta *Meta

err := kvdb.View(d, func(tx kvdb.RTx) error {
return fetchMeta(meta, tx)
return FetchMeta(meta, tx)
}, func() {
meta = &Meta{}
})
Expand All @@ -45,10 +54,9 @@ func (d *DB) FetchMeta(tx kvdb.RTx) (*Meta, error) {
return meta, nil
}

// fetchMeta is an internal helper function used in order to allow callers to
// re-use a database transaction. See the publicly exported FetchMeta method
// for more information.
func fetchMeta(meta *Meta, tx kvdb.RTx) error {
// FetchMeta is a helper function used in order to allow callers to re-use a
// database transaction.
func FetchMeta(meta *Meta, tx kvdb.RTx) error {
metaBucket := tx.ReadBucket(metaBucket)
if metaBucket == nil {
return ErrMetaNotFound
Expand Down Expand Up @@ -150,7 +158,7 @@ func (d *DB) fetchOptionalMeta() (*OptionalMeta, error) {
return om, nil
}

// fetchOptionalMeta writes an optional meta to the database.
// putOptionalMeta writes an optional meta to the database.
func (d *DB) putOptionalMeta(om *OptionalMeta) error {
return kvdb.Update(d, func(tx kvdb.RwTx) error {
metaBucket, err := tx.CreateTopLevelBucket(metaBucket)
Expand All @@ -177,3 +185,62 @@ func (d *DB) putOptionalMeta(om *OptionalMeta) error {
return metaBucket.Put(optionalVersionKey, b.Bytes())
}, func() {})
}

// CheckMarkerPresent returns the marker under the requested key or
// ErrMarkerNotFound if either the root bucket or the marker key within that
// bucket does not exist.
func CheckMarkerPresent(tx kvdb.RTx, markerKey []byte) ([]byte, error) {
markerBucket := tx.ReadBucket(markerKey)
if markerBucket == nil {
return nil, ErrMarkerNotPresent
}

val := markerBucket.Get(markerKey)

// If we wrote the marker correctly, we created a bucket _and_ created a
// key with a non-empty value. It doesn't matter to us whether the key
// exists or whether its value is empty, to us, it just means the marker
// isn't there.
if len(val) == 0 {
return nil, ErrMarkerNotPresent
}

return val, nil
}

// EnsureNoTombstone returns an error if there is a tombstone marker in the DB
// of the given transaction.
func EnsureNoTombstone(tx kvdb.RTx) error {
marker, err := CheckMarkerPresent(tx, TombstoneKey)
if err == ErrMarkerNotPresent {
// No marker present, so no tombstone. The DB is still alive.
return nil
}
if err != nil {
return err
}

// There was no error so there is a tombstone marker/tag. We cannot use
// this DB anymore.
return fmt.Errorf("refusing to use db, it was marked with a tombstone "+
"after successful data migration; tombstone reads: %s",
string(marker))
}

// AddMarker adds the marker with the given key into a top level bucket with the
// same name. So the structure will look like:
//
// marker-key (top level bucket)
// |-> marker-key:marker-value (key/value pair)
func AddMarker(tx kvdb.RwTx, markerKey, markerValue []byte) error {
if len(markerValue) == 0 {
return fmt.Errorf("marker value cannot be empty")
}

markerBucket, err := tx.CreateTopLevelBucket(markerKey)
if err != nil {
return err
}

return markerBucket.Put(markerKey, markerValue)
}
125 changes: 119 additions & 6 deletions channeldb/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"testing"

"github.com/btcsuite/btcwallet/walletdb"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -91,7 +92,7 @@ func TestVersionFetchPut(t *testing.T) {
t.Fatal(err)
}

meta, err := db.FetchMeta(nil)
meta, err := db.FetchMeta()
if err != nil {
t.Fatal(err)
}
Expand All @@ -107,7 +108,7 @@ func TestVersionFetchPut(t *testing.T) {
t.Fatalf("update of meta failed %v", err)
}

meta, err = db.FetchMeta(nil)
meta, err = db.FetchMeta()
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -228,7 +229,7 @@ func TestMigrationWithPanic(t *testing.T) {

// Check that version of database and data wasn't changed.
afterMigrationFunc := func(d *DB) {
meta, err := d.FetchMeta(nil)
meta, err := d.FetchMeta()
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -303,7 +304,7 @@ func TestMigrationWithFatal(t *testing.T) {

// Check that version of database and initial data wasn't changed.
afterMigrationFunc := func(d *DB) {
meta, err := d.FetchMeta(nil)
meta, err := d.FetchMeta()
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -377,7 +378,7 @@ func TestMigrationWithoutErrors(t *testing.T) {

// Check that version of database and data was properly changed.
afterMigrationFunc := func(d *DB) {
meta, err := d.FetchMeta(nil)
meta, err := d.FetchMeta()
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -469,7 +470,7 @@ func TestMigrationDryRun(t *testing.T) {
// Check that version of database version is not modified.
afterMigrationFunc := func(d *DB) {
err := kvdb.View(d, func(tx kvdb.RTx) error {
meta, err := d.FetchMeta(nil)
meta, err := d.FetchMeta()
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -574,3 +575,115 @@ func TestApplyOptionalVersions(t *testing.T) {
require.NoError(t, err, "failed to apply optional migration")
require.Equal(t, 1, migrateCount, "expected no migration")
}

// TestFetchMeta tests that the FetchMeta returns the latest DB version for a
// freshly created DB instance.
func TestFetchMeta(t *testing.T) {
t.Parallel()

db, cleanUp, err := MakeTestDB()
defer cleanUp()
require.NoError(t, err)

meta := &Meta{}
err = db.View(func(tx walletdb.ReadTx) error {
return FetchMeta(meta, tx)
}, func() {
meta = &Meta{}
})
require.NoError(t, err)

require.Equal(t, LatestDBVersion(), meta.DbVersionNumber)
}

// TestMarkerAndTombstone tests that markers like a tombstone can be added to a
// DB.
func TestMarkerAndTombstone(t *testing.T) {
t.Parallel()

db, cleanUp, err := MakeTestDB()
defer cleanUp()
require.NoError(t, err)

// Test that a generic marker is not present in a fresh DB.
var marker []byte
err = db.View(func(tx walletdb.ReadTx) error {
var err error
marker, err = CheckMarkerPresent(tx, []byte("foo"))
return err
}, func() {
marker = nil
})
require.ErrorIs(t, err, ErrMarkerNotPresent)
require.Nil(t, marker)

// Only adding the marker bucket should not be enough to be counted as
// a marker, we explicitly also want the value to be set.
err = db.Update(func(tx walletdb.ReadWriteTx) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

_, err := tx.CreateTopLevelBucket([]byte("foo"))
return err
}, func() {})
require.NoError(t, err)

err = db.View(func(tx walletdb.ReadTx) error {
var err error
marker, err = CheckMarkerPresent(tx, []byte("foo"))
return err
}, func() {
marker = nil
})
require.ErrorIs(t, err, ErrMarkerNotPresent)
require.Nil(t, marker)

// Test that a tombstone marker is not present in a fresh DB.
err = db.View(EnsureNoTombstone, func() {})
require.NoError(t, err)

// Add a generic marker now and assert that it can be read.
err = db.Update(func(tx walletdb.ReadWriteTx) error {
return AddMarker(tx, []byte("foo"), []byte("bar"))
}, func() {})
require.NoError(t, err)

err = db.View(func(tx walletdb.ReadTx) error {
var err error
marker, err = CheckMarkerPresent(tx, []byte("foo"))
return err
}, func() {
marker = nil
})
require.NoError(t, err)
require.Equal(t, []byte("bar"), marker)

// A tombstone should still not be present.
err = db.View(EnsureNoTombstone, func() {})
require.NoError(t, err)

// Finally, add a tombstone.
tombstoneText := []byte("RIP test DB")
err = db.Update(func(tx walletdb.ReadWriteTx) error {
return AddMarker(tx, TombstoneKey, tombstoneText)
}, func() {})
require.NoError(t, err)

// We can read it as a normal marker.
err = db.View(func(tx walletdb.ReadTx) error {
var err error
marker, err = CheckMarkerPresent(tx, TombstoneKey)
return err
}, func() {
marker = nil
})
require.NoError(t, err)
require.Equal(t, tombstoneText, marker)

// But also as a tombstone, and now we should get an error that the DB
// cannot be used anymore.
err = db.View(EnsureNoTombstone, func() {})
require.ErrorContains(t, err, string(tombstoneText))

// Now that the DB has a tombstone, we should no longer be able to open
// it once we close it.
_, err = CreateWithBackend(db.Backend)
require.ErrorContains(t, err, string(tombstoneText))
}
4 changes: 4 additions & 0 deletions docs/release-notes/release-notes-0.16.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ crash](https://github.com/lightningnetwork/lnd/pull/7019).
* Updated the github actions to use `make fmt-check` in its [build
process](https://github.com/lightningnetwork/lnd/pull/6853).

* Database related code was refactored to [allow external tools to use it more
easily](https://github.com/lightningnetwork/lnd/pull/5561), in preparation for
adding a data migration functionality to `lndinit`.

# Contributors (Alphabetical Order)

* Carla Kirk-Cohen
Expand Down
9 changes: 8 additions & 1 deletion kvdb/etcd/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func getKeyVal(kv *KV) ([]byte, []byte) {
return getKey(kv.key), val
}

// BucketKey is a helper functon used in tests to create a bucket key from
// BucketKey is a helper function used in tests to create a bucket key from
// passed bucket list.
func BucketKey(buckets ...string) string {
var bucketKey []byte
Expand Down Expand Up @@ -130,3 +130,10 @@ func ValueKey(key string, buckets ...string) string {

return string(makeValueKey(bucket, []byte(key)))
}

// SequenceKey is a helper function used in tests or external tools to create a
// sequence key from the passed bucket list.
func SequenceKey(buckets ...string) string {
id := makeBucketID([]byte(BucketKey(buckets...)))
return string(makeSequenceKey(id[:]))
}
Loading