Skip to content

RecordsetEnumerating / RecordsetEnumerated Session events #194

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 26 additions & 23 deletions Orm/Xtensive.Orm/Orm/Providers/CommandProcessing/DataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,37 @@ namespace Xtensive.Orm.Providers
/// over either regular <see cref="IEnumerable{T}"/> of <see cref="Tuple"/>s
/// or over the running <see cref="Command"/> instance.
/// </summary>
public interface DataReader : IEnumerator<Tuple>, IAsyncEnumerator<Tuple>
public abstract class DataReader : IEnumerator<Tuple>, IAsyncEnumerator<Tuple>
{
bool IsInMemory { get; }
Tuple Current { get; }
public abstract bool MoveNext();
public abstract ValueTask<bool> MoveNextAsync();
public abstract void Reset();
public abstract void Dispose();
public abstract ValueTask DisposeAsync();

public abstract Tuple Current { get; }
object IEnumerator.Current => Current;

public abstract bool IsInMemory { get; }
}

internal sealed class InMemoryDataReader(IEnumerable<Tuple> tuples) : DataReader
{
private readonly IEnumerator<Tuple> source = tuples.GetEnumerator();

public bool IsInMemory => true;

public Tuple Current => source.Current;
public override bool IsInMemory => true;

object IEnumerator.Current => Current;
public override Tuple Current => source.Current;

public bool MoveNext() => source.MoveNext();
public override bool MoveNext() => source.MoveNext();

public void Reset() => source.Reset();
public override void Reset() => source.Reset();

public void Dispose() => source.Dispose();
public override void Dispose() => source.Dispose();

public async ValueTask DisposeAsync() => await ((IAsyncEnumerator<Tuple>) source).DisposeAsync().ConfigureAwaitFalse();
public override async ValueTask DisposeAsync() => await ((IAsyncEnumerator<Tuple>) source).DisposeAsync().ConfigureAwaitFalse();

public ValueTask<bool> MoveNextAsync() => ValueTask.FromResult(MoveNext());
public override ValueTask<bool> MoveNextAsync() => ValueTask.FromResult(MoveNext());
}

