Skip to content

Relax discovery logic #454

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/v1/internal/connection-providers.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export class LoadBalancer extends ConnectionProvider {
constructor(address, routingContext, connectionPool, loadBalancingStrategy, hostNameResolver, driverOnErrorCallback, log) {
super();
this._seedRouter = address;
this._routingTable = new RoutingTable([this._seedRouter]);
this._routingTable = new RoutingTable();
this._rediscovery = new Rediscovery(new RoutingUtil(routingContext));
this._connectionPool = connectionPool;
this._driverOnErrorCallback = driverOnErrorCallback;
Expand Down Expand Up @@ -211,7 +211,10 @@ export class LoadBalancer extends ConnectionProvider {
// try next router
return this._createSessionForRediscovery(currentRouter).then(session => {
if (session) {
return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter);
return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter).catch(error => {
this._log.warn(`unable to fetch routing table because of an error ${error}`);
return null;
});
} else {
// unable to acquire connection and create session towards the current router
// return null to signal that the next router should be tried
Expand Down Expand Up @@ -256,11 +259,8 @@ export class LoadBalancer extends ConnectionProvider {
}

_updateRoutingTable(newRoutingTable) {
const currentRoutingTable = this._routingTable;

// close old connections to servers not present in the new routing table
const staleServers = currentRoutingTable.serversDiff(newRoutingTable);
staleServers.forEach(server => this._connectionPool.purge(server));
this._connectionPool.keepAll(newRoutingTable.allServers());

// make this driver instance aware of the new table
this._routingTable = newRoutingTable;
Expand Down
11 changes: 11 additions & 0 deletions src/v1/internal/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,17 @@ class Pool {
Object.keys(this._pools).forEach(key => this._purgeKey(key));
}

/**
* Keep the idle resources for the provided addresses and purge the rest.
*/
keepAll(addresses) {
const keysToKeep = addresses.map(a => a.asKey());
const keysPresent = Object.keys(this._pools);
const keysToPurge = keysPresent.filter(k => keysToKeep.indexOf(k) == -1);

keysToPurge.forEach(key => this._purgeKey(key));
}

/**
* Check if this pool contains resources for the given address.
* @param {ServerAddress} address the address of the server to check.
Expand Down
11 changes: 1 addition & 10 deletions src/v1/internal/routing-table.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,6 @@ export default class RoutingTable {
this.writers = removeFromArray(this.writers, address);
}

serversDiff(otherRoutingTable) {
const oldServers = this._allServers();
const newServers = otherRoutingTable._allServers();
const diffTable = {};
oldServers.forEach(oldServer => diffTable[oldServer.asKey()] = oldServer);
newServers.forEach(newServer => delete diffTable[newServer.asKey()]);
return Object.values(diffTable);
}

/**
* Check if this routing table is fresh to perform the required operation.
* @param {string} accessMode the type of operation. Allowed values are {@link READ} and {@link WRITE}.
Expand All @@ -68,7 +59,7 @@ export default class RoutingTable {
accessMode === WRITE && this.writers.length === 0;
}

_allServers() {
allServers() {
return [...this.routers, ...this.readers, ...this.writers];
}

Expand Down
6 changes: 2 additions & 4 deletions test/internal/bolt-stub.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,11 @@ class StubServer {
function newDriver(url, config = {}) {
// left here for debugging purposes
const logging = {
level: 'debug',
level: (process.env['NEOLOGLEVEL'] || 'error').toLowerCase(),
logger: (level, msg) => console.log(`${level}: ${msg}`)
};
// boltstub currently does not support encryption, create driver with encryption turned off
const newConfig = Object.assign({encrypted: 'ENCRYPTION_OFF'}, config);
// use for logging enabled
// const newConfig = Object.assign({encrypted: 'ENCRYPTION_OFF', logging}, config);
const newConfig = Object.assign({ encrypted: 'ENCRYPTION_OFF', logging }, config);
return neo4j.driver(url, sharedNeo4j.authToken, newConfig);
}

Expand Down
7 changes: 5 additions & 2 deletions test/internal/connection-providers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ describe('LoadBalancer', () => {
NO_OP_DRIVER_CALLBACK, Logger.noOp());

expectRoutingTable(loadBalancer,
[serverABC],
[],
[],
[]
);
Expand Down Expand Up @@ -1117,6 +1117,9 @@ function newLoadBalancerWithSeedRouter(seedRouter, seedRouterResolved,
loadBalancer._routingTable = new RoutingTable(routers, readers, writers, expirationTime);
loadBalancer._rediscovery = new FakeRediscovery(routerToRoutingTable);
loadBalancer._hostNameResolver = new FakeDnsResolver(seedRouterResolved);
if (expirationTime === Integer.ZERO) {
loadBalancer._useSeedRouter = false;
}
return loadBalancer;
}

Expand Down Expand Up @@ -1170,7 +1173,7 @@ class FakeRediscovery {
}

lookupRoutingTableOnRouter(ignored, router) {
return this._routerToRoutingTable[router.asKey()];
return Promise.resolve(this._routerToRoutingTable[router.asKey()]);
}
}

Expand Down
43 changes: 43 additions & 0 deletions test/internal/node/direct.driver.boltkit.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import neo4j from '../../../src/v1';
import {READ, WRITE} from '../../../src/v1/driver';
import boltStub from '../bolt-stub';
import { SERVICE_UNAVAILABLE } from '../../../src/v1/error';

describe('direct driver with stub server', () => {

Expand Down Expand Up @@ -372,4 +373,46 @@ describe('direct driver with stub server', () => {
}).catch(error => done.fail(error));
});
});

describe('should fail if commit fails due to broken connection', () => {
it('v1', done => {
verifyFailureOnConnectionFailureWhenExplicitTransactionIsCommitted('v1', done);
});

it('v3', done => {
verifyFailureOnConnectionFailureWhenExplicitTransactionIsCommitted('v3', done);
});
});

function verifyFailureOnConnectionFailureWhenExplicitTransactionIsCommitted(version, done) {
if (!boltStub.supported) {
done();
return;
}

const server = boltStub.start(`./test/resources/boltstub/connection_error_on_commit_${version}.script`, 9001);

boltStub.run(() => {
const driver = boltStub.newDriver('bolt://127.0.0.1:9001');
const session = driver.session();

const writeTx = session.beginTransaction();

writeTx.run('CREATE (n {name: \'Bob\'})').then(() =>
writeTx.commit().then(result => fail('expected an error'), (error) => {
expect(error.code).toBe(SERVICE_UNAVAILABLE);
expect(error.message).toContain('Connection was closed by server');
})
).finally(() =>
session.close(() => {
driver.close();

server.exit(code => {
expect(code).toEqual(0);
done();
});
})).catch(error => done.fail(error));
}
);
}
});
67 changes: 55 additions & 12 deletions test/internal/node/routing.driver.boltkit.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ describe('routing driver with stub server', () => {
// When
const session = driver.session(neo4j.READ);
session.run("MATCH (n) RETURN n.name").catch(err => {
expect(err.code).toEqual(neo4j.error.PROTOCOL_ERROR);
expect(err.code).toEqual(neo4j.error.SERVICE_UNAVAILABLE);

session.close();
driver.close();
Expand Down Expand Up @@ -563,7 +563,7 @@ describe('routing driver with stub server', () => {
const session1 = driver.session(neo4j.session.READ);
session1.run("MATCH (n) RETURN n.name").catch(() => {
const session2 = driver.session(neo4j.session.READ);
session2.run("MATCH (n) RETURN n.name").then(() => {
session2.run('MATCH (n) RETURN n.name').then(() => {
driver.close();
seedServer.exit(code1 => {
readServer.exit(code2 => {
Expand Down Expand Up @@ -592,8 +592,8 @@ describe('routing driver with stub server', () => {
const session = driver.session();
session.run("MATCH (n) RETURN n.name").catch(err => {
expect(err.code).toEqual(neo4j.error.SERVICE_UNAVAILABLE);
expect(err.message.indexOf('Make sure you are connecting to a causal cluster') > 0).toBeTruthy();
assertHasRouters(driver, ['127.0.0.1:9001']);
expect(err.message).toContain('Could not perform discovery');
assertHasRouters(driver, []);
session.close();
driver.close();
server.exit(code => {
Expand Down Expand Up @@ -960,31 +960,31 @@ describe('routing driver with stub server', () => {
});
});

it('should throw protocol error when no records', done => {
it('should throw error when no records', done => {
testForProtocolError('./test/resources/boltstub/empty_get_servers_response.script', done);
});

it('should throw protocol error when no TTL entry', done => {
it('should throw error when no TTL entry', done => {
testForProtocolError('./test/resources/boltstub/no_ttl_entry_get_servers.script', done);
});

it('should throw protocol error when no servers entry', done => {
it('should throw error when no servers entry', done => {
testForProtocolError('./test/resources/boltstub/no_servers_entry_get_servers.script', done);
});

it('should throw protocol error when multiple records', done => {
it('should throw error when multiple records', done => {
testForProtocolError('./test/resources/boltstub/unparsable_ttl_get_servers.script', done);
});

it('should throw protocol error on unparsable record', done => {
it('should throw error on unparsable record', done => {
testForProtocolError('./test/resources/boltstub/unparsable_servers_get_servers.script', done);
});

it('should throw protocol error when no routers', done => {
it('should throw error when no routers', done => {
testForProtocolError('./test/resources/boltstub/no_routers_get_servers.script', done);
});

it('should throw protocol error when no readers', done => {
it('should throw error when no readers', done => {
testForProtocolError('./test/resources/boltstub/no_readers_get_servers.script', done);
});

Expand Down Expand Up @@ -2125,6 +2125,49 @@ describe('routing driver with stub server', () => {
});
})

it('should revert to initial router if the only known router returns invalid routing table', done => {
if (!boltStub.supported) {
done();
return;
}

// the first seed to get the routing table
// the returned routing table includes a non-reachable read-server and points to only one router
// which will return an invalid routing table
const router1 = boltStub.start('./test/resources/boltstub/acquire_endpoints_v3_point_to_empty_router_and_exit.script', 9001);
// returns an empty routing table
const router2 = boltStub.start('./test/resources/boltstub/acquire_endpoints_v3_empty.script', 9004);
// returns a normal routing table
const router3 = boltStub.start('./test/resources/boltstub/acquire_endpoints_v3_three_servers_and_exit.script', 9003);
// ordinary read server
const reader = boltStub.start('./test/resources/boltstub/read_server_v3_read_tx.script', 9002);

boltStub.run(() => {
const driver = boltStub.newDriver('bolt+routing://my.virtual.host:8080', {
resolver: address => ['127.0.0.1:9001', '127.0.0.1:9003']
});

const session = driver.session(neo4j.session.READ);
session.readTransaction(tx => tx.run('MATCH (n) RETURN n.name')).then(res => {
session.close();
driver.close();
router1.exit(code1 => {
router2.exit(code2 => {
router3.exit(code3 => {
reader.exit(code4 => {
expect(code1).toEqual(0);
expect(code2).toEqual(0);
expect(code3).toEqual(0);
expect(code4).toEqual(0);
done();
});
});
});
});
}).catch(error => done.fail(error));
});
});

function testAddressPurgeOnDatabaseError(query, accessMode, done) {
if (!boltStub.supported) {
done();
Expand Down Expand Up @@ -2254,7 +2297,7 @@ describe('routing driver with stub server', () => {

const session = driver.session();
session.run('MATCH (n) RETURN n.name').catch(error => {
expect(error.code).toEqual(neo4j.error.PROTOCOL_ERROR);
expect(error.code).toEqual(neo4j.error.SERVICE_UNAVAILABLE);

session.close();
driver.close();
Expand Down
77 changes: 77 additions & 0 deletions test/internal/pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,83 @@ describe('Pool', () => {
});
});

it('purges keys other than the ones to keep', done => {
let counter = 0;

const address1 = ServerAddress.fromUrl('bolt://localhost:7687');
const address2 = ServerAddress.fromUrl('bolt://localhost:7688');
const address3 = ServerAddress.fromUrl('bolt://localhost:7689');

const pool = new Pool((server, release) => Promise.resolve(new Resource(server, counter++, release)),
res => {
res.destroyed = true;
return true;
}
);

const acquiredResources = [
pool.acquire(address1),
pool.acquire(address2),
pool.acquire(address3),
pool.acquire(address1),
pool.acquire(address2),
pool.acquire(address3)
];

Promise.all(acquiredResources).then(values => {
expect(pool.has(address1)).toBeTruthy();
expect(pool.has(address2)).toBeTruthy();
expect(pool.has(address3)).toBeTruthy();

pool.keepAll([address1, address3]);

expect(pool.has(address1)).toBeTruthy();
expect(pool.has(address3)).toBeTruthy();
expect(pool.has(address2)).toBeFalsy();

done();
});
});

it('purges all keys if addresses to keep is empty', done => {
let counter = 0;

const address1 = ServerAddress.fromUrl('bolt://localhost:7687');
const address2 = ServerAddress.fromUrl('bolt://localhost:7688');
const address3 = ServerAddress.fromUrl('bolt://localhost:7689');

const pool = new Pool((server, release) => Promise.resolve(new Resource(server, counter++, release)),
res => {
res.destroyed = true;
return true;
}
);

const acquiredResources = [
pool.acquire(address1),
pool.acquire(address2),
pool.acquire(address3),
pool.acquire(address1),
pool.acquire(address2),
pool.acquire(address3)
];

Promise.all(acquiredResources).then(values => {
expect(pool.has(address1)).toBeTruthy();
expect(pool.has(address2)).toBeTruthy();
expect(pool.has(address3)).toBeTruthy();

pool.keepAll([]);

expect(pool.has(address1)).toBeFalsy();
expect(pool.has(address3)).toBeFalsy();
expect(pool.has(address2)).toBeFalsy();

done();
});
});


it('skips broken connections during acquire', (done) => {
let validated = false;
let counter = 0;
Expand Down
2 changes: 1 addition & 1 deletion test/internal/rediscovery.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ describe('rediscovery', () => {

expect(routingTable.expirationTime).toEqual(expires);

const allServers = routingTable.serversDiff(new RoutingTable()).sort();
const allServers = routingTable.allServers().sort();
const allExpectedServers = [...routerAddresses, ...readerAddresses, ...writerAddresses].sort();
expect(allServers.map(s => s.asHostPort())).toEqual(allExpectedServers);

Expand Down
Loading