-
Notifications
You must be signed in to change notification settings - Fork 84
ES|QL support #194
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
ES|QL support #194
Changes from 10 commits
a64a6b1
e841008
12aedc1
6e9f6a5
e147efd
c7c2989
a6d3817
1491576
d5773ea
ba00f03
0bb1d6b
9284cba
ea7a449
59fc863
de7560b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,9 @@ | |
|
||
class LogStash::Filters::Elasticsearch < LogStash::Filters::Base | ||
|
||
require 'logstash/filters/elasticsearch/dsl_executor' | ||
require 'logstash/filters/elasticsearch/esql_executor' | ||
|
||
include LogStash::PluginMixins::ECSCompatibilitySupport | ||
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck | ||
|
||
|
@@ -24,8 +27,10 @@ class LogStash::Filters::Elasticsearch < LogStash::Filters::Base | |
# Field substitution (e.g. `index-name-%{date_field}`) is available | ||
config :index, :validate => :string, :default => "" | ||
|
||
# Elasticsearch query string. Read the Elasticsearch query string documentation. | ||
# for more info at: https://www.elastic.co/guide/en/elasticsearch/reference/master/query-dsl-query-string-query.html#query-string-syntax | ||
# Elasticsearch query string. This can be in DSL or ES|QL query shape. | ||
# Read the Elasticsearch query string documentation. | ||
# DSL: https://www.elastic.co/guide/en/elasticsearch/reference/master/query-dsl-query-string-query.html#query-string-syntax | ||
# ES|QL: https://www.elastic.co/guide/en/elasticsearch/reference/current/esql.html | ||
config :query, :validate => :string | ||
|
||
# File path to elasticsearch query in DSL format. Read the Elasticsearch query documentation | ||
|
@@ -134,6 +139,14 @@ class LogStash::Filters::Elasticsearch < LogStash::Filters::Base | |
# What status codes to retry on? | ||
config :retry_on_status, :validate => :number, :list => true, :default => [500, 502, 503, 504] | ||
|
||
# params to send to ES|QL query, naming params preferred | ||
# example, | ||
# if query is "FROM my-index | WHERE some_type = ?type" | ||
# named params can be applied as following via query_params: | ||
# query_params => { | ||
# "named_params" => [ {"type" => "%{[type]}"}] | ||
# } | ||
config :query_params, :validate => :hash, :default => {} | ||
mashhurs marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
config :ssl, :obsolete => "Set 'ssl_enabled' instead." | ||
config :ca_file, :obsolete => "Set 'ssl_certificate_authorities' instead." | ||
|
@@ -146,6 +159,9 @@ class LogStash::Filters::Elasticsearch < LogStash::Filters::Base | |
include MonitorMixin | ||
attr_reader :shared_client | ||
|
||
LS_ESQL_SUPPORT_VERSION = "8.17.4" # the version started using elasticsearch-ruby v8 | ||
ES_ESQL_SUPPORT_VERSION = "8.11.0" | ||
|
||
## | ||
# @override to handle proxy => '' as if none was set | ||
# @param value [Array<Object>] | ||
|
@@ -163,17 +179,23 @@ def self.validate_value(value, validator) | |
return super(value, :uri) | ||
end | ||
|
||
attr_reader :query_dsl | ||
|
||
def register | ||
#Load query if it exists | ||
if @query_template | ||
if File.zero?(@query_template) | ||
raise "template is empty" | ||
end | ||
file = File.open(@query_template, 'r') | ||
@query_dsl = file.read | ||
query_type = resolve_query_type | ||
case query_type | ||
when "esql" | ||
invalid_params_with_esql = original_params.keys & %w(index query_template sort docinfo_fields aggregation_fields enable_sort result_size) | ||
raise LogStash::ConfigurationError, "Configured #{invalid_params_with_esql} params cannot be used with ES|QL query" if invalid_params_with_esql.any? | ||
mashhurs marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
validate_ls_version_for_esql_support! | ||
validate_esql_query_and_params! | ||
@esql_executor ||= LogStash::Filters::Elasticsearch::EsqlExecutor.new(self, @logger) | ||
else # dsl | ||
validate_dsl_query_settings! | ||
@esql_executor ||= LogStash::Filters::Elasticsearch::DslExecutor.new(self, @logger) | ||
end | ||
|
||
validate_query_settings | ||
fill_hosts_from_cloud_id | ||
setup_ssl_params! | ||
validate_authentication | ||
|
@@ -182,77 +204,22 @@ def register | |
@hosts = Array(@hosts).map { |host| host.to_s } # potential SafeURI#to_s | ||
|
||
test_connection! | ||
validate_es_for_esql_support! if query_type == "esql" | ||
setup_serverless | ||
if get_client.es_transport_client_type == "elasticsearch_transport" | ||
require_relative "elasticsearch/patches/_elasticsearch_transport_http_manticore" | ||
end | ||
end # def register | ||
|
||
def filter(event) | ||
matched = false | ||
begin | ||
params = { :index => event.sprintf(@index) } | ||
|
||
if @query_dsl | ||
query = LogStash::Json.load(event.sprintf(@query_dsl)) | ||
params[:body] = query | ||
else | ||
query = event.sprintf(@query) | ||
params[:q] = query | ||
params[:size] = result_size | ||
params[:sort] = @sort if @enable_sort | ||
end | ||
|
||
@logger.debug("Querying elasticsearch for lookup", :params => params) | ||
|
||
results = get_client.search(params) | ||
raise "Elasticsearch query error: #{results["_shards"]["failures"]}" if results["_shards"].include? "failures" | ||
|
||
event.set("[@metadata][total_hits]", extract_total_from_hits(results['hits'])) | ||
|
||
resultsHits = results["hits"]["hits"] | ||
if !resultsHits.nil? && !resultsHits.empty? | ||
matched = true | ||
@fields.each do |old_key, new_key| | ||
old_key_path = extract_path(old_key) | ||
extracted_hit_values = resultsHits.map do |doc| | ||
extract_value(doc["_source"], old_key_path) | ||
end | ||
value_to_set = extracted_hit_values.count > 1 ? extracted_hit_values : extracted_hit_values.first | ||
set_to_event_target(event, new_key, value_to_set) | ||
end | ||
@docinfo_fields.each do |old_key, new_key| | ||
old_key_path = extract_path(old_key) | ||
extracted_docs_info = resultsHits.map do |doc| | ||
extract_value(doc, old_key_path) | ||
end | ||
value_to_set = extracted_docs_info.count > 1 ? extracted_docs_info : extracted_docs_info.first | ||
set_to_event_target(event, new_key, value_to_set) | ||
end | ||
end | ||
|
||
resultsAggs = results["aggregations"] | ||
if !resultsAggs.nil? && !resultsAggs.empty? | ||
matched = true | ||
@aggregation_fields.each do |agg_name, ls_field| | ||
set_to_event_target(event, ls_field, resultsAggs[agg_name]) | ||
end | ||
end | ||
|
||
rescue => e | ||
if @logger.trace? | ||
@logger.warn("Failed to query elasticsearch for previous event", :index => @index, :query => query, :event => event.to_hash, :error => e.message, :backtrace => e.backtrace) | ||
elsif @logger.debug? | ||
@logger.warn("Failed to query elasticsearch for previous event", :index => @index, :error => e.message, :backtrace => e.backtrace) | ||
else | ||
@logger.warn("Failed to query elasticsearch for previous event", :index => @index, :error => e.message) | ||
end | ||
@tag_on_failure.each{|tag| event.tag(tag)} | ||
else | ||
filter_matched(event) if matched | ||
end | ||
@esql_executor.process(get_client, event) | ||
end # def filter | ||
|
||
def decorate(event) | ||
# Elasticsearch class has an access for `filter_matched` | ||
filter_matched(event) | ||
end | ||
|
||
# public only to be reuse in testing | ||
def prepare_user_agent | ||
os_name = java.lang.System.getProperty('os.name') | ||
|
@@ -268,18 +235,6 @@ def prepare_user_agent | |
|
||
private | ||
|
||
# if @target is defined, creates a nested structure to inject result into target field | ||
# if not defined, directly sets to the top-level event field | ||
# @param event [LogStash::Event] | ||
# @param new_key [String] name of the field to set | ||
# @param value_to_set [Array] values to set | ||
# @return [void] | ||
def set_to_event_target(event, new_key, value_to_set) | ||
key_to_set = target ? "[#{target}][#{new_key}]" : new_key | ||
|
||
event.set(key_to_set, value_to_set) | ||
end | ||
|
||
def client_options | ||
@client_options ||= { | ||
:user => @user, | ||
|
@@ -376,53 +331,10 @@ def get_client | |
end | ||
end | ||
|
||
# get an array of path elements from a path reference | ||
def extract_path(path_reference) | ||
return [path_reference] unless path_reference.start_with?('[') && path_reference.end_with?(']') | ||
|
||
path_reference[1...-1].split('][') | ||
end | ||
|
||
# given a Hash and an array of path fragments, returns the value at the path | ||
# @param source [Hash{String=>Object}] | ||
# @param path [Array{String}] | ||
# @return [Object] | ||
def extract_value(source, path) | ||
path.reduce(source) do |memo, old_key_fragment| | ||
break unless memo.include?(old_key_fragment) | ||
memo[old_key_fragment] | ||
end | ||
end | ||
|
||
# Given a "hits" object from an Elasticsearch response, return the total number of hits in | ||
# the result set. | ||
# @param hits [Hash{String=>Object}] | ||
# @return [Integer] | ||
def extract_total_from_hits(hits) | ||
total = hits['total'] | ||
|
||
# Elasticsearch 7.x produces an object containing `value` and `relation` in order | ||
# to enable unambiguous reporting when the total is only a lower bound; if we get | ||
# an object back, return its `value`. | ||
return total['value'] if total.kind_of?(Hash) | ||
|
||
total | ||
end | ||
|
||
def hosts_default?(hosts) | ||
hosts.is_a?(Array) && hosts.size == 1 && !original_params.key?('hosts') | ||
end | ||
|
||
def validate_query_settings | ||
unless @query || @query_template | ||
raise LogStash::ConfigurationError, "Both `query` and `query_template` are empty. Require either `query` or `query_template`." | ||
end | ||
|
||
if @query && @query_template | ||
raise LogStash::ConfigurationError, "Both `query` and `query_template` are set. Use either `query` or `query_template`." | ||
end | ||
end | ||
|
||
def validate_authentication | ||
authn_options = 0 | ||
authn_options += 1 if @cloud_auth | ||
|
@@ -514,4 +426,62 @@ def setup_ssl_params! | |
params['ssl_enabled'] = @ssl_enabled ||= Array(@hosts).all? { |host| host && host.to_s.start_with?("https") } | ||
end | ||
|
||
def resolve_query_type | ||
@query&.strip&.match?(/\A(?:FROM|ROW|SHOW)/) ? "esql": "dsl" | ||
end | ||
|
||
def validate_dsl_query_settings! | ||
#Load query if it exists | ||
if @query_template | ||
if File.zero?(@query_template) | ||
raise "template is empty" | ||
end | ||
file = File.open(@query_template, 'r') | ||
@query_dsl = file.read | ||
end | ||
|
||
validate_query_settings | ||
end | ||
|
||
def validate_query_settings | ||
unless @query || @query_template | ||
raise LogStash::ConfigurationError, "Both `query` and `query_template` are empty. Require either `query` or `query_template`." | ||
end | ||
|
||
if @query && @query_template | ||
raise LogStash::ConfigurationError, "Both `query` and `query_template` are set. Use either `query` or `query_template`." | ||
end | ||
end | ||
|
||
def validate_ls_version_for_esql_support! | ||
if Gem::Version.create(LOGSTASH_VERSION) < Gem::Version.create(LS_ESQL_SUPPORT_VERSION) | ||
fail("Current version of Logstash does not include Elasticsearch client which supports ES|QL. Please upgrade Logstash to at least #{LS_ESQL_SUPPORT_VERSION}") | ||
end | ||
end | ||
|
||
def validate_esql_query_and_params! | ||
accepted_query_params = %w(named_params) | ||
original_query_params = original_params["query_params"] ||= {} | ||
invalid_query_params = original_query_params.keys - accepted_query_params | ||
raise LogStash::ConfigurationError, "#{accepted_query_params} option(s) accepted in `query_params`, but found #{invalid_query_params} invalid option(s)" if invalid_query_params.any? | ||
|
||
is_named_params_array = original_query_params["named_params"] ? original_query_params["named_params"].class.eql?(Array) : true | ||
raise LogStash::ConfigurationError, "`query_params => named_params` is required to be array" unless is_named_params_array | ||
|
||
named_params = original_query_params["named_params"] ||= [] | ||
named_params_keys = named_params.map(&:keys).flatten | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nesting aside, we should accept either of:
# normalize @query_params to a flat hash
if @query_params.kind_of?(Array)
illegal_entries = @query_params.reject {|e| e.kind_of_?(Hash) }
raise LogStash::ConfigurationError, "Illegal Placeholder Structure in `query_params`: #{illegal_entries}" if illegal_entries.any?
@query_params = @query_params.reduce(&:merge)
end
illegal_keys = @query_params.keys.reject {|k| k[/^[a-z_][a-z0-9_]*$/] }
raise LogStash::ConfigurationError, "Illegal Placeholder Names in `query_params`: #{illegal_keys}" if illegal_keys.any?
illegal_values = @query_params.reject {|k,v| v.kind_of?(String) }.keys
raise LogStash::ConfigurationError, "Illegal Placeholder Values in `query_params`: #{illegal_values}" if illegal_values.any? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there any ways we can accept both
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, when setting config with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. given there's no backwards compatibility concerns here as the query dsl doesn't use "query_params" , should we just call it esql_params and make it an array to make it the most similar possible with the api? making it an hash removes the ability of using positional parameters (e.g. [1, "hello", true] as $1, $2, $3). In ES|QL it is called "params", so it's not unreasonable to call it esql_params There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we have top-level
|
||
|
||
placeholders = @query.scan(/(?<=[?])[a-z_][a-z0-9_]*/i) | ||
placeholders.each do |placeholder| | ||
raise LogStash::ConfigurationError, "Placeholder #{placeholder} not found in query" unless named_params_keys.include?(placeholder) | ||
end | ||
end | ||
|
||
def validate_es_for_esql_support! | ||
# make sure connected ES supports ES|QL (8.11+) | ||
@es_version ||= get_client.es_version | ||
es_supports_esql = Gem::Version.create(@es_version) >= Gem::Version.create(ES_ESQL_SUPPORT_VERSION) | ||
fail("Connected Elasticsearch #{@es_version} version does not supports ES|QL. ES|QL feature requires at least Elasticsearch #{ES_ESQL_SUPPORT_VERSION} version.") unless es_supports_esql | ||
end | ||
|
||
end #class LogStash::Filters::Elasticsearch |
Uh oh!
There was an error while loading. Please reload this page.