Skip to content

Fix for #8082 by making engine to use user buffers directly #8145

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
May 7, 2025
37 changes: 16 additions & 21 deletions src/dsql/DsqlBatch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,6 @@ DsqlBatch* DsqlBatch::open(thread_db* tdbb, DsqlDmlRequest* req, IMessageMetadat

const auto statement = req->getDsqlStatement();

if (statement->getFlags() & DsqlStatement::FLAG_ORPHAN)
{
ERRD_post(Arg::Gds(isc_sqlerr) << Arg::Num(-901) <<
Arg::Gds(isc_bad_req_handle));
}

switch (statement->getType())
{
case DsqlStatement::TYPE_INSERT:
Expand All @@ -229,7 +223,7 @@ DsqlBatch* DsqlBatch::open(thread_db* tdbb, DsqlDmlRequest* req, IMessageMetadat
}

const dsql_msg* message = statement->getSendMsg();
if (! (inMetadata && message && req->parseMetadata(inMetadata, message->msg_parameters)))
if (! (inMetadata && message && message->msg_parameter > 0))
{
ERRD_post(Arg::Gds(isc_sqlerr) << Arg::Num(-901) <<
Arg::Gds(isc_batch_param));
Expand Down Expand Up @@ -659,18 +653,23 @@ Firebird::IBatchCompletionState* DsqlBatch::execute(thread_db* tdbb)
// execute request
m_dsqlRequest->req_transaction = transaction;
Request* req = m_dsqlRequest->getRequest();
DsqlStatement* dStmt = m_dsqlRequest->getDsqlStatement();
fb_assert(req);

// prepare completion interface
AutoPtr<BatchCompletionState, SimpleDispose> completionState
(FB_NEW BatchCompletionState(m_flags & (1 << IBatch::TAG_RECORD_COUNTS), m_detailed));
AutoSetRestore<bool> batchFlag(&req->req_batch_mode, true);
const dsql_msg* message = m_dsqlRequest->getDsqlStatement()->getSendMsg();
const dsql_msg* sendMessage = dStmt->getSendMsg();
// map message to internal engine format
// Do it one time only to avoid parsing its metadata for every message
m_dsqlRequest->metadataToFormat(m_meta, sendMessage);
// Using of positional DML in batch is strange but not forbidden
m_dsqlRequest->mapCursorKey(tdbb);
bool startRequest = true;

bool isExecBlock = m_dsqlRequest->getDsqlStatement()->getType() == DsqlStatement::TYPE_EXEC_BLOCK;
const auto receiveMessage = isExecBlock ? m_dsqlRequest->getDsqlStatement()->getReceiveMsg() : nullptr;
auto receiveMsgBuffer = isExecBlock ? m_dsqlRequest->req_msg_buffers[receiveMessage->msg_buffer_number] : nullptr;
bool isExecBlock = dStmt->getType() == DsqlStatement::TYPE_EXEC_BLOCK;
const dsql_msg* receiveMessage = isExecBlock ? dStmt->getReceiveMsg() : nullptr;

// process messages
ULONG remains;
Expand Down Expand Up @@ -726,25 +725,18 @@ Firebird::IBatchCompletionState* DsqlBatch::execute(thread_db* tdbb)
*id = newId;
}

// map message to internal engine format
// pass m_meta one time only to avoid parsing its metadata for every message
m_dsqlRequest->mapInOut(tdbb, false, message, start ? m_meta : nullptr, nullptr, data);
data += m_messageSize;
remains -= m_messageSize;

UCHAR* msgBuffer = m_dsqlRequest->req_msg_buffers[message->msg_buffer_number];
try
{
// runsend data to request and collect stats
ULONG before = req->req_records_inserted + req->req_records_updated +
req->req_records_deleted;
EXE_send(tdbb, req, message->msg_number, message->msg_length, msgBuffer);
EXE_send(tdbb, req, sendMessage->msg_number, m_messageSize, data);
ULONG after = req->req_records_inserted + req->req_records_updated +
req->req_records_deleted;
completionState->regUpdate(after - before);

if (isExecBlock)
EXE_receive(tdbb, req, receiveMessage->msg_number, receiveMessage->msg_length, receiveMsgBuffer);
if (receiveMessage)
EXE_receive(tdbb, req, receiveMessage->msg_number, receiveMessage->msg_length, nullptr); // We don't care about returned record
}
catch (const Exception& ex)
{
Expand All @@ -764,6 +756,9 @@ Firebird::IBatchCompletionState* DsqlBatch::execute(thread_db* tdbb)

startRequest = true;
}

data += m_messageSize;
remains -= m_messageSize;
}

UCHAR* alignedData = FB_ALIGN(data, m_alignment);
Expand Down
11 changes: 1 addition & 10 deletions src/dsql/DsqlCompilerScratch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,10 +421,7 @@ dsql_var* DsqlCompilerScratch::resolveVariable(const MetaName& varName)
// Generate BLR for a return.
void DsqlCompilerScratch::genReturn(bool eosFlag)
{
const bool hasEos = !(flags & (FLAG_TRIGGER | FLAG_FUNCTION));

if (hasEos && !eosFlag)
appendUChar(blr_begin);
const bool hasEos = !(flags & (FLAG_TRIGGER | FLAG_FUNCTION | FLAG_EXEC_BLOCK));

appendUChar(blr_send);
appendUChar(1);
Expand Down Expand Up @@ -455,12 +452,6 @@ void DsqlCompilerScratch::genReturn(bool eosFlag)
}

appendUChar(blr_end);

if (hasEos && !eosFlag)
{
appendUChar(blr_stall);
appendUChar(blr_end);
}
}

