Skip to content

Add OnBeforeRequest callback #8541

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 86 additions & 2 deletions docs/reference/transport.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mapped_pages:
- https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/transport.html
---

# Transport example [transport]
# Low level Transport example [low-level-transport]

This page demonstrates how to use the low level transport to send requests.

Expand All @@ -16,8 +16,10 @@ public class MyRequestParameters : RequestParameters
init => Q("pretty", value);
}
}
```

// ...
```csharp
using Elastic.Transport;

var body = """
{
Expand Down Expand Up @@ -49,3 +51,85 @@ var response = await client.Transport
.ConfigureAwait(false);
```

# `OnBeforeRequest` example [on-before-request]

The `OnBeforeRequest` callback in `IElasticsearchClientSettings` can be used to dynamically modify requests.

```csharp
var settings = new ElasticsearchClientSettings(new Uri("http://localhost:9200))
.OnBeforeRequest(OnBeforeRequest); <1>

RequestConfiguration? globalRequestConfiguration = null;
ConditionalWeakTable<RequestConfiguration, RequestConfiguration>? globalRequestConfigurations = null;

void OnBeforeRequest(ElasticsearchClient client, Request request, EndpointPath endpointPath, ref PostData? postData, ref IRequestConfiguration? requestConfiguration)
{
// Each time a request is made, the transport creates a new `BoundConfiguration` for every `IRequestConfiguration`
// that is not in the cache (based on reference equality).

// To prevent frequent allocations of our mutated request configurations (and the secondary allocations for
// `BoundConfiguration`), we have to maintain a custom cache that maps every original request configuration to the
// mutated one.

if (requestConfiguration is null)
{
globalRequestConfiguration = Interlocked.CompareExchange(
ref globalRequestConfiguration,
new RequestConfiguration
{
UserAgent = UserAgent.Create("my-custom-user-agent")
},
null) ?? globalRequestConfiguration;

requestConfiguration = globalRequestConfiguration;
return;
}

if (requestConfiguration is not RequestConfiguration rc)
{
// Only `RequestConfiguration` (not all implementations of `IRequestConfiguration`) gets cached in the
// internal cache.
requestConfiguration = MutateRequestConfiguration(requestConfiguration);
return;
}

// ReSharper disable InconsistentlySynchronizedField

var cache = (Interlocked.CompareExchange(
ref globalRequestConfigurations,
new ConditionalWeakTable<RequestConfiguration, RequestConfiguration>(),
null
) ?? globalRequestConfigurations);

if (cache.TryGetValue(rc, out var mutatedRequestConfiguration))
{
requestConfiguration = mutatedRequestConfiguration;
return;
}

mutatedRequestConfiguration = MutateRequestConfiguration(rc);

#if NET8_0_OR_GREATER
cache.TryAdd(rc, mutatedRequestConfiguration);
#else
lock (cache)
{
cache.Add(rc, mutatedRequestConfiguration);
}
#endif

// ReSharper restore InconsistentlySynchronizedField

return;

RequestConfiguration MutateRequestConfiguration(IRequestConfiguration requestConfiguration)
{
return new RequestConfiguration(requestConfiguration)
{
UserAgent = UserAgent.Create("my-custom-user-agent")
};
}
}
```

1. Register the `OnBeforeRequest` callback.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Elastic.Transport" Version="0.8.0" />
<PackageReference Include="Elastic.Transport" Version="0.8.1" />
<PackageReference Include="PolySharp" Version="1.15.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Threading.Tasks;
using System.Threading;
using Elastic.Transport;
using Elastic.Transport.Diagnostics;

using Elastic.Clients.Elasticsearch.Requests;

Expand All @@ -28,18 +25,23 @@ public partial class ElasticsearchClient
private const string OpenTelemetrySchemaVersion = "https://opentelemetry.io/schemas/1.21.0";

private readonly ITransport<IElasticsearchClientSettings> _transport;
internal static ConditionalWeakTable<JsonSerializerOptions, IElasticsearchClientSettings> SettingsTable { get; } = new();

/// <summary>
/// Creates a client configured to connect to http://localhost:9200.
/// </summary>
public ElasticsearchClient() : this(new ElasticsearchClientSettings(new Uri("http://localhost:9200"))) { }
public ElasticsearchClient() :
this(new ElasticsearchClientSettings(new Uri("http://localhost:9200")))
{
}

/// <summary>
/// Creates a client configured to connect to a node reachable at the provided <paramref name="uri" />.
/// </summary>
/// <param name="uri">The <see cref="Uri" /> to connect to.</param>
public ElasticsearchClient(Uri uri) : this(new ElasticsearchClientSettings(uri)) { }
public ElasticsearchClient(Uri uri) :
this(new ElasticsearchClientSettings(uri))
{
}

/// <summary>
/// Creates a client configured to communicate with Elastic Cloud using the provided <paramref name="cloudId" />.
Expand All @@ -51,8 +53,8 @@ public ElasticsearchClient(Uri uri) : this(new ElasticsearchClientSettings(uri))
/// </summary>
/// <param name="cloudId">The Cloud ID of an Elastic Cloud deployment.</param>
/// <param name="credentials">The credentials to use for the connection.</param>
public ElasticsearchClient(string cloudId, AuthorizationHeader credentials) : this(
new ElasticsearchClientSettings(cloudId, credentials))
public ElasticsearchClient(string cloudId, AuthorizationHeader credentials) :
this(new ElasticsearchClientSettings(cloudId, credentials))
{
}

Expand All @@ -69,8 +71,7 @@ internal ElasticsearchClient(ITransport<IElasticsearchClientSettings> transport)
{
transport.ThrowIfNull(nameof(transport));
transport.Configuration.ThrowIfNull(nameof(transport.Configuration));
transport.Configuration.RequestResponseSerializer.ThrowIfNull(
nameof(transport.Configuration.RequestResponseSerializer));
transport.Configuration.RequestResponseSerializer.ThrowIfNull(nameof(transport.Configuration.RequestResponseSerializer));
transport.Configuration.Inferrer.ThrowIfNull(nameof(transport.Configuration.Inferrer));

_transport = transport;
Expand All @@ -96,47 +97,38 @@ private enum ProductCheckStatus

private partial void SetupNamespaces();

internal TResponse DoRequest<TRequest, TResponse, TRequestParameters>(TRequest request)
where TRequest : Request<TRequestParameters>
where TResponse : TransportResponse, new()
where TRequestParameters : RequestParameters, new() =>
DoRequest<TRequest, TResponse, TRequestParameters>(request, null);

internal TResponse DoRequest<TRequest, TResponse, TRequestParameters>(
TRequest request,
Action<IRequestConfiguration>? forceConfiguration)
TRequest request)
where TRequest : Request<TRequestParameters>
where TResponse : TransportResponse, new()
where TRequestParameters : RequestParameters, new()
=> DoRequestCoreAsync<TRequest, TResponse, TRequestParameters>(false, request, forceConfiguration).EnsureCompleted();

internal Task<TResponse> DoRequestAsync<TRequest, TResponse, TRequestParameters>(
TRequest request,
CancellationToken cancellationToken = default)
where TRequest : Request<TRequestParameters>
where TResponse : TransportResponse, new()
where TRequestParameters : RequestParameters, new()
=> DoRequestAsync<TRequest, TResponse, TRequestParameters>(request, null, cancellationToken);
{
return DoRequestCoreAsync<TRequest, TResponse, TRequestParameters>(false, request).EnsureCompleted();
}

internal Task<TResponse> DoRequestAsync<TRequest, TResponse, TRequestParameters>(
TRequest request,
Action<IRequestConfiguration>? forceConfiguration,
CancellationToken cancellationToken = default)
where TRequest : Request<TRequestParameters>
where TResponse : TransportResponse, new()
where TRequestParameters : RequestParameters, new()
=> DoRequestCoreAsync<TRequest, TResponse, TRequestParameters>(true, request, forceConfiguration, cancellationToken).AsTask();
{
return DoRequestCoreAsync<TRequest, TResponse, TRequestParameters>(true, request, cancellationToken).AsTask();
}

private ValueTask<TResponse> DoRequestCoreAsync<TRequest, TResponse, TRequestParameters>(
bool isAsync,
TRequest request,
Action<IRequestConfiguration>? forceConfiguration,
CancellationToken cancellationToken = default)
where TRequest : Request<TRequestParameters>
where TResponse : TransportResponse, new()
where TRequestParameters : RequestParameters, new()
{
// The product check modifies request parameters and therefore must not be executed concurrently.
if (request is null)
{
throw new ArgumentNullException(nameof(request));
}

// We use a lockless CAS approach to make sure that only a single product check request is executed at a time.
// We do not guarantee that the product check is always performed on the first request.

Expand All @@ -157,12 +149,12 @@ private ValueTask<TResponse> DoRequestCoreAsync<TRequest, TResponse, TRequestPar

ValueTask<TResponse> SendRequest()
{
var (endpointPath, resolvedRouteValues, postData) = PrepareRequest<TRequest, TRequestParameters>(request);
var openTelemetryDataMutator = GetOpenTelemetryDataMutator<TRequest, TRequestParameters>(request, resolvedRouteValues);
PrepareRequest<TRequest, TRequestParameters>(request, out var endpointPath, out var postData, out var requestConfiguration, out var routeValues);
var openTelemetryDataMutator = GetOpenTelemetryDataMutator<TRequest, TRequestParameters>(request, routeValues);

return isAsync
? new ValueTask<TResponse>(_transport.RequestAsync<TResponse>(endpointPath, postData, openTelemetryDataMutator, request.RequestConfiguration, cancellationToken))
: new ValueTask<TResponse>(_transport.Request<TResponse>(endpointPath, postData, openTelemetryDataMutator, request.RequestConfiguration));
? new ValueTask<TResponse>(_transport.RequestAsync<TResponse>(endpointPath, postData, openTelemetryDataMutator, requestConfiguration, cancellationToken))
: new ValueTask<TResponse>(_transport.Request<TResponse>(endpointPath, postData, openTelemetryDataMutator, requestConfiguration));
}

async ValueTask<TResponse> SendRequestWithProductCheck()
Expand All @@ -178,34 +170,35 @@ async ValueTask<TResponse> SendRequestWithProductCheck()
// 32-bit read/write operations are atomic and due to the initial memory barrier, we can be sure that
// no other thread executes the product check at the same time. Locked access is not required here.
if (_productCheckStatus is (int)ProductCheckStatus.InProgress)
{
_productCheckStatus = (int)ProductCheckStatus.NotChecked;
}

throw;
}
}

async ValueTask<TResponse> SendRequestWithProductCheckCore()
{
PrepareRequest<TRequest, TRequestParameters>(request, out var endpointPath, out var postData, out var requestConfiguration, out var routeValues);
var openTelemetryDataMutator = GetOpenTelemetryDataMutator<TRequest, TRequestParameters>(request, routeValues);

// Attach product check header

// TODO: The copy constructor should accept null values
var requestConfig = (request.RequestConfiguration is null)
? new RequestConfiguration()
var requestConfig = (requestConfiguration is null)
? new RequestConfiguration
{
ResponseHeadersToParse = new HeadersList("x-elastic-product")
}
: new RequestConfiguration(request.RequestConfiguration)
: new RequestConfiguration(requestConfiguration)
{
ResponseHeadersToParse = (request.RequestConfiguration.ResponseHeadersToParse is { Count: > 0 })
? new HeadersList(request.RequestConfiguration.ResponseHeadersToParse, "x-elastic-product")
ResponseHeadersToParse = (requestConfiguration.ResponseHeadersToParse is { Count: > 0 })
? new HeadersList(requestConfiguration.ResponseHeadersToParse, "x-elastic-product")
: new HeadersList("x-elastic-product")
};

// Send request

var (endpointPath, resolvedRouteValues, postData) = PrepareRequest<TRequest, TRequestParameters>(request);
var openTelemetryDataMutator = GetOpenTelemetryDataMutator<TRequest, TRequestParameters>(request, resolvedRouteValues);

TResponse response;

if (isAsync)
Expand Down Expand Up @@ -239,7 +232,9 @@ async ValueTask<TResponse> SendRequestWithProductCheckCore()
: (int)ProductCheckStatus.Failed;

if (_productCheckStatus == (int)ProductCheckStatus.Failed)
{
throw new UnsupportedProductException(UnsupportedProductException.InvalidProductError);
}

return response;
}
Expand All @@ -249,15 +244,17 @@ async ValueTask<TResponse> SendRequestWithProductCheckCore()
where TRequest : Request<TRequestParameters>
where TRequestParameters : RequestParameters, new()
{
// If there are no subscribed listeners, we avoid some work and allocations
// If there are no subscribed listeners, we avoid some work and allocations.
if (!Elastic.Transport.Diagnostics.OpenTelemetry.ElasticTransportActivitySourceHasListeners)
{
return null;
}

return OpenTelemetryDataMutator;

void OpenTelemetryDataMutator(Activity activity)
{
// We fall back to a general operation name in cases where the derived request fails to override the property
// We fall back to a general operation name in cases where the derived request fails to override the property.
var operationName = !string.IsNullOrEmpty(request.OperationName) ? request.OperationName : request.HttpMethod.GetStringValue();

// TODO: Optimisation: We should consider caching these, either for cases where resolvedRouteValues is null, or
Expand All @@ -267,7 +264,7 @@ void OpenTelemetryDataMutator(Activity activity)
// The latter may bloat the cache as some combinations of path parts may rarely re-occur.

activity.DisplayName = operationName;

activity.SetTag(OpenTelemetry.SemanticConventions.DbOperation, !string.IsNullOrEmpty(request.OperationName) ? request.OperationName : "unknown");
activity.SetTag($"{OpenTelemetrySpanAttributePrefix}schema_url", OpenTelemetrySchemaVersion);

Expand All @@ -282,21 +279,26 @@ void OpenTelemetryDataMutator(Activity activity)
}
}

private (EndpointPath endpointPath, Dictionary<string, string>? resolvedRouteValues, PostData data) PrepareRequest<TRequest, TRequestParameters>(TRequest request)
private void PrepareRequest<TRequest, TRequestParameters>(
TRequest request,
out EndpointPath endpointPath,
out PostData? postData,
out IRequestConfiguration? requestConfiguration,
out Dictionary<string, string>? routeValues)
where TRequest : Request<TRequestParameters>
where TRequestParameters : RequestParameters, new()
{
request.ThrowIfNull(nameof(request), "A request is required.");

var (resolvedUrl, _, routeValues) = request.GetUrl(ElasticsearchClientSettings);
var (resolvedUrl, _, resolvedRouteValues) = request.GetUrl(ElasticsearchClientSettings);
var pathAndQuery = request.RequestParameters.CreatePathWithQueryStrings(resolvedUrl, ElasticsearchClientSettings);

var postData =
request.HttpMethod == HttpMethod.GET ||
request.HttpMethod == HttpMethod.HEAD || !request.SupportsBody
routeValues = resolvedRouteValues;
endpointPath = new EndpointPath(request.HttpMethod, pathAndQuery);
postData =
request.HttpMethod is HttpMethod.GET or HttpMethod.HEAD || !request.SupportsBody
? null
: PostData.Serializable(request);

return (new EndpointPath(request.HttpMethod, pathAndQuery), routeValues, postData);
requestConfiguration = request.RequestConfiguration;
ElasticsearchClientSettings.OnBeforeRequest?.Invoke(this, request, endpointPath, ref postData, ref requestConfiguration);
}
}
Loading
Loading