Skip to content

Commit d5773ea

Browse files
mashhursyaauie
andcommitted
Rebase against upstream main after target support added. Separate unit test for DSL. Address comments: do not save ES version in client, add apply target method in executors, set to target if target is defined, docs update.
Co-authored-by: Rye Biesemeyer <[email protected]>
1 parent 1491576 commit d5773ea

File tree

8 files changed

+444
-398
lines changed

8 files changed

+444
-398
lines changed

docs/index.asciidoc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -375,10 +375,11 @@ environment variables e.g. `proxy => '${LS_PROXY:}'`.
375375
* Value type is <<string,string>>
376376
* There is no default value for this setting.
377377

378-
Elasticsearch query string. More information is available in the
379-
{ref}/query-dsl-query-string-query.html#query-string-syntax[Elasticsearch query
380-
string documentation].
381-
Use either `query` or `query_template`.
378+
The query to be executed.
379+
Accepted query shape is DSL query string or ES|QL.
380+
For the DSL query string, use either `query` or `query_template`.
381+
Read the {ref}/query-dsl-query-string-query.html[{es} query
382+
string documentation] or {ref}/esql.html[{es} ES|QL documentation] for more information.
382383

383384
[id="plugins-{type}s-{plugin}-query_params"]
384385
===== `query_params`
@@ -392,7 +393,6 @@ Accepted options:
392393
|`named_params` |[] | List of named parameters and their matches used in the `query`
393394
|===
394395

395-
396396
[id="plugins-{type}s-{plugin}-query_template"]
397397
===== `query_template`
398398

lib/logstash/filters/elasticsearch.rb

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -235,18 +235,6 @@ def prepare_user_agent
235235

236236
private
237237

