Skip to content

Commit 6408af6

Browse files
committed
CSHARP-994: fix thread starvation in Mapper
1 parent aa4f36e commit 6408af6

File tree

12 files changed

+264
-31
lines changed

12 files changed

+264
-31
lines changed

src/Cassandra/Cassandra.csproj

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
<VersionPrefix>3.22.0</VersionPrefix>
99
<IncludeSourceRevisionInInformationalVersion>false</IncludeSourceRevisionInInformationalVersion>
1010
<Authors>DataStax</Authors>
11-
<TargetFrameworks Condition="'$(BuildCoreOnly)' != 'True'">net452;netstandard2.0</TargetFrameworks>
12-
<TargetFramework Condition="'$(BuildCoreOnly)' == 'True'">netstandard2.0</TargetFramework>
11+
<TargetFrameworks Condition="'$(BuildCoreOnly)' != 'True'">net452;netstandard2.0;net8.0</TargetFrameworks>
12+
<TargetFrameworks Condition="'$(BuildCoreOnly)' == 'True'">netstandard2.0;net8.0</TargetFrameworks>
1313
<NoWarn>$(NoWarn);1591</NoWarn>
1414
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
1515
<WarningsNotAsErrors>NU1901;NU1902;NU1903;NU1904</WarningsNotAsErrors>
@@ -24,16 +24,17 @@
2424
<PackageLicenseFile>LICENSE.md</PackageLicenseFile>
2525
<RepositoryUrl>https://github.com/datastax/csharp-driver</RepositoryUrl>
2626
<PackageProjectUrl>https://github.com/datastax/csharp-driver</PackageProjectUrl>
27-
<LangVersion>7.1</LangVersion>
28-
<NuGetAudit>false</NuGetAudit>
27+
<LangVersion Condition="$(TargetFramework) != 'net8.0'">7.1</LangVersion>
28+
<LangVersion Condition="$(TargetFramework) == 'net8.0'">12.0</LangVersion>
29+
<NuGetAudit>false</NuGetAudit>
2930
</PropertyGroup>
3031
<PropertyGroup Condition="$([System.Text.RegularExpressions.Regex]::IsMatch('$(TargetFramework)', '^net4\d'))">
3132
<DefineConstants>$(DefineConstants);NETFRAMEWORK</DefineConstants>
3233
</PropertyGroup>
3334
<PropertyGroup Condition="$([System.Text.RegularExpressions.Regex]::IsMatch('$(TargetFramework)', '^net\d$'))">
3435
<DefineConstants>$(DefineConstants);NETCOREAPP</DefineConstants>
3536
</PropertyGroup>
36-
37+
3738
<ItemGroup>
3839
<None Include="..\..\LICENSE.md" Pack="true" PackagePath="LICENSE.md" />
3940
</ItemGroup>
@@ -44,6 +45,8 @@
4445
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="1.0.0" />
4546
<PackageReference Include="Newtonsoft.Json" Version="9.0.1" NoWarn="NU1903" />
4647
<PackageReference Include="System.Management" Version="4.7.0" />
48+
</ItemGroup>
49+
<ItemGroup Condition="'$(TargetFramework)' != 'net8.0'">
4750
<PackageReference Include="System.Runtime.InteropServices.RuntimeInformation" Version="4.0.0" />
4851
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.6.0" />
4952
</ItemGroup>

src/Cassandra/Data/Linq/ClientProjectionCqlQuery.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ internal override IEnumerable<TResult> AdaptResult(string cql, RowSet rs)
5454
if (!_canCompile)
5555
{
5656
var mapper = MapperFactory.GetMapperWithProjection<TResult>(cql, rs, _projectionExpression);
57-
result = rs.Select(mapper);
57+
result = Enumerable.Select(rs, mapper);
5858
}
5959
else
6060
{

src/Cassandra/Data/Linq/CqlConditionalCommand.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ protected internal override string GetCql(out object[] values)
7676
Cql.New(cql, values).WithExecutionProfile(executionProfile)).ConfigureAwait(false);
7777
this.CopyQueryPropertiesTo(stmt);
7878
var rs = await session.ExecuteAsync(stmt, executionProfile).ConfigureAwait(false);
79-
return AppliedInfo<TEntity>.FromRowSet(_mapperFactory, cql, rs);
79+
return await AppliedInfo<TEntity>.FromRowSetAsync(_mapperFactory, cql, rs);
8080
}
8181

8282
/// <summary>

