ES|QL support #194

Open: wants to merge 15 commits into main

Conversation

@mashhurs (Contributor) commented Apr 17, 2025

Description

Design note: https://docs.google.com/document/d/1EZMe0OU7Ceyg9aUawARh2N2iFq196mVTWxut2abGIEY

  • Needs the team's input to decide on params, naming, etc.

PR changes:

  • ESQL and DSL executors are introduced.
  • The query param can now accept an ES|QL query shape.
  • esql_params is introduced as an initial step but needs the team's feedback.
  • DSL logic is moved into the DSL executor.

Sample minimal config:

elasticsearch {
  cloud_id    => "my-cloud-id"
  api_key     => "api:key"
  query       => "FROM significant_month | WHERE type == ?type"
  esql_params => [{"type" => "%{[type]}"}]
  fields      => { "@timestamp" => "timestamp_new" }
}

Author's checklist

  • A decision on the introduced params (needs team feedback)
  • Add validation logic similar to es-input
  • Unit tests
  • Integration tests
  • Documentation
  • Version bump (plan for both 8.x and 9.x)

Log

[2025-04-17T16:47:56,901][WARN ][logstash.filters.elasticsearch][main][0d04e11c949906b1abe5db3c4991ecafac9bc4925a31d477fe5427ec1369304a] ES|QL query execution warning:  {:message=>"299 Elasticsearch-8.17.0-2b6a7fed44faa321997703718f07ee0420804b41 \"No limit defined, adding default limit of [1000]\""}
{
               "dmin" => 2.187,
          "magSource" => "us",
         "depthError" => 1.697,
           "latitude" => -6.3171,
               "type" => "earthquake",
            "magType" => "mww",
                "nst" => 324,
                "mag" => 6.9,
           "magError" => 0.028,
    "horizontalError" => 6.62,
                "gap" => 11,
      "timestamp_new" => [
        [ 0] "2025-04-04T20:04:38.266Z",
        [ 1] "2025-04-03T14:09:29.757Z",
        [ 2] "2025-04-02T00:29:22.130Z",
        [ 3] "2025-03-30T12:18:47.830Z",
        [ 4] "2025-03-28T17:17:27.821Z",
        [ 5] "2025-03-28T06:32:04.726Z",
        [ 6] "2025-03-28T06:20:52.684Z",
        [ 7] "2025-03-25T01:43:11.796Z",
        [ 8] "2025-03-21T14:50:51.098Z",
        [ 9] "2025-03-18T02:46:33.330Z",
        [10] "2025-03-17T03:17:21.650Z",
        [11] "2025-03-10T02:33:14.609Z"
    ],
           "@version" => "1",
                "rms" => 0.78,
              "place" => "181 km ESE of Kimbe, Papua New Guinea",
                 "id" => "us6000q41n",
                "net" => "us",
          "longitude" => 151.5922,
          "@metadata" => {
        "total_hits" => 12
    },
     "locationSource" => "us",
         "@timestamp" => 2025-04-04T20:04:38.266Z,
              "depth" => 10.0,
           "location" => "POINT (151.5922 -6.3171)",
             "magNst" => 121,
               "time" => "2025-04-04T20:04:38.266Z",
            "updated" => "2025-04-08T19:27:52.892Z",
             "status" => "reviewed"
}

module LogStash
module Filters
class Elasticsearch
class DslExecutor
@mashhurs (Contributor, Author) commented Apr 18, 2025

Just copied and pasted the DSL logic from elasticsearch.rb without changes (only naming is standardized). This class could be simplified/cleaned up for better maintainability; I will follow up with a separate task later, focusing on ES|QL now.

@yaauie (Contributor) left a comment

First, this is a fantastic start, and the work to break out the separate executors for the separate query types is valuable.

I have a couple of issues to address, and have expanded on them in the line comments near where I discovered them.

  1. query vs esql_query: it turns out we cannot reliably distinguish a QueryString query from an ESQL query, so we need either to require a side-hint (like query_type => esql) or to accept our ESQL query via a separate parameter (like esql_query, which I prefer); both options are sketched after this list
  2. Using the fields mapping is awkward with ESQL, since there is no way for us to grab whole objects with subfields like we can with document-oriented queries, and ESQL already provides KEEP and RENAME to get the desired structure; I would prefer we map the dot-syntax fields from the result into a nested structure on the event, without needing a user to tell us about each and every field to map in the result.
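
For illustration, the two disambiguation options might look like this in a pipeline config (parameter names are the proposals above, not a settled API):

  # option 1: side-hint alongside the existing query param
  elasticsearch {
    query_type => "esql"
    query      => "FROM significant_month | WHERE type == ?type"
  }

  # option 2: dedicated parameter for ES|QL queries
  elasticsearch {
    esql_query => "FROM significant_month | WHERE type == ?type"
  }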

end

def add_requested_fields(event, columns, values)
@fields.each do |old_key, new_key|
Contributor:

In terms of usability, we have an opportunity to improve the currently-cumbersome nature of this plugin's requirement that a user provide a query and also map the fields and docinfo_fields.

With ES|QL, the user already has the opportunity to specify exactly which fields to KEEP in the result, and can RENAME fields to change the shape. They shouldn't also need to tell us how to map those onto the event.

This is especially true because the semantics of fields will differ significantly from the DSL (where it can find objects that include subfields) to ESQL (where it can find fields by name).

I would prefer to store all of the fields returned by the ESQL query, without requiring an additional mapping to be provided, by expanding the dot-notation field addresses in the result set into field references as previously shared in a prototype, and prohibiting the use of fields when invoking an ESQL query.

This would have a side-effect bonus of allowing users to reduce their bandwidth costs when they want fewer fields.
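
A minimal sketch of that expansion, assuming the ES|QL response carries a columns array of {"name" => ..., "type" => ...} entries with dot-notation names (helper name is hypothetical; this is not the prototype referenced above):

# Expand dot-notation column names from one ES|QL result row into a nested hash.
def expand_dotted_row(columns, row)
  row.each_with_index.each_with_object({}) do |(value, i), acc|
    path = columns[i]["name"].split(".")   # e.g. "geo.location" => ["geo", "location"]
    leaf = path.pop
    node = path.reduce(acc) { |h, k| h[k] ||= {} }
    node[leaf] = value
  end
end

# expand_dotted_row([{"name" => "a.b"}, {"name" => "c"}], [1, 2])
#   => {"a" => {"b" => 1}, "c" => 2}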

Contributor Author:

Agree with your point here. However, it is the same with the DSL: similar to KEEP, the DSL can also apply fields to specify which fields are going to be kept. I am also really curious how the fields param came to be introduced in es-filter.

Contributor:

However, it is the same with the DSL

When using this plugin with the DSL, the only way to map fields from the result onto the event is to provide a mapping of all of the fields and docinfo_fields to dig out of the response. In a way these map to the DSL options fields and _source (even if they are effectively client-side emulations).

As such I don't think we need to port-forward their awkwardness into the ES|QL queries where specifying what you want is part of the ES|QL query. We can simply map the response directly to the event, which encourages the user to formulate a query that contains what they want and no more.

Contributor Author:

When using this plugin with the DSL, the only way to map fields from the result onto the event is to provide a mapping of all of the fields and docinfo_fields ...

Thank you for the explanation; this is something I didn't know. But taking a look at the sources, it seems feasible to copy the region of interest from each hit (results["hits"]["hits"]&.each do |doc| ... doc["_source"] ... end) to the Event.
Anyway, this is what it is!


This is a tough one...
When copying the result set to the Event:

  • if target is not defined, a conflict may happen if the same field already exists. What would be our expectation for this case?
    • I don't think overwriting the original data (by changing the field to an array) would be a good approach.
    • it seems the plugin either has to require target or embrace the data loss on conflicting fields (warn and guide setting target when it happens).
  • if target is defined, we convert the columnar result set into an array of nested JSON structures; basically each row becomes a nested JSON entry object under target, as we did in es-input (see the sketch below). What do you think?
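
A minimal sketch of that columnar-to-rows conversion, assuming the same columns shape as the ES|QL response (helper name is hypothetical):

# Convert the columnar ES|QL result set into an array of row objects,
# e.g. for storing under `target` as es-input does.
def rows_as_objects(columns, values)
  names = columns.map { |c| c["name"] }
  values.map { |row| names.zip(row).to_h }
end

# rows_as_objects([{"name" => "a"}, {"name" => "b"}], [[1, 2], [3, 4]])
#   => [{"a" => 1, "b" => 2}, {"a" => 3, "b" => 4}]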

Member:

I agree that field selection works differently in these two query languages and that we should leverage the ability to decide what to KEEP in the query itself.

As for what to do on conflicts, I'm also not a fan of changing the field to an array (even though we do that in many places), but I'd prefer to overwrite existing data over dropping new data.

it seems the plugin either has to require target or embrace the data loss on conflicting fields (warn and guide setting target when it happens).

Beware of per-event WARN logs, as they can be annoying to deal with (too much duplication and disk volume).
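
One common way to avoid that noise is to emit such a warning only once per plugin instance rather than per event; a minimal sketch, assuming the plugin's usual @logger (helper name is hypothetical):

# Warn about a result/event field conflict only once per pipeline run.
def warn_conflict_once(field)
  @warned_conflict ||= begin
    @logger.warn("ES|QL result conflicts with an existing event field and will overwrite it; consider setting `target`", :field => field)
    true
  end
end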

Contributor Author:

WDYT of the following approach:

  • when target is not specified
    • if LIMIT 1, we will be able to set the single row directly on the event (see the conflict case below);
    • and if not LIMIT 1, we will be able to set only a single row on the event without changing the structure, and ignore the rest. Ignoring the rest means we encourage setting target to get the full result set (we can check both target and the limit at initialization, not for each upstream event)
      • to illustrate with an example, for the {foo:1, bar:2} event with cols[a,b,c], rows[[3,4,5], [6,7,8]], the output would be {foo:1, bar:2, a:3, b:4, c:5}
  • on conflict we overwrite the field; regardless of target, conflicts may still happen. We warn (at executor initialization, and in the docs) that an existing field gets overwritten on conflict.
  • when target is specified,
    • if a single row is retrieved, set it directly on target as a nested structure;
    • if multiple rows are retrieved, target becomes an array holding the entries;
  • similar to es-input, we detect sub-fields and set the top-level value (a sketch of this behavior follows this comment);

Another idea is to define a reserved field (e.g. values), but that is kind of enforcing target to be set by default.
Looking forward to any feedback!
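
A minimal sketch of the behavior proposed above, reusing the hypothetical rows_as_objects helper sketched earlier (event.set is the standard Logstash Event API; names are illustrative, not the PR's code):

# No target: merge the first row's columns onto the event, overwriting on
# conflict, and ignore any remaining rows. With a target: store a single
# row as a nested structure, or multiple rows as an array of entries.
def apply_result(event, columns, values, target = nil)
  rows = rows_as_objects(columns, values)
  if target
    event.set(target, rows.length == 1 ? rows.first : rows)
  else
    rows.first&.each { |key, value| event.set(key, value) }
  end
end

For the {foo:1, bar:2} example above, apply_result(event, cols, rows) would yield {foo:1, bar:2, a:3, b:4, c:5}, while passing a target would nest the full two-row result set under it.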

mashhurs and others added 9 commits May 8, 2025 16:01
…hape now. is introduced for initial step but needs team's feedback. DSL logics moved into DSL executors.
… columns support, documentation is added, basic integration tests are added.
Accept obvious or agreed code review suggestions.

Co-authored-by: Rye Biesemeyer <[email protected]>
Separate DSL and ESQL interface in the client.

Co-authored-by: Rye Biesemeyer <[email protected]>
…t test for DSL. Address comments: do not save ES version in client, add apply target method in executors, set to target if target is defined, docs update.

Co-authored-by: Rye Biesemeyer <[email protected]>
@mashhurs mashhurs requested a review from yaauie May 9, 2025 06:59
@mashhurs mashhurs requested a review from jsvd May 22, 2025 18:04
Comment on lines 471 to 472
named_params = original_query_params["named_params"] ||= []
named_params_keys = named_params.map(&:keys).flatten
Contributor:

nesting aside, we should accept either of:

  • an array of single-entry hashes (to mirror the expectations from the ES API):
    query_params => [
       {"page_count" => "[page_count]"},
       {"author" => "[author]"},
       {"count" => "[count]"}
     ]
    
  • a hash with one or more values (to be logstash-y)
    query_params => {
       page_count => "[page_count]"
       author     => "[author]"
       count      => "[count]"
     }
    
# normalize @query_params to a flat hash
if @query_params.kind_of?(Array)
  illegal_entries = @query_params.reject {|e| e.kind_of?(Hash) }
  raise LogStash::ConfigurationError, "Illegal Placeholder Structure in `query_params`: #{illegal_entries}" if illegal_entries.any?
  @query_params = @query_params.reduce(&:merge)
end

illegal_keys = @query_params.keys.reject {|k| k[/^[a-z_][a-z0-9_]*$/] }
raise LogStash::ConfigurationError, "Illegal Placeholder Names in `query_params`: #{illegal_keys}" if illegal_keys.any?
illegal_values = @query_params.reject {|k,v| v.kind_of?(String) }.keys
raise LogStash::ConfigurationError, "Illegal Placeholder Values in `query_params`: #{illegal_values}" if illegal_values.any?

Contributor Author:

Are there any ways we can accept both Array and Hash types for query_params?
When we define the config, as in config :query_params, :validate => :hash, :default => {}, my understanding is that we can apply only one type (Hash in this case). I am in favor of using Hash, to keep consistency with the other config params; also, if we configure an Array whose entries are Hashes, we need to make sure each Hash has a single entry (validating against a single size sounds weird to me).
I have just tried a config where the option is defined as Hash but I provided an Array with Hash entries, and got an error.
Please let me know if I am missing anything here.

  filter {
    elasticsearch {
      # This setting must be a hash
      # This field must contain an even number of items, got 1
      query_params => [{"type"=>"[type]"}]
      ...
    }
  }
[2025-05-26T16:45:26,272][ERROR][logstash.agent           ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"Java::JavaLang::IllegalStateException", :message=>"Unable to configure plugins: (ConfigurationError) Something is wrong with your configuration.", :backtrace=>["org.logstash.config.ir.CompiledPipeline.<init>(CompiledPipeline.java:137)", "org.logstash.execution.AbstractPipelineExt.initialize(AbstractPipelineExt.java:236)", "org.logstash.execution.AbstractPipelineExt$INVOKER$i$initialize.call(AbstractPipelineExt$INVOKER$i$initialize.gen)", "org.jruby.internal.runtime.methods.JavaMethod$JavaMethodN.call(JavaMethod.java:847)", "org.jruby.ir.runtime.IRRuntimeHelpers.instanceSuper(IRRuntimeHelpers.java:1379)", "org.jruby.ir.instructions.InstanceSuperInstr.interpret(InstanceSuperInstr.java:139)", "org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:363)", "org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:128)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:115)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:446)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:92)", "org.jruby.RubyClass.newInstance(RubyClass.java:949)", "org.jruby.RubyClass$INVOKER$i$newInstance.call(RubyClass$INVOKER$i$newInstance.gen)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:446)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:92)", "org.jruby.ir.instructions.CallBase.interpret(CallBase.java:548)", "org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:363)", "org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66)", "org.jruby.ir.interpreter.InterpreterEngine.interpret(InterpreterEngine.java:88)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:238)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:225)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:228)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:476)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:293)", "org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:324)", "org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66)", "org.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:118)", "org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:136)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:66)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)", "org.jruby.runtime.Block.call(Block.java:144)", "org.jruby.RubyProc.call(RubyProc.java:354)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:111)", "java.base/java.lang.Thread.run(Thread.java:1583)"], :cause=>{:exception=>Java::OrgJrubyExceptions::Exception, :message=>"(ConfigurationError) Something is wrong with your configuration.", :backtrace=>["RUBY.config_init(/Users/mashhur/Dev/elastic/logstash/logstash-core/lib/logstash/config/mixin.rb:111)", "RUBY.initialize(/Users/mashhur/Dev/elastic/logstash/logstash-core/lib/logstash/filters/base.rb:141)", 
"RUBY.initialize(/Users/mashhur/Dev/elastic/logstash/vendor/jruby/lib/ruby/stdlib/monitor.rb:229)", "org.logstash.plugins.factory.ContextualizerExt.initialize(org/logstash/plugins/factory/ContextualizerExt.java:97)", "org.jruby.RubyClass.new(org/jruby/RubyClass.java:949)", "org.logstash.plugins.factory.ContextualizerExt.initialize_plugin(org/logstash/plugins/factory/ContextualizerExt.java:80)", "org.logstash.plugins.factory.ContextualizerExt.initialize_plugin(org/logstash/plugins/factory/ContextualizerExt.java:53)", "org.logstash.plugins.factory.PluginFactoryExt.filter_delegator(org/logstash/plugins/factory/PluginFactoryExt.java:73)", "org.logstash.plugins.factory.PluginFactoryExt.plugin(org/logstash/plugins/factory/PluginFactoryExt.java:250)", "org.logstash.execution.AbstractPipelineExt.initialize(org/logstash/execution/AbstractPipelineExt.java:236)", "RUBY.initialize(/Users/mashhur/Dev/elastic/logstash/logstash-core/lib/logstash/java_pipeline.rb:47)", "org.jruby.RubyClass.new(org/jruby/RubyClass.java:949)", "RUBY.execute(/Users/mashhur/Dev/elastic/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:50)", "RUBY.converge_state(/Users/mashhur/Dev/elastic/logstash/logstash-core/lib/logstash/agent.rb:420)"]}}

Contributor Author:

Ah, when the config is defined with :array, applying either a Hash or an Array seems to work, but is that because the config mixin doesn't validate Array values and uses them as-is? Source: https://github.com/elastic/logstash/blob/main/logstash-core/lib/logstash/config/mixin.rb#L476.
I am assuming it is intentional, and it makes sense to support both Array and Hash query_params shapes; applied (with unit tests to keep it safe) in this commit.

Member:

Given there are no backwards-compatibility concerns here (the query DSL doesn't use query_params), should we just call it esql_params and make it an array, to be as similar as possible to the API? Making it a hash removes the ability to use positional parameters (e.g. [1, "hello", true] as $1, $2, $3). In ES|QL it is called params, so it's not unreasonable to call it esql_params.
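
For example, positional values are only expressible with the array shape (illustrative config; the unnamed ? placeholder markers consumed in order are an assumption about the final plugin behavior):

  elasticsearch {
    query       => "FROM library | WHERE page_count > ? AND author == ? AND active == ?"
    esql_params => [1, "hello", true]
  }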

Contributor Author:

Since we have a top-level query_type, I think not attaching esql to query_params makes sense. I mean:

  • when query_type => "esql", query_params belongs to ES|QL, like esql -> query_params;
  • when query_type => "dsl", query_params is not allowed, since query_template includes the query params;

Comment on lines +14 to +15
@logger.warn("ES|QL query doesn't contain LIMIT, adding `| LIMIT 1` to optimize the performance")
@query.concat(' | LIMIT 1')
Contributor:

🤔 Modifying the provided query implicitly feels gross to me. Is it necessary?

  • What does opting out of the limit look like? Setting LIMIT to 10k+?
  • We also have logic to condense a result set down by "column" into a single value (an order-undefined collection that does not contain nulls)

This is also limited to FROM queries, but other queries can have multiple rows in their responses too.

Suggested change
@logger.warn("ES|QL query doesn't contain LIMIT, adding `| LIMIT 1` to optimize the performance")
@query.concat(' | LIMIT 1')
@logger.warn("ES|QL query doesn't contain LIMIT; results will be condensed to an array of values per column.")

Contributor Author:

We came to this conclusion in the Apr. 30 meeting (decisions section of the notes).

We have total_values in the event metadata, which highlights the result size for the given query on the processed event; if the user needs all of the rows, they need to set the LIMIT in the query.

What does opting out of the limit look like? Setting LIMIT to 10k+?

By default ES sets it to 1K; the upper limit can be up to 10K.

This is also limited to FROM queries, but other queries can have multiple rows in their responses too.

Ah, good catch! Sorry, I don't know why I included that; all processing commands support LIMIT, so this shouldn't be limited to FROM.
I have removed the FROM-limitation logic.

Member:

Hah, seems like we clashed here in terms of what we want to provide. My suggestion was to focus on single-row-return queries, and the user should get that even if they forgot, as injecting an array of values into fields instead of a single value would likely break their downstream data mappings.

Either way, I'm not too attached to either format; as this feature will go out as a technical preview, we're not locked into any of these decisions. I'm mostly looking for feedback on their use and subsequent inconveniences.

Contributor Author:

...injecting an array of values into fields instead of a single value would break their downstream data mappings.

This is true when migrating from the current DSL to ES|QL. In addition to the mapping, the 1K default limit for ES|QL is relatively big compared to the DSL's (which defaults to 10), and this may put a high load on the ES cluster (traffic, search, etc.).
Since ES|QL simplifies DSL queries a lot, I expect users may start moving from DSL to ES|QL in the long term.

I'm mostly looking for feedback on their use and subsequent inconveniences.

Same opinion!

...as this feature should be out as technical preview...

Yup, I am also fine with either option and it is subject to change.

@mashhurs mashhurs requested a review from yaauie May 27, 2025 23:21