Skip to content

Fix for #8082 by making engine to use user buffers directly #8145

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
May 7, 2025
37 changes: 16 additions & 21 deletions src/dsql/DsqlBatch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,6 @@ DsqlBatch* DsqlBatch::open(thread_db* tdbb, DsqlDmlRequest* req, IMessageMetadat

const auto statement = req->getDsqlStatement();

if (statement->getFlags() & DsqlStatement::FLAG_ORPHAN)
{
ERRD_post(Arg::Gds(isc_sqlerr) << Arg::Num(-901) <<
Arg::Gds(isc_bad_req_handle));
}

switch (statement->getType())
{
case DsqlStatement::TYPE_INSERT:
Expand All @@ -229,7 +223,7 @@ DsqlBatch* DsqlBatch::open(thread_db* tdbb, DsqlDmlRequest* req, IMessageMetadat
}

const dsql_msg* message = statement->getSendMsg();
if (! (inMetadata && message && req->parseMetadata(inMetadata, message->msg_parameters)))
if (! (inMetadata && message && message->msg_parameter > 0))
{
ERRD_post(Arg::Gds(isc_sqlerr) << Arg::Num(-901) <<
Arg::Gds(isc_batch_param));
Expand Down Expand Up @@ -659,18 +653,23 @@ Firebird::IBatchCompletionState* DsqlBatch::execute(thread_db* tdbb)
// execute request
m_dsqlRequest->req_transaction = transaction;
Request* req = m_dsqlRequest->getRequest();
DsqlStatement* dStmt = m_dsqlRequest->getDsqlStatement();
fb_assert(req);

// prepare completion interface
AutoPtr<BatchCompletionState, SimpleDispose> completionState
(FB_NEW BatchCompletionState(m_flags & (1 << IBatch::TAG_RECORD_COUNTS), m_detailed));
AutoSetRestore<bool> batchFlag(&req->req_batch_mode, true);
const dsql_msg* message = m_dsqlRequest->getDsqlStatement()->getSendMsg();
const dsql_msg* sendMessage = dStmt->getSendMsg();
// map message to internal engine format
// Do it one time only to avoid parsing its metadata for every message
m_dsqlRequest->metadataToFormat(m_meta, sendMessage);
// Using of positional DML in batch is strange but not forbidden
m_dsqlRequest->mapCursorKey(tdbb);
bool startRequest = true;

bool isExecBlock = m_dsqlRequest->getDsqlStatement()->getType() == DsqlStatement::TYPE_EXEC_BLOCK;
const auto receiveMessage = isExecBlock ? m_dsqlRequest->getDsqlStatement()->getReceiveMsg() : nullptr;
auto receiveMsgBuffer = isExecBlock ? m_dsqlRequest->req_msg_buffers[receiveMessage->msg_buffer_number] : nullptr;
bool isExecBlock = dStmt->getType() == DsqlStatement::TYPE_EXEC_BLOCK;
const dsql_msg* receiveMessage = isExecBlock ? dStmt->getReceiveMsg() : nullptr;

// process messages
ULONG remains;
Expand Down Expand Up @@ -726,25 +725,18 @@ Firebird::IBatchCompletionState* DsqlBatch::execute(thread_db* tdbb)
*id = newId;
}

// map message to internal engine format
// pass m_meta one time only to avoid parsing its metadata for every message
m_dsqlRequest->mapInOut(tdbb, false, message, start ? m_meta : nullptr, nullptr, data);
data += m_messageSize;
remains -= m_messageSize;

UCHAR* msgBuffer = m_dsqlRequest->req_msg_buffers[message->msg_buffer_number];
try
{
// runsend data to request and collect stats
ULONG before = req->req_records_inserted + req->req_records_updated +
req->req_records_deleted;
EXE_send(tdbb, req, message->msg_number, message->msg_length, msgBuffer);
EXE_send(tdbb, req, sendMessage->msg_number, m_messageSize, data);
ULONG after = req->req_records_inserted + req->req_records_updated +
req->req_records_deleted;
completionState->regUpdate(after - before);

if (isExecBlock)
EXE_receive(tdbb, req, receiveMessage->msg_number, receiveMessage->msg_length, receiveMsgBuffer);
if (receiveMessage)
EXE_receive(tdbb, req, receiveMessage->msg_number, receiveMessage->msg_length, nullptr); // We don't care about returned record
}
catch (const Exception& ex)
{
Expand All @@ -764,6 +756,9 @@ Firebird::IBatchCompletionState* DsqlBatch::execute(thread_db* tdbb)

startRequest = true;
}

data += m_messageSize;
remains -= m_messageSize;
}

UCHAR* alignedData = FB_ALIGN(data, m_alignment);
Expand Down
11 changes: 1 addition & 10 deletions src/dsql/DsqlCompilerScratch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,10 +421,7 @@ dsql_var* DsqlCompilerScratch::resolveVariable(const MetaName& varName)
// Generate BLR for a return.
void DsqlCompilerScratch::genReturn(bool eosFlag)
{
const bool hasEos = !(flags & (FLAG_TRIGGER | FLAG_FUNCTION));

if (hasEos && !eosFlag)
appendUChar(blr_begin);
const bool hasEos = !(flags & (FLAG_TRIGGER | FLAG_FUNCTION | FLAG_EXEC_BLOCK));

appendUChar(blr_send);
appendUChar(1);
Expand Down Expand Up @@ -455,12 +452,6 @@ void DsqlCompilerScratch::genReturn(bool eosFlag)
}

appendUChar(blr_end);

if (hasEos && !eosFlag)
{
appendUChar(blr_stall);
appendUChar(blr_end);
}
}

