Skip to content

Commit b18bfb6

Browse files
committed
normalise hosts to URI::Generics, which reliably preserve defaults
An upstream bug in the Elasticsearch Ruby Client's handling of `String` host arguments that begin with a schema (e.g., `https://localhost`) causes it to default to port 80 or 443, depending on the schema, instead of Elasticsearch's port 9200. Since the Elasticsearch Ruby Client will accept a `URI` in this case, and will correctly handle falling through to appropriate defaults, we normalise to `URI::Generic`, which does not have a default port. We absorb the `ssl => true` case into this normalisation, as its previous implementation prevented the use of non-default ports in the array provided to `hosts`. We also add support for IPv6 addresses, requiring a square-bracketed notation when used in conjunction with a specified port. (see: RFC-3986) Supersedes: #104 Resolves: #110 Resolves: #111
1 parent a8ae485 commit b18bfb6

File tree

4 files changed

+287
-12
lines changed

4 files changed

+287
-12
lines changed

docs/index.asciidoc

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,12 @@ fields => {
221221

222222
* Value type is <<array,array>>
223223
* Default value is `["localhost:9200"]`
224+
* Format for each entry is one of:
225+
- a valid RFC3986 URI with scheme, hostname, and optional port
226+
- an ipv4 address, optionally followed by a colon and port number
227+
- a hostname, optionally followed by a colon and port number
228+
- a square-bracketed ipv6 address, optionally followed by a colon and port number
229+
- a bare ipv6 address
224230

225231
List of elasticsearch hosts to use for querying.
226232

@@ -281,7 +287,9 @@ Comma-delimited list of `<field>:<direction>` pairs that define the sort order
281287
* Value type is <<boolean,boolean>>
282288
* Default value is `false`
283289

284-
SSL
290+
Force SSL/TLS secured communication to Elasticsearch cluster.
291+
Leaving this unspecified will use whatever scheme is specified in the URLs listed in <<plugins-{type}s-{plugin}-hosts>>, where mixed schemes are supported.
292+
If SSL is set to `true`, the plugin will refuse to start if any of the hosts specifies an `http://` scheme.
285293

286294
[id="plugins-{type}s-{plugin}-tag_on_failure"]
287295
===== `tag_on_failure`

lib/logstash/filters/elasticsearch.rb

Lines changed: 85 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
require "logstash/json"
66
java_import "java.util.concurrent.ConcurrentHashMap"
77

8+
require 'resolv'
9+
810

911
class LogStash::Filters::Elasticsearch < LogStash::Filters::Base
1012
config_name "elasticsearch"
@@ -71,6 +73,8 @@ def register
7173
@query_dsl = file.read
7274
end
7375

76+
@normalised_hosts = normalise_hosts(@hosts, @ssl)
77+
7478
test_connection!
7579
end # def register
7680

@@ -140,8 +144,7 @@ def filter(event)
140144
private
141145
def client_options
142146
{
143-
:ssl => @ssl,
144-
:hosts => @hosts,
147+
:hosts => @normalised_hosts,
145148
:ca_file => @ca_file,
146149
:logger => @logger
147150
}
@@ -191,4 +194,84 @@ def extract_total_from_hits(hits)
191194
def test_connection!
192195
get_client.client.ping
193196
end
197+
198+
private
199+
200+
PATTERN_START_WITH_URI_SCHEME =
201+
%r{\A[[:alpha:]][[:alnum:]\.\+\-]*://}i
202+
203+
PATTERN_CAPTURING_HOSTNAME_AND_OPTIONAL_PORT =
204+
%r{\A([^:\[\]]+|\[[^\]]+\])(?::([0-9]+))?\Z}
205+
206+
##
207+
# Map the provided array-of-strings to an array of `URI::Generic`
208+
# instances, which the Elasticsearch client can use to establish
209+
# connections.
210+
#
211+
# @param hosts [Array<String>]: (@see `#normalise_host`)
212+
# @param force_ssl [Boolean]: (@see `#normalise_host`)
213+
#
214+
# @return [Array<URI::Generic>]
215+
def normalise_hosts(hosts, force_ssl)
216+
hosts.map { |input| normalise_host(input, force_ssl) }
217+
end
218+
219+
##
220+
# Convert the provided string to a `URI::Generic` instance, which the
221+
# Elasticsearch client can use to establish connections.
222+
#
223+
# @param input [String]: a url, in one of the following formats:
224+
# - a qualified URL with schema, hostname, and
225+
# optional port
226+
# - a bare hostname or ip, optionally followed by a
227+
# colon and port number
228+
# - a square-bracketed ipv6 literal, optionally
229+
# followed by a colon and port number
230+
# - a bare ipv6-address
231+
# @param force_ssl [Boolean]: true to force SSL; will cause failure if one
232+
# or more hosts explicitly supplies non-SSL
233+
# scheme (e.g., `http`).
234+
#
235+
# @return [URI::Generic]
236+
def normalise_host(input, force_ssl)
237+
if force_ssl && input.start_with?('http://')
238+
logger.error("Plugin configured to force SSL with `ssl => true`, " +
239+
"but a host explicitly declared non-https URL `#{input}`")
240+
241+
raise LogStash::ConfigurationError, "Aborting due to conflicting configuration"
242+
end
243+
244+
begin
245+
if PATTERN_START_WITH_URI_SCHEME.match(input)
246+
# Avoid `URI::parse`, which routes to specific implementations
247+
# that inject defaults that do not make sense in this context.
248+
URI::Generic.new(*URI.split(input))
249+
else
250+
if PATTERN_CAPTURING_HOSTNAME_AND_OPTIONAL_PORT.match(input)
251+
host, port = Regexp.last_match.captures
252+
elsif input =~ Resolv::IPv6::Regex
253+
# per RFC3986: to be used as hostname in URIs, ipv6 literals
254+
# MUST be wrapped in square-brackets.
255+
host, port = "[#{input}]", nil
256+
else
257+
fail('unsupported format')
258+
end
259+
URI::Generic.new(
260+
force_ssl ? 'https' : 'http',
261+
nil, # userinfo,
262+
host,
263+
port,
264+
nil, # registry
265+
nil, # path
266+
nil, # opaque
267+
nil, # query
268+
nil # fragment
269+
)
270+
end
271+
rescue => e
272+
logger.error("Plugin configured with invalid host value `#{input}`",
273+
:exception => e.message, :class => e.class.name)
274+
raise LogStash::ConfigurationError, "Aborting due to invalid configuration"
275+
end
276+
end
194277
end #class LogStash::Filters::Elasticsearch

lib/logstash/filters/elasticsearch/client.rb

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,15 @@ class ElasticsearchClient
1111
attr_reader :client
1212

1313
def initialize(user, password, options={})
14-
ssl = options.fetch(:ssl, false)
15-
hosts = options[:hosts]
16-
@logger = options[:logger]
14+
hosts = options.fetch(:hosts)
15+
@logger = options.fetch(:logger)
1716

1817
transport_options = {}
1918
if user && password
2019
token = ::Base64.strict_encode64("#{user}:#{password.value}")
2120
transport_options[:headers] = { Authorization: "Basic #{token}" }
2221
end
2322

24-
hosts.map! {|h| { host: h, scheme: 'https' } } if ssl
2523
# set ca_file even if ssl isn't on, since the host can be an https url
2624
ssl_options = { ssl: true, ca_file: options[:ca_file] } if options[:ca_file]
2725
ssl_options ||= {}

spec/filters/elasticsearch_spec.rb

Lines changed: 191 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,199 @@
88

99
context "registration" do
1010

11-
let(:plugin) { LogStash::Plugin.lookup("filter", "elasticsearch").new({}) }
12-
before do
13-
allow(plugin).to receive(:test_connection!)
11+
let(:plugin_class) { LogStash::Plugin.lookup("filter", "elasticsearch") }
12+
let(:plugin) { plugin_class.new(config) }
13+
let(:config) { Hash.new }
14+
15+
context 'with defaults' do
16+
before do
17+
allow(plugin).to receive(:test_connection!)
18+
end
19+
20+
it "should not raise an exception" do
21+
expect {plugin.register}.to_not raise_error
22+
end
1423
end
1524

16-
it "should not raise an exception" do
17-
expect {plugin.register}.to_not raise_error
25+
context 'hosts' do
26+
let(:config) do
27+
super().merge(
28+
'hosts' => hosts
29+
)
30+
end
31+
let(:hosts) do
32+
fail NotImplementedError, 'spec or spec group must define `hosts`.'
33+
end
34+
35+
let(:client_stub) { double(:client).as_null_object }
36+
let(:logger_stub) { double(:logger).as_null_object }
37+
38+
before(:each) do
39+
allow(plugin).to receive(:logger).and_return(logger_stub)
40+
end
41+
42+
context 'with schema://hostname' do
43+
let(:hosts) { ['http://foo.local', 'http://bar.local'] }
44+
45+
it 'creates client with URIs that do not include a port' do
46+
expect(::Elasticsearch::Client).to receive(:new) do |options|
47+
expect(options).to include :hosts
48+
expect(options[:hosts]).to be_an Array
49+
expect(options[:hosts]).to include(having_attributes(host: 'foo.local', scheme: 'http', port: nil))
50+
expect(options[:hosts]).to include(having_attributes(host: 'bar.local', scheme: 'http', port: nil))
51+
end.and_return(client_stub)
52+
53+
plugin.register
54+
end
55+
end
56+
57+
context 'with `ssl => true`' do
58+
let(:config) { super().merge('ssl' => 'true') }
59+
context 'and one or more explicitly-http hosts' do
60+
let(:hosts) { ['https://foo.local', 'http://bar.local'] }
61+
62+
it 'raises an exception' do
63+
expect { plugin.register }.to raise_error(LogStash::ConfigurationError)
64+
end
65+
66+
it 'emits a helpful log message' do
67+
plugin.register rescue nil
68+
expect(plugin.logger).to have_received(:error).with(match(/force SSL/))
69+
end
70+
end
71+
72+
context 'and all explicitly-https hosts' do
73+
let(:hosts) { ['https://foo.local', 'https://bar.local'] }
74+
75+
it 'sets the schemas on all to https' do
76+
expect(::Elasticsearch::Client).to receive(:new) do |options|
77+
expect(options).to include :hosts
78+
expect(options[:hosts]).to be_an Array
79+
options[:hosts].each do |host|
80+
expect(host).to be_an URI
81+
expect(host.scheme).to eq 'https'
82+
end
83+
end.and_return(client_stub)
84+
85+
plugin.register
86+
end
87+
end
88+
89+
context 'and one or more schemaless hosts' do
90+
let(:hosts) { ['https://foo.local', 'bar.local'] }
91+
92+
it 'sets the schemas on all to https' do
93+
expect(::Elasticsearch::Client).to receive(:new) do |options|
94+
expect(options).to include :hosts
95+
expect(options[:hosts]).to be_an Array
96+
options[:hosts].each do |host|
97+
expect(host).to be_an URI
98+
expect(host.scheme).to eq 'https'
99+
end
100+
end.and_return(client_stub)
101+
102+
plugin.register
103+
end
104+
end
105+
106+
context 'with one or more ipv6 hostnames' do
107+
let(:hosts) { ['[::1]', '[::2]:9201', 'https://[::3]:9202', '::4'] }
108+
it 'defaults to the http protocol' do
109+
expect(::Elasticsearch::Client).to receive(:new) do |options|
110+
expect(options).to include :hosts
111+
expect(options[:hosts]).to be_an Array
112+
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: '[::1]', port: nil))
113+
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: '[::2]', port: 9201))
114+
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: '[::3]', port: 9202))
115+
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: '[::4]', port: nil))
116+
end.and_return(client_stub)
117+
118+
plugin.register
119+
end
120+
end
121+
end
122+
123+
{
124+
'with `ssl => false' => {'ssl' => 'false'},
125+
'without `ssl` directive' => {}
126+
}.each do |context_string, config_override|
127+
context(context_string) do
128+
let(:config) { super().merge(config_override) }
129+
130+
context 'with a mix of http and https hosts' do
131+
let(:hosts) { ['https://foo.local', 'http://bar.local'] }
132+
it 'does not modify the protocol' do
133+
expect(::Elasticsearch::Client).to receive(:new) do |options|
134+
expect(options).to include :hosts
135+
expect(options[:hosts]).to be_an Array
136+
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: 'foo.local', port: nil))
137+
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: 'bar.local', port: nil))
138+
end.and_return(client_stub)
139+
140+
plugin.register
141+
end
142+
end
143+
144+
context 'with https-only hosts' do
145+
let(:hosts) { ['https://foo.local', 'https://bar.local'] }
146+
it 'does not modify the protocol' do
147+
expect(::Elasticsearch::Client).to receive(:new) do |options|
148+
expect(options).to include :hosts
149+
expect(options[:hosts]).to be_an Array
150+
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: 'foo.local', port: nil))
151+
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: 'bar.local', port: nil))
152+
end.and_return(client_stub)
153+
154+
plugin.register
155+
end
156+
end
157+
158+
context 'with http-only hosts' do
159+
let(:hosts) { ['http://foo.local', 'http://bar.local'] }
160+
it 'does not modify the protocol' do
161+
expect(::Elasticsearch::Client).to receive(:new) do |options|
162+
expect(options).to include :hosts
163+
expect(options[:hosts]).to be_an Array
164+
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: 'foo.local', port: nil))
165+
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: 'bar.local', port: nil))
166+
end.and_return(client_stub)
167+
168+
plugin.register
169+
end
170+
end
171+
172+
context 'with one or more schemaless hosts' do
173+
let(:hosts) { ['foo.local', 'bar.local' ] }
174+
it 'defaults to the http protocol' do
175+
expect(::Elasticsearch::Client).to receive(:new) do |options|
176+
expect(options).to include :hosts
177+
expect(options[:hosts]).to be_an Array
178+
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: 'foo.local', port: nil))
179+
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: 'bar.local', port: nil))
180+
end.and_return(client_stub)
181+
182+
plugin.register
183+
end
184+
end
185+
186+
context 'with one or more square-bracketed ipv6 literals' do
187+
let(:hosts) { ['[::1]', '[::2]:9201', 'http://[::3]','https://[::4]:9202', '::5'] }
188+
it 'defaults to the http protocol' do
189+
expect(::Elasticsearch::Client).to receive(:new) do |options|
190+
expect(options).to include :hosts
191+
expect(options[:hosts]).to be_an Array
192+
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: '[::1]', port: nil))
193+
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: '[::2]', port: 9201))
194+
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: '[::3]', port: nil))
195+
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: '[::4]', port: 9202))
196+
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: '[::5]', port: nil))
197+
end.and_return(client_stub)
198+
199+
plugin.register
200+
end
201+
end
202+
end
203+
end
18204
end
19205
end
20206

0 commit comments

Comments
 (0)