Skip to content

Commit 1285e4b

Browse files
authored
feat(node): Add kafkajs integration (#13528)
1 parent cef6986 commit 1285e4b

File tree

16 files changed

+196
-56
lines changed

16 files changed

+196
-56
lines changed

dev-packages/node-integration-tests/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
"graphql": "^16.3.0",
4949
"http-terminator": "^3.2.0",
5050
"ioredis": "^5.4.1",
51+
"kafkajs": "2.2.4",
5152
"mongodb": "^3.7.3",
5253
"mongodb-memory-server-global": "^7.6.3",
5354
"mongoose": "^5.13.22",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
services:
2+
db:
3+
image: apache/kafka:latest
4+
restart: always
5+
container_name: integration-tests-kafka
6+
ports:
7+
- '9092:9092'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
const { loggingTransport } = require('@sentry-internal/node-integration-tests');
2+
const Sentry = require('@sentry/node');
3+
4+
Sentry.init({
5+
dsn: 'https://[email protected]/1337',
6+
release: '1.0',
7+
tracesSampleRate: 1.0,
8+
transport: loggingTransport,
9+
});
10+
11+
// Stop the process from exiting before the transaction is sent
12+
setInterval(() => {}, 1000);
13+
14+
const { Kafka } = require('kafkajs');
15+
16+
async function run() {
17+
const kafka = new Kafka({
18+
clientId: 'my-app',
19+
brokers: ['localhost:9092'],
20+
});
21+
22+
const admin = kafka.admin();
23+
await admin.connect();
24+
25+
const producer = kafka.producer();
26+
await producer.connect();
27+
28+
await admin.createTopics({
29+
topics: [{ topic: 'test-topic' }],
30+
});
31+
32+
const consumer = kafka.consumer({
33+
groupId: 'test-group',
34+
});
35+
36+
await consumer.connect();
37+
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });
38+
39+
consumer.run({
40+
eachMessage: async ({ message }) => {
41+
// eslint-disable-next-line no-console
42+
console.debug('Received message', message.value.toString());
43+
},
44+
});
45+
46+
// Wait for the consumer to be ready
47+
await new Promise(resolve => setTimeout(resolve, 4000));
48+
49+
await producer.send({
50+
topic: 'test-topic',
51+
messages: [
52+
{
53+
value: 'TEST_MESSAGE',
54+
},
55+
],
56+
});
57+
58+
// Wait for the message to be received
59+
await new Promise(resolve => setTimeout(resolve, 5000));
60+
}
61+
62+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
63+
run();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import { cleanupChildProcesses, createRunner } from '../../../utils/runner';
2+
3+
// When running docker compose, we need a larger timeout, as this takes some time...
4+
jest.setTimeout(60_000);
5+
6+
describe('kafkajs', () => {
7+
afterAll(() => {
8+
cleanupChildProcesses();
9+
});
10+
11+
test('traces producers and consumers', done => {
12+
createRunner(__dirname, 'scenario.js')
13+
.withDockerCompose({
14+
workingDirectory: [__dirname],
15+
readyMatches: ['9092'],
16+
})
17+
.expect({
18+
transaction: {
19+
transaction: 'test-topic',
20+
contexts: {
21+
trace: expect.objectContaining({
22+
op: 'message',
23+
status: 'ok',
24+
data: expect.objectContaining({
25+
'messaging.system': 'kafka',
26+
'messaging.destination': 'test-topic',
27+
'otel.kind': 'PRODUCER',
28+
'sentry.op': 'message',
29+
'sentry.origin': 'auto.kafkajs.otel.producer',
30+
}),
31+
}),
32+
},
33+
},
34+
})
35+
.expect({
36+
transaction: {
37+
transaction: 'test-topic',
38+
contexts: {
39+
trace: expect.objectContaining({
40+
op: 'message',
41+
status: 'ok',
42+
data: expect.objectContaining({
43+
'messaging.system': 'kafka',
44+
'messaging.destination': 'test-topic',
45+
'otel.kind': 'CONSUMER',
46+
'sentry.op': 'message',
47+
'sentry.origin': 'auto.kafkajs.otel.consumer',
48+
}),
49+
}),
50+
},
51+
},
52+
})
53+
.start(done);
54+
});
55+
});

packages/astro/src/index.server.ts

