Skip to content

Commit cd295b6

Browse files
authored
GODRIVER-3035 Publish TopologyDescriptionChangedEvent on topology close. (#2002)
1 parent 0df1712 commit cd295b6

File tree

4 files changed

+107
-7
lines changed

4 files changed

+107
-7
lines changed

internal/integration/unified/client_entity.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ type clientEntity struct {
5454
serverHeartbeatStartedEvent []*event.ServerHeartbeatStartedEvent
5555
serverHeartbeatSucceeded []*event.ServerHeartbeatSucceededEvent
5656
topologyDescriptionChanged []*event.TopologyDescriptionChangedEvent
57+
topologyOpening []*event.TopologyOpeningEvent
58+
topologyClosed []*event.TopologyClosedEvent
5759
ignoredCommands map[string]struct{}
5860
observeSensitiveCommands *bool
5961
numConnsCheckedOut int32
@@ -161,6 +163,8 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp
161163
ServerHeartbeatStarted: entity.processServerHeartbeatStartedEvent,
162164
ServerHeartbeatSucceeded: entity.processServerHeartbeatSucceededEvent,
163165
TopologyDescriptionChanged: entity.processTopologyDescriptionChangedEvent,
166+
TopologyOpening: entity.processTopologyOpeningEvent,
167+
TopologyClosed: entity.processTopologyClosedEvent,
164168
}
165169

166170
clientOpts.SetMonitor(commandMonitor).SetPoolMonitor(poolMonitor).SetServerMonitor(serverMonitor)
@@ -575,6 +579,36 @@ func (c *clientEntity) processTopologyDescriptionChangedEvent(evt *event.Topolog
575579
c.addEventsCount(topologyDescriptionChangedEvent)
576580
}
577581

582+
func (c *clientEntity) processTopologyOpeningEvent(evt *event.TopologyOpeningEvent) {
583+
c.eventProcessMu.Lock()
584+
defer c.eventProcessMu.Unlock()
585+
586+
if !c.getRecordEvents() {
587+
return
588+
}
589+
590+
if _, ok := c.observedEvents[topologyOpeningEvent]; ok {
591+
c.topologyOpening = append(c.topologyOpening, evt)
592+
}
593+
594+
c.addEventsCount(topologyOpeningEvent)
595+
}
596+
597+
func (c *clientEntity) processTopologyClosedEvent(evt *event.TopologyClosedEvent) {
598+
c.eventProcessMu.Lock()
599+
defer c.eventProcessMu.Unlock()
600+
601+
if !c.getRecordEvents() {
602+
return
603+
}
604+
605+
if _, ok := c.observedEvents[topologyClosedEvent]; ok {
606+
c.topologyClosed = append(c.topologyClosed, evt)
607+
}
608+
609+
c.addEventsCount(topologyClosedEvent)
610+
}
611+
578612
func (c *clientEntity) setRecordEvents(record bool) {
579613
c.recordEvents.Store(record)
580614
}

internal/integration/unified/event.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ const (
3535
serverHeartbeatStartedEvent monitoringEventType = "ServerHeartbeatStartedEvent"
3636
serverHeartbeatSucceededEvent monitoringEventType = "ServerHeartbeatSucceededEvent"
3737
topologyDescriptionChangedEvent monitoringEventType = "TopologyDescriptionChangedEvent"
38+
topologyOpeningEvent monitoringEventType = "TopologyOpeningEvent"
39+
topologyClosedEvent monitoringEventType = "TopologyClosedEvent"
3840
)
3941

4042
func monitoringEventTypeFromString(eventStr string) (monitoringEventType, bool) {
@@ -77,6 +79,10 @@ func monitoringEventTypeFromString(eventStr string) (monitoringEventType, bool)
7779
return serverHeartbeatSucceededEvent, true
7880
case "topologydescriptionchangedevent":
7981
return topologyDescriptionChangedEvent, true
82+
case "topologyopeningevent":
83+
return topologyOpeningEvent, true
84+
case "topologyclosedevent":
85+
return topologyClosedEvent, true
8086
default:
8187
return "", false
8288
}

internal/integration/unified/event_verification.go

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,16 @@ type sdamEvent struct {
8787
Awaited *bool `bson:"awaited"`
8888
} `bson:"serverHeartbeatFailedEvent"`
8989

90-
TopologyDescriptionChangedEvent *struct{} `bson:"topologyDescriptionChangedEvent"`
90+
TopologyDescriptionChangedEvent *struct {
91+
PreviousDescription *struct {
92+
Type *string `bson:"type"`
93+
} `bson:"previousDescription"`
94+
NewDescription *struct {
95+
Type *string `bson:"type"`
96+
} `bson:"newDescription"`
97+
} `bson:"topologyDescriptionChangedEvent"`
98+
TopologyOpeningEvent *struct{} `bson:"topologyOpeningEvent"`
99+
TopologyClosedEvent *struct{} `bson:"topologyClosedEvent"`
91100
}
92101

93102
type expectedEvents struct {
@@ -468,7 +477,7 @@ func getNextServerHeartbeatSucceededEvent(
468477
return nil, nil, errors.New("no heartbeat succeeded event published")
469478
}
470479

471-
return events[0], events[:1], nil
480+
return events[0], events[1:], nil
472481
}
473482