src/Cassandra/Data/Linq/CqlQuery.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,12 @@ public async Task<IPage<TEntity>> ExecutePagedAsync(string executionProfile)
134134
var cql = visitor.GetSelect(Expression, out object[] values);
135135
var rs = await InternalExecuteWithProfileAsync(executionProfile, cql, values).ConfigureAwait(false);
136136
var mapper = MapperFactory.GetMapper<TEntity>(cql, rs);
137-
return new Page<TEntity>(rs.Select(mapper), PagingState, rs.PagingState);
137+
#if NET8_0_OR_GREATER
138+
var items = await AsyncEnumerable.Select(rs, mapper).ToListAsync();
139+
#else
140+
var items = Enumerable.Select(rs, mapper).ToList();
141+
#endif
142+
return new Page<TEntity>(items, PagingState, rs.PagingState);
138143
}
139144

140145
/// <summary>

src/Cassandra/Data/Linq/CqlQueryBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ protected async Task<RowSet> InternalExecuteWithProfileAsync(string executionPro
123123
internal virtual IEnumerable<TEntity> AdaptResult(string cql, RowSet rs)
124124
{
125125
var mapper = MapperFactory.GetMapper<TEntity>(cql, rs);
126-
return rs.Select(mapper);
126+
return Enumerable.Select(rs ,mapper);
127127
}
128128

129129
/// <summary>

src/Cassandra/DataStax/Graph/GraphResultSet.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public IEnumerator<GraphNode> GetEnumerator()
7575
/// </summary>
7676
private IEnumerable<GraphNode> YieldNodes()
7777
{
78-
foreach (var node in _rs.Select(_factory))
78+
foreach (var node in Enumerable.Select(_rs, _factory))
7979
{
8080
for (var i = 0; i < node.Bulk; i++)
8181
{
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Copyright (C) DataStax Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#if NET8_0_OR_GREATER && !NET10_OR_GREATER // These methods are implemented in .NET 10.
16+
using System.Collections.Generic;
17+
using System.Threading;
18+
using System.Threading.Tasks;
19+
20+
// ReSharper disable once CheckNamespace
21+
namespace System.Linq;
22+
23+
internal static class AsyncEnumerable
24+
{
25+
public static async ValueTask<TSource> FirstAsync<TSource>(
26+
this IAsyncEnumerable<TSource> source,
27+
CancellationToken cancellationToken = default)
28+
{
29+
await using var e = source.GetAsyncEnumerator(cancellationToken);
30+
31+
if (!await e.MoveNextAsync())
32+
{
33+
throw new InvalidOperationException("Sequence contains no elements");
34+
}
35+
36+
return e.Current;
37+
}
38+
39+
public static async ValueTask<TSource> FirstOrDefaultAsync<TSource>(
40+
this IAsyncEnumerable<TSource> source,
41+
CancellationToken cancellationToken = default)
42+
{
43+
await using var e = source.GetAsyncEnumerator(cancellationToken);
44+
return await e.MoveNextAsync() ? e.Current : default;
45+
}
46+
47+
public static async ValueTask<TSource> SingleAsync<TSource>(
48+
this IAsyncEnumerable<TSource> source,
49+
CancellationToken cancellationToken = default)
50+
{
51+
await using var e = source.GetAsyncEnumerator(cancellationToken);
52+
53+
if (!await e.MoveNextAsync())
54+
{
55+
throw new InvalidOperationException("Sequence contains no elements");
56+
}
57+
58+
TSource result = e.Current;
59+
if (await e.MoveNextAsync())
60+
{
61+
throw new InvalidOperationException("Sequence contains more than one element");
62+
}
63+
64+
return result;
65+
}
66+
67+
public static async ValueTask<TSource> SingleOrDefaultAsync<TSource>(
68+
this IAsyncEnumerable<TSource> source,
69+
CancellationToken cancellationToken = default)
70+
{
71+
await using var e = source.GetAsyncEnumerator(cancellationToken);
72+
73+
if (!await e.MoveNextAsync())
74+
{
75+
return default;
76+
}
77+
78+
TSource result = e.Current;
79+
if (await e.MoveNextAsync())
80+
{
81+
throw new InvalidOperationException("Sequence contains more than one element");
82+
}
83+
84+
return result;
85+
}
86+
87+
public static async IAsyncEnumerable<TResult> Select<TSource, TResult>(
88+
this IAsyncEnumerable<TSource> source,
89+
Func<TSource, TResult> selector)
90+
{
91+
await foreach (TSource element in source)
92+
{
93+
yield return selector(element);
94+
}
95+
}
96+
97+
public static async ValueTask<List<TSource>> ToListAsync<TSource>(
98+
this IAsyncEnumerable<TSource> source,
99+
CancellationToken cancellationToken = default)
100+
{
101+
List<TSource> list = [];
102+
await foreach (TSource element in source.WithCancellation(cancellationToken))
103+
{
104+
list.Add(element);
105+
}
106+
107+
return list;
108+
}
109+
}
110+
#endif

src/Cassandra/Mapping/AppliedInfo.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
//
1616

1717
using System.Linq;
18+
using System.Threading.Tasks;
19+
20+
#if !NET8_0_OR_GREATER
21+
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
22+
#endif
1823

1924
namespace Cassandra.Mapping
2025
{
@@ -54,9 +59,13 @@ public AppliedInfo(T existing)
5459
/// <summary>
5560
/// Adapts a LWT RowSet and returns a new AppliedInfo
5661
/// </summary>
57-
internal static AppliedInfo<T> FromRowSet(MapperFactory mapperFactory, string cql, RowSet rs)
62+
internal static async Task<AppliedInfo<T>> FromRowSetAsync(MapperFactory mapperFactory, string cql, RowSet rs)
5863
{
64+
#if NET8_0_OR_GREATER
65+
var row = await rs.FirstOrDefaultAsync();
66+
#else
5967
var row = rs.FirstOrDefault();
68+
#endif
6069
const string appliedColumn = "[applied]";
6170
if (row == null || row.GetColumn(appliedColumn) == null || row.GetValue<bool>(appliedColumn))
6271
{

src/Cassandra/Mapping/ICqlQueryAsyncClient.cs

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717
using System.Collections.Generic;
1818
using System.Threading.Tasks;
1919

20+
#if !NET8_0_OR_GREATER
21+
#pragma warning disable CS1574, CS1584, CS1581, CS1580 // Cannot resolve reference in XML comment
22+
#endif
23+
2024
namespace Cassandra.Mapping
2125
{
2226
/// <summary>
@@ -25,20 +29,43 @@ namespace Cassandra.Mapping
2529
public interface ICqlQueryAsyncClient
2630
{
2731
/// <summary>
28-
/// Gets a list of all T from Cassandra.
32+
/// Gets a list of all T from Cassandra. Loading new pages when enumerating the result may block the thread. For that reason,
33+
/// <see cref="FetchAsAsyncEnumerable{T}(CqlQueryOptions)"/> should be preferred.
2934
/// </summary>
3035
Task<IEnumerable<T>> FetchAsync<T>(CqlQueryOptions queryOptions = null);
31-
36+
3237
/// <summary>
33-
/// Gets a list of T from Cassandra using the CQL statement and parameter values specified.
38+
/// Gets a list of T from Cassandra using the CQL statement and parameter values specified. Loading new pages when enumerating the result may
39+
/// block the thread. For that reason, <see cref="FetchAsAsyncEnumerable{T}(string, object[])"/> should be preferred.
3440
/// </summary>
3541
Task<IEnumerable<T>> FetchAsync<T>(string cql, params object[] args);
3642

3743
/// <summary>
38-
/// Gets a list of T from Cassandra using the CQL statement specified.
44+
/// Gets a list of T from Cassandra using the CQL statement specified. Loading new pages when enumerating the result may block the thread.
45+
/// For that reason, <see cref="FetchAsAsyncEnumerable{T}(Cql)"/> should be preferred.
3946
/// </summary>
4047
Task<IEnumerable<T>> FetchAsync<T>(Cql cql);
4148

49+
#if NET8_0_OR_GREATER
50+
/// <summary>
51+
/// Gets an <see cref="IAsyncEnumerable{T}"/> of all T from Cassandra. Unlike <see cref="FetchAsync{T}(CqlQueryOptions)"/>, loading new
52+
/// pages when enumerating the result does not block the thread.
53+
/// </summary>
54+
IAsyncEnumerable<T> FetchAsAsyncEnumerable<T>(CqlQueryOptions options = null);
55+
56+
/// <summary>
57+
/// Gets an <see cref="IAsyncEnumerable{T}"/> of all T from Cassandra using the CQL statement and parameter values specified. Unlike
58+
/// <see cref="FetchAsync{T}(string, object[])"/>, loading new pages when enumerating the result does not block the thread.
59+
/// </summary>
60+
IAsyncEnumerable<T> FetchAsAsyncEnumerable<T>(string cql, params object[] args);
61+
62+
/// <summary>
63+
/// Gets an <see cref="IAsyncEnumerable{T}"/> of all T from Cassandra using the CQL statement specified. Unlike
64+
/// <see cref="FetchAsync{T}(Cql)"/>, loading new pages when enumerating the result does not block the thread.
65+
/// </summary>
66+
IAsyncEnumerable<T> FetchAsAsyncEnumerable<T>(Cql cql);
67+
#endif
68+
4269
/// <summary>
4370
/// Gets a paged list of T results from Cassandra.
4471
/// Suitable for manually page through all the results of a query.

0 commit comments

Comments
 (0)