void DsqlCompilerScratch::genParameters(Array<NestConst<ParameterClause> >& parameters,
Expand Down
9 changes: 7 additions & 2 deletions src/dsql/DsqlCompilerScratch.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ typedef Firebird::Pair<
Firebird::NonPooled<NestConst<ValueListNode>, NestConst<ValueListNode>>> ReturningClause;


// DSQL Compiler scratch block - may be discarded after compilation in the future.
// DSQL Compiler scratch block.
// Contains any kind of objects used during DsqlStatement compilation
// Is deleted with its pool as soon as DsqlStatement is fully formed in prepareStatement()
// or with the statement itself (if the statement reqested it returning true from shouldPreserveScratch())
class DsqlCompilerScratch : public BlrDebugWriter
{
public:
Expand All @@ -70,6 +73,7 @@ class DsqlCompilerScratch : public BlrDebugWriter
static const unsigned FLAG_DDL = 0x2000;
static const unsigned FLAG_FETCH = 0x4000;
static const unsigned FLAG_VIEW_WITH_CHECK = 0x8000;
static const unsigned FLAG_EXEC_BLOCK = 0x010000;

static const unsigned MAX_NESTING = 512;

Expand Down Expand Up @@ -105,7 +109,7 @@ class DsqlCompilerScratch : public BlrDebugWriter

protected:
// DsqlCompilerScratch should never be destroyed using delete.
// It dies together with it's pool in release_request().
// It dies together with it's pool.
~DsqlCompilerScratch()
{
}
Expand Down Expand Up @@ -317,6 +321,7 @@ class DsqlCompilerScratch : public BlrDebugWriter
DsqlCompilerScratch* mainScratch = nullptr;
Firebird::NonPooledMap<USHORT, USHORT> outerMessagesMap; // <outer, inner>
Firebird::NonPooledMap<USHORT, USHORT> outerVarsMap; // <outer, inner>
dsql_msg* recordKeyMessage = nullptr; // Side message for positioned DML

private:
Firebird::HalfStaticArray<SelectExprNode*, 4> ctes; // common table expressions
Expand Down
114 changes: 94 additions & 20 deletions src/dsql/DsqlCursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "../dsql/dsql_proto.h"
#include "../dsql/DsqlCursor.h"
#include "../dsql/StmtNodes.h"

using namespace Firebird;
using namespace Jrd;
Expand All @@ -36,10 +37,10 @@ static const char* const SCRATCH = "fb_cursor_";
static const ULONG PREFETCH_SIZE = 65536; // 64 KB

DsqlCursor::DsqlCursor(DsqlDmlRequest* req, ULONG flags)
: m_dsqlRequest(req), m_message(req->getDsqlStatement()->getReceiveMsg()),
m_resultSet(NULL), m_flags(flags),
m_space(req->getPool(), SCRATCH),
m_state(BOS), m_eof(false), m_position(0), m_cachedCount(0)
: m_dsqlRequest(req),
m_message(req->getDsqlStatement()->getReceiveMsg()->msg_number),
m_flags(flags),
m_space(req->getPool(), SCRATCH)
{
TRA_link_cursor(m_dsqlRequest->req_transaction, this);
}
Expand All @@ -48,6 +49,8 @@ DsqlCursor::~DsqlCursor()
{
if (m_resultSet)
m_resultSet->resetHandle();

delete[] m_keyBuffer;
}

jrd_tra* DsqlCursor::getTransaction() const
Expand All @@ -66,6 +69,31 @@ void DsqlCursor::setInterfacePtr(JResultSet* interfacePtr) noexcept
m_resultSet = interfacePtr;
}

bool DsqlCursor::getCurrentRecordKey(USHORT context, RecordKey& key) const
{
if (m_keyBuffer == nullptr)
{
// A possible situation for a cursor not based on any record source such as
// a = 1;
// suspend;
return false;
}

if (context * sizeof(RecordKey) >= m_keyBufferLength)
{
fb_assert(false);
return false;
}

if (m_state != POSITIONED)
{
return false;
}

key = m_keyBuffer[context];
return key.recordNumber.bid_relation_id != 0;
}

void DsqlCursor::close(thread_db* tdbb, DsqlCursor* cursor)
{
if (!cursor)
Expand All @@ -88,7 +116,7 @@ void DsqlCursor::close(thread_db* tdbb, DsqlCursor* cursor)

if (dsqlRequest->req_traced && TraceManager::need_dsql_free(attachment))
{
TraceSQLStatementImpl stmt(dsqlRequest, NULL);
TraceSQLStatementImpl stmt(dsqlRequest, nullptr, nullptr);
TraceManager::event_dsql_free(attachment, &stmt, DSQL_close);
}

Expand All @@ -115,6 +143,17 @@ int DsqlCursor::fetchNext(thread_db* tdbb, UCHAR* buffer)
return 1;
}

if (m_keyBufferLength == 0)
{
Request* req = m_dsqlRequest->getRequest();
m_keyBufferLength = req->req_rpb.getCount() * sizeof(RecordKey);
if (m_keyBufferLength > 0)
m_keyBuffer = FB_NEW_POOL(m_dsqlRequest->getPool()) RecordKey[req->req_rpb.getCount()];
}

if (m_keyBufferLength > 0)
m_dsqlRequest->gatherRecordKey(m_keyBuffer);

m_state = POSITIONED;
return 0;
}
Expand Down Expand Up @@ -163,7 +202,7 @@ int DsqlCursor::fetchAbsolute(thread_db* tdbb, UCHAR* buffer, SLONG position)
{
if (!m_eof)
{
cacheInput(tdbb);
cacheInput(tdbb, buffer);
fb_assert(m_eof);
}

Expand Down Expand Up @@ -248,7 +287,7 @@ void DsqlCursor::getInfo(thread_db* tdbb,
case IResultSet::INF_RECORD_COUNT:
if (isScrollable && !m_eof)
{
cacheInput(tdbb);
cacheInput(tdbb, nullptr);
fb_assert(m_eof);
}
response.insertInt(tag, isScrollable ? m_cachedCount : -1);
Expand Down Expand Up @@ -291,48 +330,83 @@ int DsqlCursor::fetchFromCache(thread_db* tdbb, UCHAR* buffer, FB_UINT64 positio
{
if (position >= m_cachedCount)
{
if (m_eof || !cacheInput(tdbb, position))
if (m_eof || !cacheInput(tdbb, buffer, position))
{
m_state = EOS;
return 1;
}
}

fb_assert(position < m_cachedCount);
fb_assert(m_messageLength > 0); // At this point m_messageLength must be set by cacheInput

UCHAR* const msgBuffer = m_dsqlRequest->req_msg_buffers[m_message->msg_buffer_number];
FB_UINT64 offset = position * (m_messageLength + m_keyBufferLength);
FB_UINT64 readBytes = m_space.read(offset, buffer, m_messageLength);

const FB_UINT64 offset = position * m_message->msg_length;
const FB_UINT64 readBytes = m_space.read(offset, msgBuffer, m_message->msg_length);
fb_assert(readBytes == m_message->msg_length);
if (m_keyBufferLength > 0)
{
offset += m_messageLength;
readBytes += m_space.read(offset, m_keyBuffer, m_keyBufferLength);
}

m_dsqlRequest->mapInOut(tdbb, true, m_message, NULL, buffer);
fb_assert(readBytes == m_messageLength + m_keyBufferLength);

m_position = position;
m_state = POSITIONED;
return 0;
}

bool DsqlCursor::cacheInput(thread_db* tdbb, FB_UINT64 position)
bool DsqlCursor::cacheInput(thread_db* tdbb, UCHAR* buffer, FB_UINT64 position)
{
fb_assert(!m_eof);

const ULONG prefetchCount = MAX(PREFETCH_SIZE / m_message->msg_length, 1);
const UCHAR* const msgBuffer = m_dsqlRequest->req_msg_buffers[m_message->msg_buffer_number];
// It could not be done before: user buffer length may be unknown until call setDelayedOutputMetadata()
if (m_messageLength == 0)
{
Request* req = m_dsqlRequest->getRequest();
const MessageNode* msg = req->getStatement()->getMessage(m_message);
m_messageLength = msg->getFormat(req)->fmt_length;
m_keyBufferLength = req->req_rpb.getCount() * sizeof(RecordKey);
if (m_keyBufferLength > 0)
{
// Save record key unconditionally because setCursorName() can be called after openCursor()
m_keyBuffer = FB_NEW_POOL(m_dsqlRequest->getPool()) RecordKey[req->req_rpb.getCount()];
}
}

std::unique_ptr<UCHAR[]> ownBuffer;
if (buffer == nullptr)
{
// We are called from getInfo() and there is no user-provided buffer for data.
// Create a temporary one.
// This code cannot be moved into getInfo() itself because it is most likely called before fetch()
// so m_messageLength is still unknown there.
ownBuffer.reset(buffer = FB_NEW UCHAR[m_messageLength]);
}

const ULONG prefetchCount = MAX(PREFETCH_SIZE / (m_messageLength + m_keyBufferLength), 1);

while (position >= m_cachedCount)
{
for (ULONG count = 0; count < prefetchCount; count++)
{
if (!m_dsqlRequest->fetch(tdbb, NULL))
if (!m_dsqlRequest->fetch(tdbb, buffer))
{
m_eof = true;
break;
}

const FB_UINT64 offset = m_cachedCount * m_message->msg_length;
const FB_UINT64 writtenBytes = m_space.write(offset, msgBuffer, m_message->msg_length);
fb_assert(writtenBytes == m_message->msg_length);
FB_UINT64 offset = m_cachedCount * (m_messageLength + m_keyBufferLength);
FB_UINT64 writtenBytes = m_space.write(offset, buffer, m_messageLength);

if (m_keyBufferLength > 0)
{
offset += m_messageLength;
m_dsqlRequest->gatherRecordKey(m_keyBuffer);
writtenBytes += m_space.write(offset, m_keyBuffer, m_keyBufferLength);
}

fb_assert(writtenBytes == m_messageLength + m_keyBufferLength);
m_cachedCount++;
}

Expand Down
Loading
Loading