Skip to content

Commit 0444177

Browse files
committed
Prevent information loss on host shutdown
Do not pass global CancellationToken to persistence operations on host shutdown. This way it is ensured that persistence operations are not cancelled.
1 parent 7de5032 commit 0444177

File tree

1 file changed

+13
-9
lines changed

1 file changed

+13
-9
lines changed

src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
5555
finally
5656
{
5757
WorkflowActivity.Enrich(result);
58-
await _persistenceStore.PersistWorkflow(workflow, result.Subscriptions, cancellationToken);
58+
await _persistenceStore.PersistWorkflow(workflow, result.Subscriptions);
5959
await QueueProvider.QueueWork(itemId, QueueType.Index);
6060
_greylist.Remove($"wf:{itemId}");
6161
}
@@ -68,10 +68,10 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
6868
{
6969
foreach (var sub in result.Subscriptions)
7070
{
71-
await TryProcessSubscription(sub, _persistenceStore, cancellationToken);
71+
await TryProcessSubscription(sub, _persistenceStore);
7272
}
7373

74-
await _persistenceStore.PersistErrors(result.Errors, cancellationToken);
74+
await _persistenceStore.PersistErrors(result.Errors);
7575

7676
if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue)
7777
{
@@ -98,24 +98,28 @@ await _persistenceStore.ScheduleCommand(new ScheduledCommand()
9898

9999
}
100100

101-
private async Task TryProcessSubscription(EventSubscription subscription, IPersistenceProvider persistenceStore, CancellationToken cancellationToken)
101+
private async Task TryProcessSubscription(EventSubscription subscription, IPersistenceProvider persistenceStore)
102102
{
103+
//TODO: move to own class
104+
Logger.LogDebug("Subscribing to event {0} {1} for workflow {2} step {3}", subscription.EventName, subscription.EventKey, subscription.WorkflowId, subscription.StepId);
105+
106+
await persistenceStore.CreateEventSubscription(subscription, cancellationToken);
103107
if (subscription.EventName != Event.EventTypeActivity)
104108
{
105-
var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf, cancellationToken);
109+
var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf);
106110

107111
foreach (var evt in events)
108112
{
109113
var eventKey = $"evt:{evt}";
110114
bool acquiredLock = false;
111115
try
112116
{
113-
acquiredLock = await _lockProvider.AcquireLock(eventKey, cancellationToken);
117+
acquiredLock = await _lockProvider.AcquireLock(eventKey, CancellationToken.None);
114118
int attempt = 0;
115119
while (!acquiredLock && attempt < 10)
116120
{
117-
await Task.Delay(Options.IdleTime, cancellationToken);
118-
acquiredLock = await _lockProvider.AcquireLock(eventKey, cancellationToken);
121+
await Task.Delay(Options.IdleTime);
122+
acquiredLock = await _lockProvider.AcquireLock(eventKey, CancellationToken.None);
119123

120124
attempt++;
121125
}
@@ -127,7 +131,7 @@ private async Task TryProcessSubscription(EventSubscription subscription, IPersi
127131
else
128132
{
129133
_greylist.Remove(eventKey);
130-
await persistenceStore.MarkEventUnprocessed(evt, cancellationToken);
134+
await persistenceStore.MarkEventUnprocessed(evt);
131135
await QueueProvider.QueueWork(evt, QueueType.Event);
132136
}
133137
}

0 commit comments

Comments
 (0)