28
28
29
29
#include " ../dsql/dsql_proto.h"
30
30
#include " ../dsql/DsqlCursor.h"
31
+ #include " ../dsql/StmtNodes.h"
31
32
32
33
using namespace Firebird ;
33
34
using namespace Jrd ;
@@ -36,10 +37,10 @@ static const char* const SCRATCH = "fb_cursor_";
36
37
static const ULONG PREFETCH_SIZE = 65536 ; // 64 KB
37
38
38
39
DsqlCursor::DsqlCursor (DsqlDmlRequest* req, ULONG flags)
39
- : m_dsqlRequest(req), m_message(req-> getDsqlStatement ()->getReceiveMsg()),
40
- m_resultSet( NULL ), m_flags(flags ),
41
- m_space(req-> getPool (), SCRATCH ),
42
- m_state(BOS), m_eof( false ), m_position( 0 ), m_cachedCount( 0 )
40
+ : m_dsqlRequest(req),
41
+ m_message(req-> getDsqlStatement ()->getReceiveMsg()->msg_number ),
42
+ m_flags(flags ),
43
+ m_space(req-> getPool ( ), SCRATCH )
43
44
{
44
45
TRA_link_cursor (m_dsqlRequest->req_transaction , this );
45
46
}
@@ -48,6 +49,8 @@ DsqlCursor::~DsqlCursor()
48
49
{
49
50
if (m_resultSet)
50
51
m_resultSet->resetHandle ();
52
+
53
+ delete[] m_keyBuffer;
51
54
}
52
55
53
56
jrd_tra* DsqlCursor::getTransaction () const
@@ -66,6 +69,31 @@ void DsqlCursor::setInterfacePtr(JResultSet* interfacePtr) noexcept
66
69
m_resultSet = interfacePtr;
67
70
}
68
71
72
+ bool DsqlCursor::getCurrentRecordKey (USHORT context, RecordKey& key) const
73
+ {
74
+ if (m_keyBuffer == nullptr )
75
+ {
76
+ // A possible situation for a cursor not based on any record source such as
77
+ // a = 1;
78
+ // suspend;
79
+ return false ;
80
+ }
81
+
82
+ if (context * sizeof (RecordKey) >= m_keyBufferLength)
83
+ {
84
+ fb_assert (false );
85
+ return false ;
86
+ }
87
+
88
+ if (m_state != POSITIONED)
89
+ {
90
+ return false ;
91
+ }
92
+
93
+ key = m_keyBuffer[context];
94
+ return key.recordNumber .bid_relation_id != 0 ;
95
+ }
96
+
69
97
void DsqlCursor::close (thread_db* tdbb, DsqlCursor* cursor)
70
98
{
71
99
if (!cursor)
@@ -88,7 +116,7 @@ void DsqlCursor::close(thread_db* tdbb, DsqlCursor* cursor)
88
116
89
117
if (dsqlRequest->req_traced && TraceManager::need_dsql_free (attachment))
90
118
{
91
- TraceSQLStatementImpl stmt (dsqlRequest, NULL );
119
+ TraceSQLStatementImpl stmt (dsqlRequest, nullptr , nullptr );
92
120
TraceManager::event_dsql_free (attachment, &stmt, DSQL_close);
93
121
}
94
122
@@ -115,6 +143,17 @@ int DsqlCursor::fetchNext(thread_db* tdbb, UCHAR* buffer)
115
143
return 1 ;
116
144
}
117
145
146
+ if (m_keyBufferLength == 0 )
147
+ {
148
+ Request* req = m_dsqlRequest->getRequest ();
149
+ m_keyBufferLength = req->req_rpb .getCount () * sizeof (RecordKey);
150
+ if (m_keyBufferLength > 0 )
151
+ m_keyBuffer = FB_NEW_POOL (m_dsqlRequest->getPool ()) RecordKey[req->req_rpb .getCount ()];
152
+ }
153
+
154
+ if (m_keyBufferLength > 0 )
155
+ m_dsqlRequest->gatherRecordKey (m_keyBuffer);
156
+
118
157
m_state = POSITIONED;
119
158
return 0 ;
120
159
}
@@ -163,7 +202,7 @@ int DsqlCursor::fetchAbsolute(thread_db* tdbb, UCHAR* buffer, SLONG position)
163
202
{
164
203
if (!m_eof)
165
204
{
166
- cacheInput (tdbb);
205
+ cacheInput (tdbb, buffer );
167
206
fb_assert (m_eof);
168
207
}
169
208
@@ -248,7 +287,7 @@ void DsqlCursor::getInfo(thread_db* tdbb,
248
287
case IResultSet::INF_RECORD_COUNT:
249
288
if (isScrollable && !m_eof)
250
289
{
251
- cacheInput (tdbb);
290
+ cacheInput (tdbb, nullptr );
252
291
fb_assert (m_eof);
253
292
}
254
293
response.insertInt (tag, isScrollable ? m_cachedCount : -1 );
@@ -291,48 +330,83 @@ int DsqlCursor::fetchFromCache(thread_db* tdbb, UCHAR* buffer, FB_UINT64 positio
291
330
{
292
331
if (position >= m_cachedCount)
293
332
{
294
- if (m_eof || !cacheInput (tdbb, position))
333
+ if (m_eof || !cacheInput (tdbb, buffer, position))
295
334
{
296
335
m_state = EOS;
297
336
return 1 ;
298
337
}
299
338
}
300
339
301
340
fb_assert (position < m_cachedCount);
341
+ fb_assert (m_messageLength > 0 ); // At this point m_messageLength must be set by cacheInput
302
342
303
- UCHAR* const msgBuffer = m_dsqlRequest->req_msg_buffers [m_message->msg_buffer_number ];
343
+ FB_UINT64 offset = position * (m_messageLength + m_keyBufferLength);
344
+ FB_UINT64 readBytes = m_space.read (offset, buffer, m_messageLength);
304
345
305
- const FB_UINT64 offset = position * m_message->msg_length ;
306
- const FB_UINT64 readBytes = m_space.read (offset, msgBuffer, m_message->msg_length );
307
- fb_assert (readBytes == m_message->msg_length );
346
+ if (m_keyBufferLength > 0 )
347
+ {
348
+ offset += m_messageLength;
349
+ readBytes += m_space.read (offset, m_keyBuffer, m_keyBufferLength);
350
+ }
308
351
309
- m_dsqlRequest-> mapInOut (tdbb, true , m_message, NULL , buffer );
352
+ fb_assert (readBytes == m_messageLength + m_keyBufferLength );
310
353
311
354
m_position = position;
312
355
m_state = POSITIONED;
313
356
return 0 ;
314
357
}
315
358
316
- bool DsqlCursor::cacheInput (thread_db* tdbb, FB_UINT64 position)
359
+ bool DsqlCursor::cacheInput (thread_db* tdbb, UCHAR* buffer, FB_UINT64 position)
317
360
{
318
361
fb_assert (!m_eof);
319
362
320
- const ULONG prefetchCount = MAX (PREFETCH_SIZE / m_message->msg_length , 1 );
321
- const UCHAR* const msgBuffer = m_dsqlRequest->req_msg_buffers [m_message->msg_buffer_number ];
363
+ // It could not be done before: user buffer length may be unknown until call setDelayedOutputMetadata()
364
+ if (m_messageLength == 0 )
365
+ {
366
+ Request* req = m_dsqlRequest->getRequest ();
367
+ const MessageNode* msg = req->getStatement ()->getMessage (m_message);
368
+ m_messageLength = msg->getFormat (req)->fmt_length ;
369
+ m_keyBufferLength = req->req_rpb .getCount () * sizeof (RecordKey);
370
+ if (m_keyBufferLength > 0 )
371
+ {
372
+ // Save record key unconditionally because setCursorName() can be called after openCursor()
373
+ m_keyBuffer = FB_NEW_POOL (m_dsqlRequest->getPool ()) RecordKey[req->req_rpb .getCount ()];
374
+ }
375
+ }
376
+
377
+ std::unique_ptr<UCHAR[]> ownBuffer;
378
+ if (buffer == nullptr )
379
+ {
380
+ // We are called from getInfo() and there is no user-provided buffer for data.
381
+ // Create a temporary one.
382
+ // This code cannot be moved into getInfo() itself because it is most likely called before fetch()
383
+ // so m_messageLength is still unknown there.
384
+ ownBuffer.reset (buffer = FB_NEW UCHAR[m_messageLength]);
385
+ }
386
+
387
+ const ULONG prefetchCount = MAX (PREFETCH_SIZE / (m_messageLength + m_keyBufferLength), 1 );
322
388
323
389
while (position >= m_cachedCount)
324
390
{
325
391
for (ULONG count = 0 ; count < prefetchCount; count++)
326
392
{
327
- if (!m_dsqlRequest->fetch (tdbb, NULL ))
393
+ if (!m_dsqlRequest->fetch (tdbb, buffer ))
328
394
{
329
395
m_eof = true ;
330
396
break ;
331
397
}
332
398
333
- const FB_UINT64 offset = m_cachedCount * m_message->msg_length ;
334
- const FB_UINT64 writtenBytes = m_space.write (offset, msgBuffer, m_message->msg_length );
335
- fb_assert (writtenBytes == m_message->msg_length );
399
+ FB_UINT64 offset = m_cachedCount * (m_messageLength + m_keyBufferLength);
400
+ FB_UINT64 writtenBytes = m_space.write (offset, buffer, m_messageLength);
401
+
402
+ if (m_keyBufferLength > 0 )
403
+ {
404
+ offset += m_messageLength;
405
+ m_dsqlRequest->gatherRecordKey (m_keyBuffer);
406
+ writtenBytes += m_space.write (offset, m_keyBuffer, m_keyBufferLength);
407
+ }
408
+
409
+ fb_assert (writtenBytes == m_messageLength + m_keyBufferLength);
336
410
m_cachedCount++;
337
411
}
338
412
0 commit comments