+1
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ export {
6565
inboundFiltersIntegration,
6666
initOpenTelemetry,
6767
isInitialized,
68+
kafkaIntegration,
6869
koaIntegration,
6970
lastEventId,
7071
linkedErrorsIntegration,

packages/aws-serverless/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ export {
8989
fsIntegration,
9090
genericPoolIntegration,
9191
graphqlIntegration,
92+
kafkaIntegration,
9293
mongoIntegration,
9394
mongooseIntegration,
9495
mysqlIntegration,

packages/bun/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ export {
110110
setupConnectErrorHandler,
111111
genericPoolIntegration,
112112
graphqlIntegration,
113+
kafkaIntegration,
113114
mongoIntegration,
114115
mongooseIntegration,
115116
mysqlIntegration,

packages/google-cloud-serverless/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ export {
8989
fastifyIntegration,
9090
genericPoolIntegration,
9191
graphqlIntegration,
92+
kafkaIntegration,
9293
mongoIntegration,
9394
mongooseIntegration,
9495
mysqlIntegration,

packages/node/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
"@opentelemetry/instrumentation-hapi": "0.41.0",
7979
"@opentelemetry/instrumentation-http": "0.53.0",
8080
"@opentelemetry/instrumentation-ioredis": "0.43.0",
81+
"@opentelemetry/instrumentation-kafkajs": "0.3.0",
8182
"@opentelemetry/instrumentation-koa": "0.43.0",
8283
"@opentelemetry/instrumentation-mongodb": "0.47.0",
8384
"@opentelemetry/instrumentation-mongoose": "0.42.0",

packages/node/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ export { anrIntegration } from './integrations/anr';
1414
export { expressIntegration, expressErrorHandler, setupExpressErrorHandler } from './integrations/tracing/express';
1515
export { fastifyIntegration, setupFastifyErrorHandler } from './integrations/tracing/fastify';
1616
export { graphqlIntegration } from './integrations/tracing/graphql';
17+
export { kafkaIntegration } from './integrations/tracing/kafka';
1718
export { mongoIntegration } from './integrations/tracing/mongo';
1819
export { mongooseIntegration } from './integrations/tracing/mongoose';
1920
export { mysqlIntegration } from './integrations/tracing/mysql';

packages/node/src/integrations/tracing/index.ts

+3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { fastifyIntegration, instrumentFastify } from './fastify';
77
import { genericPoolIntegration, instrumentGenericPool } from './genericPool';
88
import { graphqlIntegration, instrumentGraphql } from './graphql';
99
import { hapiIntegration, instrumentHapi } from './hapi';
10+
import { instrumentKafka, kafkaIntegration } from './kafka';
1011
import { instrumentKoa, koaIntegration } from './koa';
1112
import { instrumentMongo, mongoIntegration } from './mongo';
1213
import { instrumentMongoose, mongooseIntegration } from './mongoose';
@@ -39,6 +40,7 @@ export function getAutoPerformanceIntegrations(): Integration[] {
3940
koaIntegration(),
4041
connectIntegration(),
4142
genericPoolIntegration(),
43+
kafkaIntegration(),
4244
];
4345
}
4446

@@ -53,6 +55,7 @@ export function getOpenTelemetryInstrumentationToPreload(): (((options?: any) =>
5355
instrumentConnect,
5456
instrumentFastify,
5557
instrumentHapi,
58+
instrumentKafka,
5659
instrumentKoa,
5760
instrumentNest,
5861
instrumentMongo,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import { KafkaJsInstrumentation } from '@opentelemetry/instrumentation-kafkajs';
2+
3+
import { defineIntegration } from '@sentry/core';
4+
import type { IntegrationFn } from '@sentry/types';
5+
import { generateInstrumentOnce } from '../../otel/instrument';
6+
import { addOriginToSpan } from '../../utils/addOriginToSpan';
7+
8+
const INTEGRATION_NAME = 'Kafka';
9+
10+
export const instrumentKafka = generateInstrumentOnce(
11+
INTEGRATION_NAME,
12+
() =>
13+
new KafkaJsInstrumentation({
14+
consumerHook(span) {
15+
addOriginToSpan(span, 'auto.kafkajs.otel.consumer');
16+
},
17+
producerHook(span) {
18+
addOriginToSpan(span, 'auto.kafkajs.otel.producer');
19+
},
20+
}),
21+
);
22+
23+
const _kafkaIntegration = (() => {
24+
return {
25+
name: INTEGRATION_NAME,
26+
setupOnce() {
27+
instrumentKafka();
28+
},
29+
};
30+
}) satisfies IntegrationFn;
31+
32+
/**
33+
* KafkaJs integration
34+
*
35+
* Capture tracing data for KafkaJs.
36+
*/
37+
export const kafkaIntegration = defineIntegration(_kafkaIntegration);

packages/remix/src/index.server.ts

+1
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ export {
6767
inboundFiltersIntegration,
6868
initOpenTelemetry,
6969
isInitialized,
70+
kafkaIntegration,
7071
koaIntegration,
7172
lastEventId,
7273
linkedErrorsIntegration,

packages/solidstart/src/server/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ export {
5858
inboundFiltersIntegration,
5959
initOpenTelemetry,
6060
isInitialized,
61+
kafkaIntegration,
6162
koaIntegration,
6263
lastEventId,
6364
linkedErrorsIntegration,

packages/sveltekit/src/server/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ export {
6060
inboundFiltersIntegration,
6161
initOpenTelemetry,
6262
isInitialized,
63+
kafkaIntegration,
6364
koaIntegration,
6465
lastEventId,
6566
linkedErrorsIntegration,

0 commit comments

Comments
 (0)