void DsqlCompilerScratch::genParameters(Array<NestConst<ParameterClause> >& parameters,
Expand Down
9 changes: 7 additions & 2 deletions src/dsql/DsqlCompilerScratch.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ typedef Firebird::Pair<
Firebird::NonPooled<NestConst<ValueListNode>, NestConst<ValueListNode>>> ReturningClause;


// DSQL Compiler scratch block - may be discarded after compilation in the future.
// DSQL Compiler scratch block.
// Contains any kind of objects used during DsqlStatement compilation
// Is deleted with its pool as soon as DsqlStatement is fully formed in prepareStatement()
// or with the statement itself (if the statement reqested it returning true from shouldPreserveScratch())
class DsqlCompilerScratch : public BlrDebugWriter
{
public:
Expand All @@ -70,6 +73,7 @@ class DsqlCompilerScratch : public BlrDebugWriter
static const unsigned FLAG_DDL = 0x2000;
static const unsigned FLAG_FETCH = 0x4000;
static const unsigned FLAG_VIEW_WITH_CHECK = 0x8000;
static const unsigned FLAG_EXEC_BLOCK = 0x100000;

static const unsigned MAX_NESTING = 512;

Expand Down Expand Up @@ -105,7 +109,7 @@ class DsqlCompilerScratch : public BlrDebugWriter

protected:
// DsqlCompilerScratch should never be destroyed using delete.
// It dies together with it's pool in release_request().
// It dies together with it's pool.
~DsqlCompilerScratch()
{
}
Expand Down Expand Up @@ -317,6 +321,7 @@ class DsqlCompilerScratch : public BlrDebugWriter
DsqlCompilerScratch* mainScratch = nullptr;
Firebird::NonPooledMap<USHORT, USHORT> outerMessagesMap; // <outer, inner>
Firebird::NonPooledMap<USHORT, USHORT> outerVarsMap; // <outer, inner>
dsql_msg* recordKeyMessage = nullptr; // Side message for positioned DML

private:
Firebird::HalfStaticArray<SelectExprNode*, 4> ctes; // common table expressions
Expand Down
97 changes: 78 additions & 19 deletions src/dsql/DsqlCursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "../dsql/dsql_proto.h"
#include "../dsql/DsqlCursor.h"
#include "../dsql/StmtNodes.h"

using namespace Firebird;
using namespace Jrd;
Expand All @@ -36,7 +37,11 @@ static const char* const SCRATCH = "fb_cursor_";
static const ULONG PREFETCH_SIZE = 65536; // 64 KB

DsqlCursor::DsqlCursor(DsqlDmlRequest* req, ULONG flags)
: m_dsqlRequest(req), m_message(req->getDsqlStatement()->getReceiveMsg()),
: m_dsqlRequest(req),
m_keyBuffer(nullptr),
m_keyBufferLength(0),
m_message(req->getDsqlStatement()->getReceiveMsg()->msg_number),
m_messageLength(0),
m_resultSet(NULL), m_flags(flags),
m_space(req->getPool(), SCRATCH),
m_state(BOS), m_eof(false), m_position(0), m_cachedCount(0)
Expand All @@ -48,6 +53,11 @@ DsqlCursor::~DsqlCursor()
{
if (m_resultSet)
m_resultSet->resetHandle();
if (m_keyBuffer)
{
delete[] m_keyBuffer;
m_keyBuffer = nullptr;
}
}

jrd_tra* DsqlCursor::getTransaction() const
Expand All @@ -66,6 +76,23 @@ void DsqlCursor::setInterfacePtr(JResultSet* interfacePtr) noexcept
m_resultSet = interfacePtr;
}

bool DsqlCursor::getCurrentRecordKey(USHORT context, RecordKey& key) const
{
if (m_keyBuffer == nullptr || context * sizeof(RecordKey) >= m_keyBufferLength)
{
fb_assert(false);
return false;
}

if (m_state != POSITIONED)
{
return false;
}

key = m_keyBuffer[context];
return key.recordNumber.bid_relation_id != 0;
}

void DsqlCursor::close(thread_db* tdbb, DsqlCursor* cursor)
{
if (!cursor)
Expand All @@ -88,7 +115,7 @@ void DsqlCursor::close(thread_db* tdbb, DsqlCursor* cursor)

if (dsqlRequest->req_traced && TraceManager::need_dsql_free(attachment))
{
TraceSQLStatementImpl stmt(dsqlRequest, NULL);
TraceSQLStatementImpl stmt(dsqlRequest, nullptr, nullptr);
TraceManager::event_dsql_free(attachment, &stmt, DSQL_close);
}

Expand All @@ -115,6 +142,15 @@ int DsqlCursor::fetchNext(thread_db* tdbb, UCHAR* buffer)
return 1;
}

if (m_keyBufferLength == 0)
{
Request* req = m_dsqlRequest->getRequest();
m_keyBufferLength = req->req_rpb.getCount() * sizeof(RecordKey);
fb_assert(m_keyBufferLength > 0);
m_keyBuffer = new RecordKey[req->req_rpb.getCount()];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm worried that we copy the whole m_keyBufferLength chunk of bytes to the temp space and back even if they're useless for us (non-relations or no cursor name is assigned).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment below describes it: "setCursorName() can be called after openCursor()" so there is no way to predict if key values are useless or not.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a pity. Do you know if setCursorName() is really called after openCursor() is practice (e.g. by Delphi applications) or is it just a protection from this theoretically happening? Do you think we could delay the dbkey/recver message setup until setCursorName() is called? At least for unbuffered (unidirectional) fetches?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIR messages are set up on BLR level and setCursorName() that I see in IBX is been called between isc_dsql_execute2() and isc_dsql_fetch(), i.e. already after BLR->EXE transformation. But in any case API specs describe no limits on setCursorName() usage and the code has no corresponding checks so yes, it theoretically can happen any time and the engine must be ready for that.

I don't see a problem with this buffer: it is small (in most cases 16 bytes), allocated once per cursor and in the temporary space it is piggy-backed to a much bigger record data so I expect no noticeable memory or disk space usage growth.

}

m_dsqlRequest->gatherRecordKey(m_keyBuffer);
m_state = POSITIONED;
return 0;
}
Expand Down Expand Up @@ -163,7 +199,7 @@ int DsqlCursor::fetchAbsolute(thread_db* tdbb, UCHAR* buffer, SLONG position)
{
if (!m_eof)
{
cacheInput(tdbb);
cacheInput(tdbb, buffer);
fb_assert(m_eof);
}

Expand Down Expand Up @@ -248,7 +284,7 @@ void DsqlCursor::getInfo(thread_db* tdbb,
case IResultSet::INF_RECORD_COUNT:
if (isScrollable && !m_eof)
{
cacheInput(tdbb);
cacheInput(tdbb, nullptr);
fb_assert(m_eof);
}
response.insertInt(tag, isScrollable ? m_cachedCount : -1);
Expand Down Expand Up @@ -291,48 +327,71 @@ int DsqlCursor::fetchFromCache(thread_db* tdbb, UCHAR* buffer, FB_UINT64 positio
{
if (position >= m_cachedCount)
{
if (m_eof || !cacheInput(tdbb, position))
if (m_eof || !cacheInput(tdbb, buffer, position))
{
m_state = EOS;
return 1;
}
}

fb_assert(position < m_cachedCount);
fb_assert(m_messageLength > 0); // At this point m_messageLength must be set by cacheInput

UCHAR* const msgBuffer = m_dsqlRequest->req_msg_buffers[m_message->msg_buffer_number];

const FB_UINT64 offset = position * m_message->msg_length;
const FB_UINT64 readBytes = m_space.read(offset, msgBuffer, m_message->msg_length);
fb_assert(readBytes == m_message->msg_length);

m_dsqlRequest->mapInOut(tdbb, true, m_message, NULL, buffer);
FB_UINT64 offset = position * (m_messageLength + m_keyBufferLength);
FB_UINT64 readBytes = m_space.read(offset, buffer, m_messageLength);
offset += m_messageLength;
readBytes += m_space.read(offset, m_keyBuffer, m_keyBufferLength);
fb_assert(readBytes == m_messageLength + m_keyBufferLength);

m_position = position;
m_state = POSITIONED;
return 0;
}

bool DsqlCursor::cacheInput(thread_db* tdbb, FB_UINT64 position)
bool DsqlCursor::cacheInput(thread_db* tdbb, UCHAR* buffer, FB_UINT64 position)
{
fb_assert(!m_eof);

const ULONG prefetchCount = MAX(PREFETCH_SIZE / m_message->msg_length, 1);
const UCHAR* const msgBuffer = m_dsqlRequest->req_msg_buffers[m_message->msg_buffer_number];
// It could not be done before: user buffer length may be unknown until call setDelayedOutputMetadata()
if (m_messageLength == 0)
{
Request* req = m_dsqlRequest->getRequest();
const MessageNode* msg = req->getStatement()->getMessage(m_message);
m_messageLength = msg->getFormat(req)->fmt_length;
// Save record key unconditionally because setCursorName() can be called after openCursor()
m_keyBufferLength = req->req_rpb.getCount() * sizeof(RecordKey);
m_keyBuffer = new RecordKey[req->req_rpb.getCount()];
}

std::unique_ptr<UCHAR[]> ownBuffer;
if (buffer == nullptr)
{
// We are called from getInfo() and there is no user-provided buffer for data.
// Create a temporary one.
// This code cannot be moved into getInfo() itself because it is most likely called before fetch()
// so m_messageLength is still unknown there.
ownBuffer.reset(buffer = new UCHAR[m_messageLength]);
}

const ULONG prefetchCount = MAX(PREFETCH_SIZE / (m_messageLength + m_keyBufferLength), 1);

while (position >= m_cachedCount)
{
for (ULONG count = 0; count < prefetchCount; count++)
{
if (!m_dsqlRequest->fetch(tdbb, NULL))
if (!m_dsqlRequest->fetch(tdbb, buffer))
{
m_eof = true;
break;
}

const FB_UINT64 offset = m_cachedCount * m_message->msg_length;
const FB_UINT64 writtenBytes = m_space.write(offset, msgBuffer, m_message->msg_length);
fb_assert(writtenBytes == m_message->msg_length);
m_dsqlRequest->gatherRecordKey(m_keyBuffer);

FB_UINT64 offset = m_cachedCount * (m_messageLength + m_keyBufferLength);
FB_UINT64 writtenBytes = m_space.write(offset, buffer, m_messageLength);
offset += m_messageLength;
writtenBytes += m_space.write(offset, m_keyBuffer, m_keyBufferLength);
fb_assert(writtenBytes == m_messageLength + m_keyBufferLength);
m_cachedCount++;
}

Expand Down
9 changes: 7 additions & 2 deletions src/dsql/DsqlCursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace Jrd {

class DsqlDmlRequest;
class JResultSet;
struct RecordKey;

class DsqlCursor
{
Expand All @@ -41,6 +42,7 @@ class DsqlCursor
jrd_tra* getTransaction() const;
Attachment* getAttachment() const;
void setInterfacePtr(JResultSet* interfacePtr) noexcept;
bool getCurrentRecordKey(USHORT context, RecordKey& key) const;

static void close(thread_db* tdbb, DsqlCursor* cursor);

Expand All @@ -67,10 +69,13 @@ class DsqlCursor

private:
int fetchFromCache(thread_db* tdbb, UCHAR* buffer, FB_UINT64 position);
bool cacheInput(thread_db* tdbb, FB_UINT64 position = MAX_UINT64);
bool cacheInput(thread_db* tdbb, UCHAR* buffer, FB_UINT64 position = MAX_UINT64);

DsqlDmlRequest* const m_dsqlRequest;
const dsql_msg* const m_message;
RecordKey* m_keyBuffer;
ULONG m_keyBufferLength;
const USHORT m_message;
ULONG m_messageLength;
JResultSet* m_resultSet;
const ULONG m_flags;
TempSpace m_space;
Expand Down
Loading
Loading