238-
# if @target is defined, creates a nested structure to inject result into target field
239-
# if not defined, directly sets to the top-level event field
240-
# @param event [LogStash::Event]
241-
# @param new_key [String] name of the field to set
242-
# @param value_to_set [Array] values to set
243-
# @return [void]
244-
def set_to_event_target(event, new_key, value_to_set)
245-
key_to_set = target ? "[#{target}][#{new_key}]" : new_key
246-
247-
event.set(key_to_set, value_to_set)
248-
end
249-
250238
def client_options
251239
@client_options ||= {
252240
:user => @user,
@@ -438,9 +426,9 @@ def setup_ssl_params!
438426
params['ssl_enabled'] = @ssl_enabled ||= Array(@hosts).all? { |host| host && host.to_s.start_with?("https") }
439427
end
440428

441-
def resolve_query_type
442-
@query&.strip&.match?(/\A(?:FROM|ROW|SHOW)/) ? "esql": "dsl"
443-
end
429+
def resolve_query_type
430+
@query&.strip&.match?(/\A(?:FROM|ROW|SHOW)/) ? "esql": "dsl"
431+
end
444432

445433
def validate_dsl_query_settings!
446434
#Load query if it exists
@@ -484,17 +472,16 @@ def validate_esql_query_and_params!
484472
named_params_keys = named_params.map(&:keys).flatten
485473

486474
placeholders = @query.scan(/(?<=[?])[a-z_][a-z0-9_]*/i)
487-
raise LogStash::ConfigurationError, "Number of placeholders in `query` and `named_params` do not match" unless placeholders.size == named_params_keys.size
488-
489475
placeholders.each do |placeholder|
490476
raise LogStash::ConfigurationError, "Placeholder #{placeholder} not found in query" unless named_params_keys.include?(placeholder)
491477
end
492478
end
493479

494480
def validate_es_for_esql_support!
495481
# make sure connected ES supports ES|QL (8.11+)
496-
es_supports_esql = Gem::Version.create(get_client.es_version) >= Gem::Version.create(ES_ESQL_SUPPORT_VERSION)
497-
fail("Connected Elasticsearch #{get_client.es_version} version does not supports ES|QL. ES|QL feature requires at least Elasticsearch #{ES_ESQL_SUPPORT_VERSION} version.") unless es_supports_esql
482+
@es_version ||= get_client.es_version
483+
es_supports_esql = Gem::Version.create(@es_version) >= Gem::Version.create(ES_ESQL_SUPPORT_VERSION)
484+
fail("Connected Elasticsearch #{@es_version} version does not supports ES|QL. ES|QL feature requires at least Elasticsearch #{ES_ESQL_SUPPORT_VERSION} version.") unless es_supports_esql
498485
end
499486

500487
end #class LogStash::Filters::Elasticsearch

lib/logstash/filters/elasticsearch/client.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def info
6868
end
6969

7070
def es_version
71-
@es_version ||= info&.dig('version', 'number')
71+
info&.dig('version', 'number')
7272
end
7373

7474
def build_flavor

lib/logstash/filters/elasticsearch/dsl_executor.rb

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ module Filters
55
class Elasticsearch
66
class DslExecutor
77
def initialize(plugin, logger)
8-
@plugin = plugin
98
@index = plugin.params["index"]
109
@query = plugin.params["query"]
1110
@query_dsl = plugin.query_dsl
@@ -17,6 +16,13 @@ def initialize(plugin, logger)
1716
@sort = plugin.params["sort"]
1817
@aggregation_fields = plugin.params["aggregation_fields"]
1918
@logger = logger
19+
@event_decorator = plugin.method(:decorate)
20+
@target_field = plugin.params["target"]
21+
if @target_field
22+
def self.apply_target(path); "[#{@target_field}][#{path}]"; end
23+
else
24+
def self.apply_target(path); path; end
25+
end
2026
end
2127

2228
def process(client, event)
@@ -46,25 +52,27 @@ def process(client, event)
4652
matched = true
4753
@fields.each do |old_key, new_key|
4854
old_key_path = extract_path(old_key)
49-
set = result_hits.map do |doc|
55+
extracted_hit_values = result_hits.map do |doc|
5056
extract_value(doc["_source"], old_key_path)
5157
end
52-
event.set(new_key, set.count > 1 ? set : set.first)
58+
value_to_set = extracted_hit_values.count > 1 ? extracted_hit_values : extracted_hit_values.first
59+
set_to_event_target(event, new_key, value_to_set)
5360
end
5461
@docinfo_fields.each do |old_key, new_key|
5562
old_key_path = extract_path(old_key)
56-
set = result_hits.map do |doc|
63+
extracted_docs_info = result_hits.map do |doc|
5764
extract_value(doc, old_key_path)
5865
end
59-
event.set(new_key, set.count > 1 ? set : set.first)
66+
value_to_set = extracted_docs_info.count > 1 ? extracted_docs_info : extracted_docs_info.first
67+
set_to_event_target(event, new_key, value_to_set)
6068
end
6169
end
6270

6371
result_aggregations = results["aggregations"]
6472
if !result_aggregations.nil? && !result_aggregations.empty?
6573
matched = true
6674
@aggregation_fields.each do |agg_name, ls_field|
67-
event.set(ls_field, result_aggregations[agg_name])
75+
set_to_event_target(event, ls_field, result_aggregations[agg_name])
6876
end
6977
end
7078

@@ -78,7 +86,7 @@ def process(client, event)
7886
end
7987
@tag_on_failure.each { |tag| event.tag(tag) }
8088
else
81-
@plugin.decorate(event) if matched
89+
@event_decorator.call(event) if matched
8290
end
8391
end
8492

@@ -116,6 +124,16 @@ def extract_value(source, path)
116124
end
117125
end
118126

127+
# if @target is defined, creates a nested structure to inject result into target field
128+
# if not defined, directly sets to the top-level event field
129+
# @param event [LogStash::Event]
130+
# @param new_key [String] name of the field to set
131+
# @param value_to_set [Array] values to set
132+
# @return [void]
133+
def set_to_event_target(event, new_key, value_to_set)
134+
key_to_set = self.apply_target(new_key)
135+
event.set(key_to_set, value_to_set)
136+
end
119137
end
120138
end
121139
end

lib/logstash/filters/elasticsearch/esql_executor.rb

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ class Elasticsearch
66
class EsqlExecutor
77

88
def initialize(plugin, logger)
9-
@plugin = plugin
109
@logger = logger
1110

11+
@event_decorator = plugin.method(:decorate)
1212
@query = plugin.params["query"]
1313
if @query.strip.start_with?("FROM") && !@query.match?(/\|\s*LIMIT/)
1414
@logger.warn("ES|QL query doesn't contain LIMIT, adding `| LIMIT 1` to optimize the performance")
@@ -20,14 +20,21 @@ def initialize(plugin, logger)
2020
@fields = plugin.params["fields"]
2121
@tag_on_failure = plugin.params["tag_on_failure"]
2222
@logger.debug("ES|QL query executor initialized with ", query: @query, named_params: @named_params)
23+
24+
@target_field = plugin.params["target"]
25+
if @target_field
26+
def self.apply_target(path); "[#{@target_field}][#{path}]"; end
27+
else
28+
def self.apply_target(path); path; end
29+
end
2330
end
2431

2532
def process(client, event)
2633
resolved_params = @named_params&.any? ? resolve_parameters(event) : []
2734
response = execute_query(client, resolved_params)
2835
inform_warning(response)
2936
process_response(event, response)
30-
@plugin.decorate(event)
37+
@event_decorator.call(event)
3138
rescue => e
3239
@logger.error("Failed to process ES|QL filter", exception: e)
3340
@tag_on_failure.each { |tag| event.tag(tag) }
@@ -54,7 +61,7 @@ def resolve_parameters(event)
5461
def execute_query(client, params)
5562
# debug logs may help to check what query shape the plugin is sending to ES
5663
@logger.debug("Executing ES|QL query", query: @query, params: params)
57-
client.esql_query({ body: { query: @query, params: params }, format: 'json', drop_null_columns: true }, 'esql')
64+
client.esql_query({ body: { query: @query, params: params }, format: 'json', drop_null_columns: true })
5865
end
5966

6067
def process_response(event, response)
@@ -88,11 +95,22 @@ def add_requested_fields(event, columns, values)
8895
column_index = columns.find_index { |col| col['name'] == old_key }
8996
next unless column_index
9097

91-
row_values = values[column_index]&.compact # remove non-exist field values with compact
92-
# TODO: set to the target field once target support is added
93-
event.set(new_key, row_values.one? ? row_values.first : row_values) if row_values&.size > 0
98+
row_values = values.map { |entry| entry[column_index] }&.compact
99+
value_to_set = row_values.count > 1 ? row_values : row_values.first
100+
set_to_event_target(event, new_key, value_to_set) unless value_to_set.nil?
94101
end
95102
end
103+
104+
# if @target is defined, creates a nested structure to inject result into target field
105+
# if not defined, directly sets to the top-level event field
106+
# @param event [LogStash::Event]
107+
# @param new_key [String] name of the field to set
108+
# @param value_to_set [Array] values to set
109+
# @return [void]
110+
def set_to_event_target(event, new_key, value_to_set)
111+
key_to_set = self.apply_target(new_key)
112+
event.set(key_to_set, value_to_set)
113+
end
96114
end
97115
end
98116
end

0 commit comments

Comments
 (0)