internal sealed class CommandDataReader(Command command, DbDataReaderAccessor accessor, CancellationToken token) : DataReader
Expand All @@ -50,16 +56,13 @@ internal sealed class CommandDataReader(Command command, DbDataReaderAccessor ac
/// Indicates current <see cref="DataReader"/> is built
/// over <see cref="IEnumerable{T}"/> of <see cref="Tuple"/>s data source.
/// </summary>
public bool IsInMemory => false;
public override bool IsInMemory => false;

/// <inheritdoc cref="IEnumerator{T}.Current"/>
public Tuple Current => command.ReadTupleWith(accessor);

/// <inheritdoc/>
object IEnumerator.Current => Current;
public override Tuple Current => command.ReadTupleWith(accessor);

/// <inheritdoc/>
public bool MoveNext()
public override bool MoveNext()
{
if (command.NextRow()) {
return true;
Expand All @@ -71,7 +74,7 @@ public bool MoveNext()
}

/// <inheritdoc/>
public async ValueTask<bool> MoveNextAsync()
public override async ValueTask<bool> MoveNextAsync()
{
if (await command.NextRowAsync(token).ConfigureAwaitFalse()) {
return true;
Expand All @@ -83,12 +86,12 @@ public async ValueTask<bool> MoveNextAsync()
}

/// <inheritdoc/>
public void Reset() => throw new NotSupportedException("Multiple enumeration is not supported.");
public override void Reset() => throw new NotSupportedException("Multiple enumeration is not supported.");

/// <inheritdoc/>
public void Dispose() => command.Dispose();
public override void Dispose() => command.Dispose();

/// <inheritdoc/>
public async ValueTask DisposeAsync() => await command.DisposeAsync().ConfigureAwaitFalse();
public override async ValueTask DisposeAsync() => await command.DisposeAsync().ConfigureAwaitFalse();
}
}
}
26 changes: 10 additions & 16 deletions Orm/Xtensive.Orm/Orm/Providers/EnumerationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ namespace Xtensive.Orm.Providers
/// </summary>
public sealed class EnumerationContext : Rse.Providers.EnumerationContext
{
private class EnumerationFinalizer : ICompletableScope
private class EnumerationFinalizer(EnumerationContext context, Queue<Action> finalizationQueue, TransactionScope transactionScope, SessionEventAccessor events)
: ICompletableScope
{
private readonly Queue<Action> finalizationQueue;
private readonly TransactionScope transactionScope;

public void Complete()
{
if (IsCompleted)
Expand All @@ -36,16 +34,11 @@ public void Complete()

public void Dispose()
{
while (finalizationQueue.TryDequeue(out var materializeSelf)) {
while (finalizationQueue?.TryDequeue(out var materializeSelf) == true) {
materializeSelf.Invoke();
}
transactionScope.DisposeSafely();
}

public EnumerationFinalizer(Queue<Action> finalizationQueue, TransactionScope transactionScope)
{
this.finalizationQueue = finalizationQueue;
this.transactionScope = transactionScope;
transactionScope?.Dispose();
events.NotifyRecordsetEnumerated(context);
}
}

Expand All @@ -56,7 +49,7 @@ public EnumerationFinalizer(Queue<Action> finalizationQueue, TransactionScope tr
/// Gets the session handler.
/// </summary>
/// <value>The session handler.</value>
public Session Session { get; private set; }
public Session Session { get; }

/// <inheritdoc/>
protected override EnumerationContextOptions Options { get { return options; } }
Expand All @@ -71,9 +64,10 @@ public override ICompletableScope BeginEnumeration()
var tx = Session.OpenAutoTransaction();
if (!Session.Configuration.Supports(SessionOptions.NonTransactionalReads))
Session.DemandTransaction();
if (MaterializationContext!=null && MaterializationContext.MaterializationQueue!=null)
return new EnumerationFinalizer(MaterializationContext.MaterializationQueue, tx);
return tx;
var events = Session.Events;
return events.NotifyRecordsetEnumerating(this) || MaterializationContext?.MaterializationQueue != null
? new EnumerationFinalizer(this, MaterializationContext?.MaterializationQueue, tx, events)
: tx;
}

// Constructors
Expand Down
28 changes: 27 additions & 1 deletion Orm/Xtensive.Orm/Orm/SessionEventAccessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System.Linq.Expressions;
using Xtensive.Core;
using Xtensive.Orm.Model;
using Xtensive.Orm.Providers;

namespace Xtensive.Orm
{
Expand Down Expand Up @@ -244,6 +245,16 @@ public sealed class SessionEventAccessor
/// </summary>
public event EventHandler<TransactionEventArgs> TransactionRollbacked;

/// <summary>
/// Occurs when Recordset enumeration begins
/// </summary>
public event EventHandler<EnumerationContext> RecordsetEnumerating;

/// <summary>
/// Occurs when Recordset enumeration ends
/// </summary>
public event EventHandler<EnumerationContext> RecordsetEnumerated;

#endregion

#region NotifyXxx methods
Expand Down Expand Up @@ -497,6 +508,21 @@ internal void NotifyTransactionRollbacked(Transaction transaction)
TransactionRollbacked(this, new TransactionEventArgs(transaction));
}

internal bool NotifyRecordsetEnumerating(EnumerationContext context)
{
if (RecordsetEnumerating != null && AreNotificationsEnabled()) {
RecordsetEnumerating(this, context);
return true;
}
return false;
}

internal void NotifyRecordsetEnumerated(EnumerationContext context)
{
if (RecordsetEnumerated != null && AreNotificationsEnabled())
RecordsetEnumerated(this, context);
}

#endregion

private bool AreNotificationsEnabled()
Expand All @@ -513,4 +539,4 @@ internal SessionEventAccessor(Session session, bool systemEvents)
SystemEvents = systemEvents;
}
}
}
}
Loading