Skip to content

Commit e66c4e8

Browse files
authored
Merge pull request #454 from ali-ince/1.7-verify-error-on-commit
Relax discovery logic
2 parents 6e48356 + 4e8afd7 commit e66c4e8

18 files changed

+263
-103
lines changed

src/v1/internal/connection-providers.js

+6-6
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ export class LoadBalancer extends ConnectionProvider {
6464
constructor(address, routingContext, connectionPool, loadBalancingStrategy, hostNameResolver, driverOnErrorCallback, log) {
6565
super();
6666
this._seedRouter = address;
67-
this._routingTable = new RoutingTable([this._seedRouter]);
67+
this._routingTable = new RoutingTable();
6868
this._rediscovery = new Rediscovery(new RoutingUtil(routingContext));
6969
this._connectionPool = connectionPool;
7070
this._driverOnErrorCallback = driverOnErrorCallback;
@@ -211,7 +211,10 @@ export class LoadBalancer extends ConnectionProvider {
211211
// try next router
212212
return this._createSessionForRediscovery(currentRouter).then(session => {
213213
if (session) {
214-
return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter);
214+
return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter).catch(error => {
215+
this._log.warn(`unable to fetch routing table because of an error ${error}`);
216+
return null;
217+
});
215218
} else {
216219
// unable to acquire connection and create session towards the current router
217220
// return null to signal that the next router should be tried
@@ -256,11 +259,8 @@ export class LoadBalancer extends ConnectionProvider {
256259
}
257260

258261
_updateRoutingTable(newRoutingTable) {
259-
const currentRoutingTable = this._routingTable;
260-
261262
// close old connections to servers not present in the new routing table
262-
const staleServers = currentRoutingTable.serversDiff(newRoutingTable);
263-
staleServers.forEach(server => this._connectionPool.purge(server));
263+
this._connectionPool.keepAll(newRoutingTable.allServers());
264264

265265
// make this driver instance aware of the new table
266266
this._routingTable = newRoutingTable;

src/v1/internal/pool.js

+11
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,17 @@ class Pool {
115115
Object.keys(this._pools).forEach(key => this._purgeKey(key));
116116
}
117117

118+
/**
119+
* Keep the idle resources for the provided addresses and purge the rest.
120+
*/
121+
keepAll(addresses) {
122+
const keysToKeep = addresses.map(a => a.asKey());
123+
const keysPresent = Object.keys(this._pools);
124+
const keysToPurge = keysPresent.filter(k => keysToKeep.indexOf(k) == -1);
125+
126+
keysToPurge.forEach(key => this._purgeKey(key));
127+
}
128+
118129
/**
119130
* Check if this pool contains resources for the given address.
120131
* @param {ServerAddress} address the address of the server to check.

src/v1/internal/routing-table.js

+1-10
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,6 @@ export default class RoutingTable {
4747
this.writers = removeFromArray(this.writers, address);
4848
}
4949

50-
serversDiff(otherRoutingTable) {
51-
const oldServers = this._allServers();
52-
const newServers = otherRoutingTable._allServers();
53-
const diffTable = {};
54-
oldServers.forEach(oldServer => diffTable[oldServer.asKey()] = oldServer);
55-
newServers.forEach(newServer => delete diffTable[newServer.asKey()]);
56-
return Object.values(diffTable);
57-
}
58-
5950
/**
6051
* Check if this routing table is fresh to perform the required operation.
6152
* @param {string} accessMode the type of operation. Allowed values are {@link READ} and {@link WRITE}.
@@ -68,7 +59,7 @@ export default class RoutingTable {
6859
accessMode === WRITE && this.writers.length === 0;
6960
}
7061

71-
_allServers() {
62+
allServers() {
7263
return [...this.routers, ...this.readers, ...this.writers];
7364
}
7465

test/internal/bolt-stub.js

+2-4
Original file line numberDiff line numberDiff line change
@@ -114,13 +114,11 @@ class StubServer {
114114
function newDriver(url, config = {}) {
115115
// left here for debugging purposes
116116
const logging = {
117-
level: 'debug',
117+
level: (process.env['NEOLOGLEVEL'] || 'error').toLowerCase(),
118118
logger: (level, msg) => console.log(`${level}: ${msg}`)
119119
};
120120
// boltstub currently does not support encryption, create driver with encryption turned off
121-
const newConfig = Object.assign({encrypted: 'ENCRYPTION_OFF'}, config);
122-
// use for logging enabled
123-
// const newConfig = Object.assign({encrypted: 'ENCRYPTION_OFF', logging}, config);
121+
const newConfig = Object.assign({ encrypted: 'ENCRYPTION_OFF', logging }, config);
124122
return neo4j.driver(url, sharedNeo4j.authToken, newConfig);
125123
}
126124

test/internal/connection-providers.test.js

+5-2
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ describe('LoadBalancer', () => {
174174
NO_OP_DRIVER_CALLBACK, Logger.noOp());
175175

176176
expectRoutingTable(loadBalancer,
177-
[serverABC],
177+
[],
178178
[],
179179
[]
180180
);
@@ -1117,6 +1117,9 @@ function newLoadBalancerWithSeedRouter(seedRouter, seedRouterResolved,
11171117
loadBalancer._routingTable = new RoutingTable(routers, readers, writers, expirationTime);
11181118
loadBalancer._rediscovery = new FakeRediscovery(routerToRoutingTable);
11191119
loadBalancer._hostNameResolver = new FakeDnsResolver(seedRouterResolved);
1120+
if (expirationTime === Integer.ZERO) {
1121+
loadBalancer._useSeedRouter = false;
1122+
}
11201123
return loadBalancer;
11211124
}
11221125

@@ -1170,7 +1173,7 @@ class FakeRediscovery {
11701173
}
11711174

11721175
lookupRoutingTableOnRouter(ignored, router) {
1173-
return this._routerToRoutingTable[router.asKey()];
1176+
return Promise.resolve(this._routerToRoutingTable[router.asKey()]);
11741177
}
11751178
}
11761179

test/internal/node/direct.driver.boltkit.test.js

+43
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import neo4j from '../../../src/v1';
2121
import {READ, WRITE} from '../../../src/v1/driver';
2222
import boltStub from '../bolt-stub';
23+
import { SERVICE_UNAVAILABLE } from '../../../src/v1/error';
2324

2425
describe('direct driver with stub server', () => {
2526

@@ -372,4 +373,46 @@ describe('direct driver with stub server', () => {
372373
}).catch(error => done.fail(error));
373374
});
374375
});
376+
377+
describe('should fail if commit fails due to broken connection', () => {
378+
it('v1', done => {
379+
verifyFailureOnConnectionFailureWhenExplicitTransactionIsCommitted('v1', done);
380+
});
381+
382+
it('v3', done => {
383+
verifyFailureOnConnectionFailureWhenExplicitTransactionIsCommitted('v3', done);
384+
});
385+
});
386+
387+
function verifyFailureOnConnectionFailureWhenExplicitTransactionIsCommitted(version, done) {
388+
if (!boltStub.supported) {
389+
done();
390+
return;
391+
}
392+
393+
const server = boltStub.start(`./test/resources/boltstub/connection_error_on_commit_${version}.script`, 9001);
394+
395+
boltStub.run(() => {
396+
const driver = boltStub.newDriver('bolt://127.0.0.1:9001');
397+
const session = driver.session();
398+
399+
const writeTx = session.beginTransaction();
400+
401+
writeTx.run('CREATE (n {name: \'Bob\'})').then(() =>
402+
writeTx.commit().then(result => fail('expected an error'), (error) => {
403+
expect(error.code).toBe(SERVICE_UNAVAILABLE);
404+
expect(error.message).toContain('Connection was closed by server');
405+
})
406+
).finally(() =>
407+
session.close(() => {
408+
driver.close();
409+
410+
server.exit(code => {
411+
expect(code).toEqual(0);
412+
done();
413+
});
414+
})).catch(error => done.fail(error));
415+
}
416+
);
417+
}
375418
});

test/internal/node/routing.driver.boltkit.test.js

+55-12
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ describe('routing driver with stub server', () => {
198198
// When
199199
const session = driver.session(neo4j.READ);
200200
session.run("MATCH (n) RETURN n.name").catch(err => {
201-
expect(err.code).toEqual(neo4j.error.PROTOCOL_ERROR);
201+
expect(err.code).toEqual(neo4j.error.SERVICE_UNAVAILABLE);
202202

203203
session.close();
204204
driver.close();
@@ -563,7 +563,7 @@ describe('routing driver with stub server', () => {
563563
const session1 = driver.session(neo4j.session.READ);
564564
session1.run("MATCH (n) RETURN n.name").catch(() => {
565565
const session2 = driver.session(neo4j.session.READ);
566-
session2.run("MATCH (n) RETURN n.name").then(() => {
566+
session2.run('MATCH (n) RETURN n.name').then(() => {
567567
driver.close();
568568
seedServer.exit(code1 => {
569569
readServer.exit(code2 => {
@@ -592,8 +592,8 @@ describe('routing driver with stub server', () => {
592592
const session = driver.session();
593593
session.run("MATCH (n) RETURN n.name").catch(err => {
594594
expect(err.code).toEqual(neo4j.error.SERVICE_UNAVAILABLE);
595-
expect(err.message.indexOf('Make sure you are connecting to a causal cluster') > 0).toBeTruthy();
596-
assertHasRouters(driver, ['127.0.0.1:9001']);
595+
expect(err.message).toContain('Could not perform discovery');
596+
assertHasRouters(driver, []);
597597
session.close();
598598
driver.close();
599599
server.exit(code => {
@@ -960,31 +960,31 @@ describe('routing driver with stub server', () => {
960960
});
961961
});
962962

963-
it('should throw protocol error when no records', done => {
963+
it('should throw error when no records', done => {
964964
testForProtocolError('./test/resources/boltstub/empty_get_servers_response.script', done);
965965
});
966966

967-
it('should throw protocol error when no TTL entry', done => {
967+
it('should throw error when no TTL entry', done => {
968968
testForProtocolError('./test/resources/boltstub/no_ttl_entry_get_servers.script', done);
969969
});
970970

971-
it('should throw protocol error when no servers entry', done => {
971+
it('should throw error when no servers entry', done => {
972972
testForProtocolError('./test/resources/boltstub/no_servers_entry_get_servers.script', done);
973973
});
974974

975-
it('should throw protocol error when multiple records', done => {
975+
it('should throw error when multiple records', done => {
976976
testForProtocolError('./test/resources/boltstub/unparsable_ttl_get_servers.script', done);
977977
});
978978

979-
it('should throw protocol error on unparsable record', done => {
979+
it('should throw error on unparsable record', done => {
980980
testForProtocolError('./test/resources/boltstub/unparsable_servers_get_servers.script', done);
981981
});
982982

983-
it('should throw protocol error when no routers', done => {
983+
it('should throw error when no routers', done => {
984984
testForProtocolError('./test/resources/boltstub/no_routers_get_servers.script', done);
985985
});
986986

987-
it('should throw protocol error when no readers', done => {
987+
it('should throw error when no readers', done => {
988988
testForProtocolError('./test/resources/boltstub/no_readers_get_servers.script', done);
989989
});
990990

@@ -2125,6 +2125,49 @@ describe('routing driver with stub server', () => {
21252125
});
21262126
})
21272127

2128+
it('should revert to initial router if the only known router returns invalid routing table', done => {
2129+
if (!boltStub.supported) {
2130+
done();
2131+
return;
2132+
}
2133+
2134+
// the first seed to get the routing table
2135+
// the returned routing table includes a non-reachable read-server and points to only one router
2136+
// which will return an invalid routing table
2137+
const router1 = boltStub.start('./test/resources/boltstub/acquire_endpoints_v3_point_to_empty_router_and_exit.script', 9001);
2138+
// returns an empty routing table
2139+
const router2 = boltStub.start('./test/resources/boltstub/acquire_endpoints_v3_empty.script', 9004);
2140+
// returns a normal routing table
2141+
const router3 = boltStub.start('./test/resources/boltstub/acquire_endpoints_v3_three_servers_and_exit.script', 9003);
2142+
// ordinary read server
2143+
const reader = boltStub.start('./test/resources/boltstub/read_server_v3_read_tx.script', 9002);
2144+
2145+
boltStub.run(() => {
2146+
const driver = boltStub.newDriver('bolt+routing://my.virtual.host:8080', {
2147+
resolver: address => ['127.0.0.1:9001', '127.0.0.1:9003']
2148+
});
2149+
2150+
const session = driver.session(neo4j.session.READ);
2151+
session.readTransaction(tx => tx.run('MATCH (n) RETURN n.name')).then(res => {
2152+
session.close();
2153+
driver.close();
2154+
router1.exit(code1 => {
2155+
router2.exit(code2 => {
2156+
router3.exit(code3 => {
2157+
reader.exit(code4 => {
2158+
expect(code1).toEqual(0);
2159+
expect(code2).toEqual(0);
2160+
expect(code3).toEqual(0);
2161+
expect(code4).toEqual(0);
2162+
done();
2163+
});
2164+
});
2165+
});
2166+
});
2167+
}).catch(error => done.fail(error));
2168+
});
2169+
});
2170+
21282171
function testAddressPurgeOnDatabaseError(query, accessMode, done) {
21292172
if (!boltStub.supported) {
21302173
done();
@@ -2254,7 +2297,7 @@ describe('routing driver with stub server', () => {
22542297

22552298
const session = driver.session();
22562299
session.run('MATCH (n) RETURN n.name').catch(error => {
2257-
expect(error.code).toEqual(neo4j.error.PROTOCOL_ERROR);
2300+
expect(error.code).toEqual(neo4j.error.SERVICE_UNAVAILABLE);
22582301

22592302
session.close();
22602303
driver.close();

test/internal/pool.test.js

+77
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,83 @@ describe('Pool', () => {
296296
});
297297
});
298298

299+
it('purges keys other than the ones to keep', done => {
300+
let counter = 0;
301+
302+
const address1 = ServerAddress.fromUrl('bolt://localhost:7687');
303+
const address2 = ServerAddress.fromUrl('bolt://localhost:7688');
304+
const address3 = ServerAddress.fromUrl('bolt://localhost:7689');
305+
306+
const pool = new Pool((server, release) => Promise.resolve(new Resource(server, counter++, release)),
307+
res => {
308+
res.destroyed = true;
309+
return true;
310+
}
311+
);
312+
313+
const acquiredResources = [
314+
pool.acquire(address1),
315+
pool.acquire(address2),
316+
pool.acquire(address3),
317+
pool.acquire(address1),
318+
pool.acquire(address2),
319+
pool.acquire(address3)
320+
];
321+
322+
Promise.all(acquiredResources).then(values => {
323+
expect(pool.has(address1)).toBeTruthy();
324+
expect(pool.has(address2)).toBeTruthy();
325+
expect(pool.has(address3)).toBeTruthy();
326+
327+
pool.keepAll([address1, address3]);
328+
329+
expect(pool.has(address1)).toBeTruthy();
330+
expect(pool.has(address3)).toBeTruthy();
331+
expect(pool.has(address2)).toBeFalsy();
332+
333+
done();
334+
});
335+
});
336+
337+
it('purges all keys if addresses to keep is empty', done => {
338+
let counter = 0;
339+
340+
const address1 = ServerAddress.fromUrl('bolt://localhost:7687');
341+
const address2 = ServerAddress.fromUrl('bolt://localhost:7688');
342+
const address3 = ServerAddress.fromUrl('bolt://localhost:7689');
343+
344+
const pool = new Pool((server, release) => Promise.resolve(new Resource(server, counter++, release)),
345+
res => {
346+
res.destroyed = true;
347+
return true;
348+
}
349+
);
350+
351+
const acquiredResources = [
352+
pool.acquire(address1),
353+
pool.acquire(address2),
354+
pool.acquire(address3),
355+
pool.acquire(address1),
356+
pool.acquire(address2),
357+
pool.acquire(address3)
358+
];
359+
360+
Promise.all(acquiredResources).then(values => {
361+
expect(pool.has(address1)).toBeTruthy();
362+
expect(pool.has(address2)).toBeTruthy();
363+
expect(pool.has(address3)).toBeTruthy();
364+
365+
pool.keepAll([]);
366+
367+
expect(pool.has(address1)).toBeFalsy();
368+
expect(pool.has(address3)).toBeFalsy();
369+
expect(pool.has(address2)).toBeFalsy();
370+
371+
done();
372+
});
373+
});
374+
375+
299376
it('skips broken connections during acquire', (done) => {
300377
let validated = false;
301378
let counter = 0;

test/internal/rediscovery.test.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ describe('rediscovery', () => {
180180

181181
expect(routingTable.expirationTime).toEqual(expires);
182182

183-
const allServers = routingTable.serversDiff(new RoutingTable()).sort();
183+
const allServers = routingTable.allServers().sort();
184184
const allExpectedServers = [...routerAddresses, ...readerAddresses, ...writerAddresses].sort();
185185
expect(allServers.map(s => s.asHostPort())).toEqual(allExpectedServers);
186186

0 commit comments

Comments
 (0)