Skip to content

Add some infrastructure for Observation #3879

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,12 @@ project('spring-integration-core') {
testImplementation "org.aspectj:aspectjweaver:$aspectjVersion"
testImplementation "org.hamcrest:hamcrest-core:$hamcrestVersion"
testImplementation 'io.micrometer:micrometer-observation-test'
testImplementation 'io.micrometer:micrometer-tracing-test'
testImplementation ('io.micrometer:micrometer-tracing-integration-test') {
exclude group: 'io.opentelemetry'
exclude group: 'com.wavefront'
exclude group: 'io.micrometer', module: 'micrometer-tracing-bridge-otel'

}
}

dokkaHtmlPartial {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 the original author or authors.
* Copyright 2015-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,6 +31,8 @@
import org.springframework.integration.support.management.metrics.MetricsCaptor;
import org.springframework.util.Assert;

import io.micrometer.observation.ObservationRegistry;

/**
* {@code @Configuration} class that registers a {@link IntegrationManagementConfigurer} bean.
*
Expand Down Expand Up @@ -64,12 +66,16 @@ public void setImportMetadata(AnnotationMetadata importMetadata) {

@Bean(name = IntegrationManagementConfigurer.MANAGEMENT_CONFIGURER_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public IntegrationManagementConfigurer managementConfigurer(ObjectProvider<MetricsCaptor> metricsCaptorProvider) {
public IntegrationManagementConfigurer managementConfigurer(
ObjectProvider<MetricsCaptor> metricsCaptorProvider,
ObjectProvider<ObservationRegistry> observationRegistryProvider) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means that micrometer-observation is a mandatory dependency?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's what we have discussed with you a couple weeks ago.
The conclusion was like: since a micrometer-observation is tiny API jar there is no reason in extra facade layer complexity on our side.
The bean by itself is optional though, so we just don't have any observation by default either way.


IntegrationManagementConfigurer configurer = new IntegrationManagementConfigurer();
configurer.setDefaultLoggingEnabled(
Boolean.parseBoolean(this.environment.resolvePlaceholders(
(String) this.attributes.get("defaultLoggingEnabled"))));
configurer.setMetricsCaptorProvider(metricsCaptorProvider);
configurer.setObservationRegistry(observationRegistryProvider);
return configurer;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 the original author or authors.
* Copyright 2015-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,10 +38,12 @@
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;

import io.micrometer.observation.ObservationRegistry;


/**
* Configures beans that implement {@link IntegrationManagement}.
* Configures counts, stats, logging for all (or selected) components.
* Configures logging, {@link MetricsCaptor} and {@link ObservationRegistry} for all (or selected) components.
*
* @author Gary Russell
* @author Artem Bilan
Expand Down Expand Up @@ -74,6 +76,10 @@ public class IntegrationManagementConfigurer

private ObjectProvider<MetricsCaptor> metricsCaptorProvider;

private ObservationRegistry observationRegistry;

private ObjectProvider<ObservationRegistry> observationRegistryProvider;

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
Expand All @@ -95,8 +101,8 @@ public void setBeanName(String name) {
* {@link org.apache.commons.logging.Log#isDebugEnabled()} can be quite expensive
* and account for an inordinate amount of CPU time.
* <p>
* Set this to false to disable logging by default in all framework components that implement
* {@link IntegrationManagement} (channels, message handlers etc). This turns off logging such as
* Set this to 'false' to disable logging by default in all framework components that implement
* {@link IntegrationManagement} (channels, message handlers etc.) This turns off logging such as
* "PreSend on channel", "Received message" etc.
* <p>
* After the context is initialized, individual components can have their setting changed by invoking
Expand All @@ -115,14 +121,21 @@ void setMetricsCaptorProvider(ObjectProvider<MetricsCaptor> metricsCaptorProvide
this.metricsCaptorProvider = metricsCaptorProvider;
}

@Nullable
MetricsCaptor obtainMetricsCaptor() {
if (this.metricsCaptor == null && this.metricsCaptorProvider != null) {
this.metricsCaptor = this.metricsCaptorProvider.getIfUnique();
}
return this.metricsCaptor;
/**
* Set an {@link ObservationRegistry} to populate to the {@link IntegrationManagement} components
* in the application context.
* @param observationRegistry the {@link ObservationRegistry} to use.
* @since 6.0
*/
public void setObservationRegistry(@Nullable ObservationRegistry observationRegistry) {
this.observationRegistry = observationRegistry;
}

void setObservationRegistry(ObjectProvider<ObservationRegistry> observationRegistryProvider) {
this.observationRegistryProvider = observationRegistryProvider;
}


@Override
public void afterSingletonsInstantiated() {
Assert.state(this.applicationContext != null, "'applicationContext' must not be null");
Expand All @@ -133,15 +146,29 @@ public void afterSingletonsInstantiated() {
registerComponentGauges();
}

for (IntegrationManagement integrationManagement :
this.applicationContext.getBeansOfType(IntegrationManagement.class).values()) {
setupObservationRegistry();

enhanceIntegrationManagement(integrationManagement);
}
this.applicationContext.getBeansOfType(IntegrationManagement.class).values()
.forEach(this::enhanceIntegrationManagement);

this.singletonsInstantiated = true;
}

@Nullable
private MetricsCaptor obtainMetricsCaptor() {
if (this.metricsCaptor == null && this.metricsCaptorProvider != null) {
this.metricsCaptor = this.metricsCaptorProvider.getIfUnique();
}
return this.metricsCaptor;
}

@Nullable
private void setupObservationRegistry() {
if (this.observationRegistry == null && this.observationRegistryProvider != null) {
this.observationRegistry = this.observationRegistryProvider.getIfUnique();
}
}

private void registerComponentGauges() {
this.gauges.add(
this.metricsCaptor.gaugeBuilder("spring.integration.channels", this,
Expand Down Expand Up @@ -169,17 +196,21 @@ private void enhanceIntegrationManagement(IntegrationManagement integrationManag
if (this.metricsCaptor != null) {
integrationManagement.registerMetricsCaptor(this.metricsCaptor);
}
if (this.observationRegistry != null) {
integrationManagement.registerObservationRegistry(this.observationRegistry);
}
}

@Override
public Object postProcessAfterInitialization(Object bean, String name) throws BeansException {
if (this.singletonsInstantiated && bean instanceof IntegrationManagement) {
enhanceIntegrationManagement((IntegrationManagement) bean);
if (this.singletonsInstantiated && bean instanceof IntegrationManagement integrationManagement) {
enhanceIntegrationManagement(integrationManagement);
}
return bean;
}

@Override public void onApplicationEvent(ContextClosedEvent event) {
@Override
public void onApplicationEvent(ContextClosedEvent event) {
if (event.getApplicationContext().equals(this.applicationContext)) {
this.gauges.forEach(MeterFacade::remove);
this.gauges.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,17 @@
import org.springframework.integration.history.MessageHistory;
import org.springframework.integration.support.management.metrics.MetricsCaptor;
import org.springframework.integration.support.management.metrics.SampleFacade;
import org.springframework.integration.support.management.observation.DefaultMessageReceiverObservationConvention;
import org.springframework.integration.support.management.observation.IntegrationObservation;
import org.springframework.integration.support.management.observation.MessageReceiverContext;
import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;

import io.micrometer.observation.ObservationRegistry;
import reactor.core.CoreSubscriber;

/**
Expand All @@ -37,21 +43,49 @@
public abstract class AbstractMessageHandler extends MessageHandlerSupport
implements MessageHandler, CoreSubscriber<Message<?>> {

@Nullable
private MessageReceiverObservationConvention observationConvention;

/**
* Set a custom {@link MessageReceiverObservationConvention} for {@link IntegrationObservation#HANDLER}.
* Ignored if an {@link ObservationRegistry} is not configured for this component.
* @param observationConvention the {@link MessageReceiverObservationConvention} to use.
* @since 6.0
*/
public void setObservationConvention(@Nullable MessageReceiverObservationConvention observationConvention) {
this.observationConvention = observationConvention;
}

@Override // NOSONAR
public void handleMessage(Message<?> message) {
Assert.notNull(message, "Message must not be null");
if (isLoggingEnabled() && this.logger.isDebugEnabled()) {
this.logger.debug(this + " received message: " + message);
if (isLoggingEnabled()) {
this.logger.debug(() -> this + " received message: " + message);
}
MetricsCaptor metricsCaptor = getMetricsCaptor();
if (metricsCaptor != null) {
handleWithMetrics(message, metricsCaptor);
ObservationRegistry observationRegistry = getObservationRegistry();
if (observationRegistry != null) {
handleWithObservation(message, observationRegistry);
}
else {
doHandleMessage(message);
MetricsCaptor metricsCaptor = getMetricsCaptor();
if (metricsCaptor != null) {
handleWithMetrics(message, metricsCaptor);
}
else {
doHandleMessage(message);
}
}
}

private void handleWithObservation(Message<?> message, ObservationRegistry observationRegistry) {
IntegrationObservation.HANDLER.observation(
this.observationConvention,
DefaultMessageReceiverObservationConvention.INSTANCE,
new MessageReceiverContext(message, getComponentName()),
observationRegistry)
.observe(() -> doHandleMessage(message));
}

private void handleWithMetrics(Message<?> message, MetricsCaptor metricsCaptor) {
SampleFacade sample = metricsCaptor.start();
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2020 the original author or authors.
* Copyright 2019-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,6 +31,8 @@
import org.springframework.integration.support.management.metrics.MetricsCaptor;
import org.springframework.integration.support.management.metrics.TimerFacade;

import io.micrometer.observation.ObservationRegistry;

/**
* Base class for Message handling components that provides basic validation and error
* handling capabilities. Asserts that the incoming Message is not null and that it does
Expand Down Expand Up @@ -61,6 +63,8 @@ public abstract class MessageHandlerSupport extends IntegrationObjectSupport

private MetricsCaptor metricsCaptor;

private ObservationRegistry observationRegistry;

private int order = Ordered.LOWEST_PRECEDENCE;

private String managedName;
Expand Down Expand Up @@ -89,6 +93,15 @@ protected MetricsCaptor getMetricsCaptor() {
return this.metricsCaptor;
}

@Override
public void registerObservationRegistry(ObservationRegistry observationRegistry) {
this.observationRegistry = observationRegistry;
}

protected ObservationRegistry getObservationRegistry() {
return this.observationRegistry;
}

@Override
public void setOrder(int order) {
this.order = order;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2021 the original author or authors.
* Copyright 2015-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,10 +22,14 @@
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.lang.Nullable;

import io.micrometer.observation.ObservationRegistry;

/**
* Base interface for Integration managed components.
*
* @author Gary Russell
* @author Artem Bilan
*
* @since 4.2
*
*/
Expand All @@ -39,7 +43,7 @@ public interface IntegrationManagement extends NamedComponent, DisposableBean {

/**
* Enable logging or not.
* @param enabled dalse to disable.
* @param enabled false to disable.
*/
@ManagedAttribute(description = "Use to disable debug logging during normal message flow")
default void setLoggingEnabled(boolean enabled) {
Expand Down Expand Up @@ -80,13 +84,28 @@ default ManagementOverrides getOverrides() {

/**
* Inject a {@link MetricsCaptor}.
* Ignored if {@link ObservationRegistry} is provided.
* @param captor the captor.
* @since 5.0.4
* @see #registerObservationRegistry(ObservationRegistry)
*/
default void registerMetricsCaptor(MetricsCaptor captor) {
// no op
}

/**
* Inject an {@link ObservationRegistry}.
* If provided, the {@link MetricsCaptor} is ignored.
* The meters capturing has to be configured as an {@link io.micrometer.observation.ObservationHandler}
* on the provided {@link ObservationRegistry}.
* @param observationRegistry the {@link ObservationRegistry} to expose observations from the component.
* @since 6.0
* @see #registerMetricsCaptor(MetricsCaptor)
*/
default void registerObservationRegistry(ObservationRegistry observationRegistry) {
// no op
}

@Override
default void destroy() {
// no op
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.support.management.observation;

import io.micrometer.common.KeyValues;

/**
* A default {@link MessageReceiverObservationConvention} implementation.
* Provides low cardinalities as a {@link IntegrationObservation.HandlerTags} values.
*
* @author Artem Bilan
*
* @since 6.0
*/
public class DefaultMessageReceiverObservationConvention implements MessageReceiverObservationConvention {

/**
* A shared singleton instance for {@link DefaultMessageReceiverObservationConvention}.
*/
public static final DefaultMessageReceiverObservationConvention INSTANCE =
new DefaultMessageReceiverObservationConvention();

@Override
public KeyValues getLowCardinalityKeyValues(MessageReceiverContext context) {
return KeyValues.of(
IntegrationObservation.HandlerTags.COMPONENT_NAME.withValue(context.getHandlerName()),
IntegrationObservation.HandlerTags.COMPONENT_TYPE.withValue("handler"));
}

}
Loading