Skip to content

Commit f18b0fb

Browse files
authored
Port waitForPendingWrites from android (#2081)
* waitForPendingWrites + allow-unlisten-after-shutdown. * addressing comments * addressing comments #2
1 parent 0eeb71f commit f18b0fb

File tree

11 files changed

+264
-11
lines changed

11 files changed

+264
-11
lines changed

packages/firestore/src/api/database.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,25 @@ export class Firestore implements firestore.FirebaseFirestore, FirebaseService {
468468
return this._firestoreClient!.clientShutdown;
469469
}
470470

471+
/**
472+
* Waits until all currently pending writes for the active user have been acknowledged by the
473+
* backend.
474+
*
475+
* The returned Promise resolves immediately if there are no outstanding writes. Otherwise, the
476+
* Promise waits for all previously issued writes (including those written in a previous app
477+
* session), but it does not wait for writes that were added after the method is called. If you
478+
* wish to wait for additional writes, you have to call `waitForPendingWrites()` again.
479+
*
480+
* Any outstanding `waitForPendingWrites()` Promises are rejected during user changes.
481+
*
482+
* @return A Promise which resolves when all currently pending writes have been
483+
* acknowledged by the backend.
484+
*/
485+
_waitForPendingWrites(): Promise<void> {
486+
this.ensureClientConfigured();
487+
return this._firestoreClient!.waitForPendingWrites();
488+
}
489+
471490
ensureClientConfigured(): FirestoreClient {
472491
if (!this._firestoreClient) {
473492
// Kick off starting the client but don't actually wait for it.

packages/firestore/src/core/firestore_client.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,21 @@ export class FirestoreClient {
522522
});
523523
}
524524

525+
/**
526+
* Returns a Promise that resolves when all writes that were pending at the time this
527+
* method was called received server acknowledgement. An acknowledgement can be either acceptance
528+
* or rejection.
529+
*/
530+
waitForPendingWrites(): Promise<void> {
531+
this.verifyNotShutdown();
532+
533+
const deferred = new Deferred<void>();
534+
this.asyncQueue.enqueueAndForget(() => {
535+
return this.syncEngine.registerPendingWritesCallback(deferred);
536+
});
537+
return deferred.promise;
538+
}
539+
525540
listen(
526541
query: Query,
527542
observer: Observer<ViewSnapshot>,

packages/firestore/src/core/sync_engine.ts

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ import {
2828
import { MaybeDocument, NoDocument } from '../model/document';
2929
import { DocumentKey } from '../model/document_key';
3030
import { Mutation } from '../model/mutation';
31-
import { MutationBatchResult } from '../model/mutation_batch';
31+
import { MutationBatchResult, BATCHID_UNKNOWN } from '../model/mutation_batch';
3232
import { RemoteEvent, TargetChange } from '../remote/remote_event';
3333
import { RemoteStore } from '../remote/remote_store';
3434
import { RemoteSyncer } from '../remote/remote_syncer';
3535
import { assert, fail } from '../util/assert';
36-
import { FirestoreError } from '../util/error';
36+
import { Code, FirestoreError } from '../util/error';
3737
import * as log from '../util/log';
3838
import { primitiveComparator } from '../util/misc';
3939
import { ObjectMap } from '../util/obj_map';
@@ -160,6 +160,8 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
160160
private mutationUserCallbacks = {} as {
161161
[uidKey: string]: SortedMap<BatchId, Deferred<void>>;
162162
};
163+
/** Stores user callbacks waiting for all pending writes to be acknowledged. */
164+
private pendingWritesCallbacks = new Map<BatchId, Array<Deferred<void>>>();
163165
private limboTargetIdGenerator = TargetIdGenerator.forSyncEngine();
164166

165167
// The primary state is set to `true` or `false` immediately after Firestore
@@ -435,6 +437,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
435437
(this.isPrimary && source === OnlineStateSource.RemoteStore) ||
436438
(!this.isPrimary && source === OnlineStateSource.SharedClientState)
437439
) {
440+
this.assertSubscribed('applyOnlineStateChange()');
438441
const newViewSnapshots = [] as ViewSnapshot[];
439442
this.queryViewsByQuery.forEach((query, queryView) => {
440443
const viewChange = queryView.view.applyOnlineStateChange(onlineState);
@@ -555,6 +558,8 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
555558
// before listen events.
556559
this.processUserCallback(batchId, /*error=*/ null);
557560

561+
this.triggerPendingWritesCallbacks(batchId);
562+
558563
try {
559564
const changes = await this.localStore.acknowledgeBatch(
560565
mutationBatchResult
@@ -578,6 +583,8 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
578583
// listen events.
579584
this.processUserCallback(batchId, error);
580585

586+
this.triggerPendingWritesCallbacks(batchId);
587+
581588
try {
582589
const changes = await this.localStore.rejectBatch(batchId);
583590
this.sharedClientState.updateMutationState(batchId, 'rejected', error);
@@ -587,6 +594,58 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
587594
}
588595
}
589596

597+
/**
598+
* Registers a user callback that resolves when all pending mutations at the moment of calling
599+
* are acknowledged .
600+
*/
601+
async registerPendingWritesCallback(callback: Deferred<void>): Promise<void> {
602+
if (!this.remoteStore.canUseNetwork()) {
603+
log.debug(
604+
LOG_TAG,
605+
'The network is disabled. The task returned by ' +
606+
"'awaitPendingWrites()' will not complete until the network is enabled."
607+
);
608+
}
609+
610+
const highestBatchId = await this.localStore.getHighestUnacknowledgedBatchId();
611+
if (highestBatchId === BATCHID_UNKNOWN) {
612+
// Trigger the callback right away if there is no pending writes at the moment.
613+
callback.resolve();
614+
return;
615+
}
616+
617+
const callbacks = this.pendingWritesCallbacks.get(highestBatchId) || [];
618+
callbacks.push(callback);
619+
this.pendingWritesCallbacks.set(highestBatchId, callbacks);
620+
}
621+
622+
/**
623+
* Triggers the callbacks that are waiting for this batch id to get acknowledged by server,
624+
* if there are any.
625+
*/
626+
private triggerPendingWritesCallbacks(batchId: BatchId): void {
627+
(this.pendingWritesCallbacks.get(batchId) || []).forEach(callback => {
628+
callback.resolve();
629+
});
630+
631+
this.pendingWritesCallbacks.delete(batchId);
632+
}
633+
634+
/** Reject all outstanding callbacks waiting for pending writes to complete. */
635+
private rejectOutstandingPendingWritesCallbacks(errorMessage: string): void {
636+
this.pendingWritesCallbacks.forEach(callbacks => {
637+
callbacks.forEach(callback => {
638+
callback.reject(
639+
new FirestoreError(
640+
Code.CANCELLED, errorMessage
641+
)
642+
);
643+
});
644+
});
645+
646+
this.pendingWritesCallbacks.clear();
647+
}
648+
590649
private addMutationCallback(
591650
batchId: BatchId,
592651
callback: Deferred<void>
@@ -784,6 +843,11 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
784843
this.currentUser = user;
785844

786845
if (userChanged) {
846+
// Fails tasks waiting for pending writes requested by previous user.
847+
this.rejectOutstandingPendingWritesCallbacks(
848+
"'waitForPendingWrites' promise is rejected due to a user change."
849+
);
850+
787851
const result = await this.localStore.handleUserChange(user);
788852
// TODO(b/114226417): Consider calling this only in the primary tab.
789853
this.sharedClientState.handleUserChange(

packages/firestore/src/local/indexeddb_mutation_queue.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,25 @@ export class IndexedDbMutationQueue implements MutationQueue {
265265
.next(() => foundBatch);
266266
}
267267

268+
getHighestUnacknowledgedBatchId(
269+
transaction: PersistenceTransaction
270+
): PersistencePromise<BatchId> {
271+
const range = IDBKeyRange.upperBound(
272+
[this.userId, Number.POSITIVE_INFINITY]
273+
);
274+
275+
let batchId = BATCHID_UNKNOWN;
276+
return mutationsStore(transaction)
277+
.iterate(
278+
{ index: DbMutationBatch.userMutationsIndex, range, reverse: true },
279+
(key, dbBatch, control) => {
280+
batchId = dbBatch.batchId;
281+
control.done();
282+
}
283+
)
284+
.next(() => batchId);
285+
}
286+
268287
getAllMutationBatches(
269288
transaction: PersistenceTransaction
270289
): PersistencePromise<MutationBatch[]> {

packages/firestore/src/local/local_store.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,20 @@ export class LocalStore {
387387
);
388388
}
389389

390+
/**
391+
* Returns the largest (latest) batch id in mutation queue that is pending server response.
392+
* Returns `BATCHID_UNKNOWN` if the queue is empty.
393+
*/
394+
getHighestUnacknowledgedBatchId(): Promise<BatchId> {
395+
return this.persistence.runTransaction(
396+
'Get highest unacknowledged batch id',
397+
'readonly',
398+
txn => {
399+
return this.mutationQueue.getHighestUnacknowledgedBatchId(txn);
400+
}
401+
);
402+
}
403+
390404
/** Returns the last recorded stream token for the current user. */
391405
getLastStreamToken(): Promise<ProtoByteString> {
392406
return this.persistence.runTransaction(

packages/firestore/src/local/memory_mutation_queue.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { BatchId, ProtoByteString } from '../core/types';
2121
import { DocumentKeySet } from '../model/collections';
2222
import { DocumentKey } from '../model/document_key';
2323
import { Mutation } from '../model/mutation';
24-
import { MutationBatch } from '../model/mutation_batch';
24+
import { MutationBatch, BATCHID_UNKNOWN } from '../model/mutation_batch';
2525
import { emptyByteString } from '../platform/platform';
2626
import { assert } from '../util/assert';
2727
import { primitiveComparator } from '../util/misc';
@@ -177,6 +177,12 @@ export class MemoryMutationQueue implements MutationQueue {
177177
);
178178
}
179179

180+
getHighestUnacknowledgedBatchId(): PersistencePromise<BatchId> {
181+
return PersistencePromise.resolve(
182+
this.mutationQueue.length === 0 ? BATCHID_UNKNOWN : this.nextBatchId - 1
183+
);
184+
}
185+
180186
getAllMutationBatches(
181187
transaction: PersistenceTransaction
182188
): PersistencePromise<MutationBatch[]> {

packages/firestore/src/local/mutation_queue.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,16 @@ export interface MutationQueue {
103103
batchId: BatchId
104104
): PersistencePromise<MutationBatch | null>;
105105

106+
/**
107+
* Gets the largest (latest) batch id in mutation queue for the current user that is pending
108+
* server response, returns `BATCHID_UNKNOWN` if the queue is empty.
109+
*
110+
* @return the largest batch id in the mutation queue that is not acknowledged.
111+
*/
112+
getHighestUnacknowledgedBatchId(
113+
transaction: PersistenceTransaction
114+
): PersistencePromise<BatchId>;
115+
106116
/** Gets all mutation batches in the mutation queue. */
107117
// TODO(mikelehen): PERF: Current consumer only needs mutated keys; if we can
108118
// provide that cheaply, we should replace this.

packages/firestore/src/remote/remote_store.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ export class RemoteStore implements TargetMetadataProvider {
330330
);
331331
}
332332

333-
private canUseNetwork(): boolean {
333+
canUseNetwork(): boolean {
334334
return this.isPrimary && this.networkEnabled;
335335
}
336336

packages/firestore/test/integration/api/database.test.ts

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,11 @@ import {
3737
withTestDbs,
3838
withTestDoc,
3939
withTestDocAndInitialData,
40-
DEFAULT_SETTINGS
40+
DEFAULT_SETTINGS,
41+
waitForPendingWrites,
42+
withMockCredentialProviderTestDb
4143
} from '../util/helpers';
44+
import { User } from '../../../src/auth/user';
4245

4346
// tslint:disable:no-floating-promises
4447

@@ -743,7 +746,7 @@ apiDescribe('Database', (persistence: boolean) => {
743746
});
744747
});
745748
});
746-
return Promise.all([deferred1.promise, deferred2.promise]).then(() => {});
749+
return Promise.all([deferred1.promise, deferred2.promise]).then(() => { });
747750
});
748751
});
749752

@@ -782,7 +785,7 @@ apiDescribe('Database', (persistence: boolean) => {
782785
it('will reject listens', () => {
783786
const deferred = new Deferred();
784787
queryForRejection.onSnapshot(
785-
() => {},
788+
() => { },
786789
(err: Error) => {
787790
expect(err.name).to.exist;
788791
expect(err.message).to.exist;
@@ -795,12 +798,12 @@ apiDescribe('Database', (persistence: boolean) => {
795798
it('will reject same listens twice in a row', () => {
796799
const deferred = new Deferred();
797800
queryForRejection.onSnapshot(
798-
() => {},
801+
() => { },
799802
(err: Error) => {
800803
expect(err.name).to.exist;
801804
expect(err.message).to.exist;
802805
queryForRejection.onSnapshot(
803-
() => {},
806+
() => { },
804807
(err2: Error) => {
805808
expect(err2.name).to.exist;
806809
expect(err2.message).to.exist;
@@ -1120,4 +1123,48 @@ apiDescribe('Database', (persistence: boolean) => {
11201123
}).to.throw();
11211124
});
11221125
});
1126+
1127+
it('can wait for pending writes', async () => {
1128+
await withTestDoc(persistence, async docRef => {
1129+
const firestore = docRef.firestore;
1130+
// Prevent pending writes receiving acknowledgement.
1131+
await firestore.disableNetwork();
1132+
1133+
const pendingWrites = docRef.set({ foo: 'bar' });
1134+
const awaitPendingWrites = waitForPendingWrites(firestore);
1135+
1136+
// pending writes can receive acknowledgements now.
1137+
await firestore.enableNetwork();
1138+
await pendingWrites;
1139+
await awaitPendingWrites;
1140+
});
1141+
});
1142+
1143+
it('waiting for pending writes should fail when user changes', async () => {
1144+
await withMockCredentialProviderTestDb(persistence, async (db, mockCredentialsProvider) => {
1145+
// Prevent pending writes receiving acknowledgement.
1146+
await db.disableNetwork();
1147+
db.doc('abc/123').set({ foo: 'bar' });
1148+
const awaitPendingWrite = waitForPendingWrites(db);
1149+
1150+
mockCredentialsProvider.triggerUserChange(new User('user_1'));
1151+
1152+
await expect(awaitPendingWrite).to.be.eventually.rejectedWith(
1153+
"'waitForPendingWrites' promise is rejected due to a user change."
1154+
);
1155+
});
1156+
});
1157+
1158+
it('waiting for pending writes resolves immediately when offline and no pending writes',
1159+
async () => {
1160+
await withTestDoc(persistence, async docRef => {
1161+
const firestore = docRef.firestore;
1162+
// Prevent pending writes receiving acknowledgement.
1163+
await firestore.disableNetwork();
1164+
1165+
// `awaitsPendingWrites` is created when there is no pending writes, it will resolve
1166+
// immediately even if we are offline.
1167+
await waitForPendingWrites(firestore);
1168+
});
1169+
});
11231170
});

0 commit comments

Comments
 (0)