474483
func getNextServerHeartbeatFailedEvent(
@@ -478,7 +487,7 @@ func getNextServerHeartbeatFailedEvent(
478487
return nil, nil, errors.New("no heartbeat failed event published")
479488
}
480489

481-
return events[0], events[:1], nil
490+
return events[0], events[1:], nil
482491
}
483492

484493
func getNextTopologyDescriptionChangedEvent(
@@ -488,7 +497,27 @@ func getNextTopologyDescriptionChangedEvent(
488497
return nil, nil, errors.New("no topology description changed event published")
489498
}
490499

491-
return events[0], events[:1], nil
500+
return events[0], events[1:], nil
501+
}
502+
503+
func getNextTopologyOpeningEvent(
504+
events []*event.TopologyOpeningEvent,
505+
) (*event.TopologyOpeningEvent, []*event.TopologyOpeningEvent, error) {
506+
if len(events) == 0 {
507+
return nil, nil, errors.New("no topology opening event published")
508+
}
509+
510+
return events[0], events[1:], nil
511+
}
512+
513+
func getNextTopologyClosedEvent(
514+
events []*event.TopologyClosedEvent,
515+
) (*event.TopologyClosedEvent, []*event.TopologyClosedEvent, error) {
516+
if len(events) == 0 {
517+
return nil, nil, errors.New("no topology closed event published")
518+
}
519+
520+
return events[0], events[1:], nil
492521
}
493522

494523
func verifySDAMEvents(client *clientEntity, expectedEvents *expectedEvents) error {
@@ -498,9 +527,21 @@ func verifySDAMEvents(client *clientEntity, expectedEvents *expectedEvents) erro
498527
succeeded = client.serverHeartbeatSucceeded
499528
failed = client.serverHeartbeatFailedEvent
500529
tchanged = client.topologyDescriptionChanged
530+
topening = client.topologyOpening
531+
tclosed = client.topologyClosed
501532
)
502533

503-
vol := func() int { return len(changed) + len(started) + len(succeeded) + len(failed) + len(tchanged) }
534+
vol := func() int {
535+
var count int
536+
count += len(changed)
537+
count += len(started)
538+
count += len(succeeded)
539+
count += len(failed)
540+
count += len(tchanged)
541+
count += len(topening)
542+
count += len(tclosed)
543+
return count
544+
}
504545

505546
if len(expectedEvents.SDAMEvents) == 0 && vol() != 0 {
506547
return fmt.Errorf("expected no sdam events to be sent but got %s", stringifyEventsForClient(client))
@@ -569,7 +610,23 @@ func verifySDAMEvents(client *clientEntity, expectedEvents *expectedEvents) erro
569610
return newEventVerificationError(idx, client, "want awaited %v, got %v", *want, got.Awaited)
570611
}
571612
case evt.TopologyDescriptionChangedEvent != nil:
572-
if _, tchanged, err = getNextTopologyDescriptionChangedEvent(tchanged); err != nil {
613+
var got *event.TopologyDescriptionChangedEvent
614+
if got, tchanged, err = getNextTopologyDescriptionChangedEvent(tchanged); err != nil {
615+
return newEventVerificationError(idx, client, "failed to get next description changed event: %v", err.Error())
616+
}
617+
618+
if want := evt.TopologyDescriptionChangedEvent.PreviousDescription; want != nil && want.Type != nil && *want.Type != got.PreviousDescription.Kind {
619+
return newEventVerificationError(idx, client, "want previous description %v, got %v", *want.Type, got.PreviousDescription.Kind)
620+
}
621+
if want := evt.TopologyDescriptionChangedEvent.NewDescription; want != nil && want.Type != nil && *want.Type != got.NewDescription.Kind {
622+
return newEventVerificationError(idx, client, "want new description %v, got %v", *want.Type, got.NewDescription.Kind)
623+
}
624+
case evt.TopologyOpeningEvent != nil:
625+
if _, topening, err = getNextTopologyOpeningEvent(topening); err != nil {
626+
return newEventVerificationError(idx, client, "failed to get next description changed event: %v", err.Error())
627+
}
628+
case evt.TopologyClosedEvent != nil:
629+
if _, tclosed, err = getNextTopologyClosedEvent(tclosed); err != nil {
573630
return newEventVerificationError(idx, client, "failed to get next description changed event: %v", err.Error())
574631
}
575632
}

x/mongo/driver/topology/topology.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,10 @@ func (t *Topology) Disconnect(ctx context.Context) error {
405405
t.pollingwg.Wait()
406406
}
407407

408-
t.desc.Store(description.Topology{})
408+
oldDesc := t.fsm.Topology
409+
t.fsm = newFSM()
410+
t.desc.Store(t.fsm.Topology)
411+
t.publishTopologyDescriptionChangedEvent(oldDesc, t.fsm.Topology)
409412

410413
atomic.StoreInt64(&t.state, topologyDisconnected)
411414
t.publishTopologyClosedEvent()

0 commit comments

Comments
 (0)