Skip to content

Enable $out aggregation to push to time series collection #4995

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@

import org.bson.Document;
import org.bson.conversions.Bson;
import org.jspecify.annotations.Nullable;
import org.springframework.data.domain.Sort;
import org.springframework.data.domain.Sort.Direction;
import org.springframework.data.mongodb.core.CollectionOptions.TimeSeriesOptions;
import org.springframework.data.mongodb.core.aggregation.AddFieldsOperation.AddFieldsOperationBuilder;
import org.springframework.data.mongodb.core.aggregation.CountOperation.CountOperationBuilder;
import org.springframework.data.mongodb.core.aggregation.FacetOperation.FacetOperationBuilder;
Expand All @@ -37,6 +39,7 @@
import org.springframework.data.mongodb.core.query.CriteriaDefinition;
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.SerializationUtils;
import org.springframework.data.mongodb.core.timeseries.Granularity;
import org.springframework.util.Assert;

/**
Expand All @@ -53,6 +56,7 @@
* @author Gustavo de Geus
* @author Jérôme Guyon
* @author Sangyong Choi
* @author Hyunsang Han
* @since 1.3
*/
public class Aggregation {
Expand Down Expand Up @@ -586,6 +590,46 @@ public static OutOperation out(String outCollectionName) {
return new OutOperation(outCollectionName);
}

/**
* Creates a new {@link OutOperation} for time series collections using the given collection name and time series
* options.
*
* @param outCollectionName collection name to export aggregation results. Must not be {@literal null}.
* @param timeSeriesOptions must not be {@literal null}.
* @return new instance of {@link OutOperation}.
* @since 5.0
*/
public static OutOperation out(String outCollectionName, TimeSeriesOptions timeSeriesOptions) {
return new OutOperation(outCollectionName).timeSeries(timeSeriesOptions);
}

/**
* Creates a new {@link OutOperation} for time series collections using the given collection name and time field.
*
* @param outCollectionName collection name to export aggregation results. Must not be {@literal null}.
* @param timeField must not be {@literal null} or empty.
* @return new instance of {@link OutOperation}.
* @since 5.0
*/
public static OutOperation out(String outCollectionName, String timeField) {
return new OutOperation(outCollectionName).timeSeries(timeField);
}

/**
* Creates a new {@link OutOperation} for time series collections using the given collection name, time field, meta
* field, and granularity.
*
* @param outCollectionName collection name to export aggregation results. Must not be {@literal null}.
* @param timeField must not be {@literal null} or empty.
* @param metaField can be {@literal null}.
* @param granularity can be {@literal null}.
* @return new instance of {@link OutOperation}.
* @since 5.0
*/
public static OutOperation out(String outCollectionName, String timeField, @Nullable String metaField, @Nullable Granularity granularity) {
return new OutOperation(outCollectionName).timeSeries(timeField, metaField, granularity);
}

/**
* Creates a new {@link BucketOperation} given {@literal groupByField}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.bson.Document;
import org.jspecify.annotations.Nullable;

import org.springframework.data.mongodb.core.CollectionOptions.TimeSeriesOptions;
import org.springframework.data.mongodb.core.timeseries.Granularity;
import org.springframework.lang.Contract;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
Expand All @@ -30,32 +32,36 @@
*
* @author Nikolay Bogdanov
* @author Christoph Strobl
* @author Hyunsang Han
* @see <a href="https://docs.mongodb.com/manual/reference/operator/aggregation/out/">MongoDB Aggregation Framework:
* $out</a>
*/
public class OutOperation implements AggregationOperation {

private final @Nullable String databaseName;
private final String collectionName;
private final @Nullable TimeSeriesOptions timeSeriesOptions;

/**
* @param outCollectionName Collection name to export the results. Must not be {@literal null}.
*/
public OutOperation(String outCollectionName) {
this(null, outCollectionName);
this(null, outCollectionName, null);
}

/**
* @param databaseName Optional database name the target collection is located in. Can be {@literal null}.
* @param collectionName Collection name to export the results. Must not be {@literal null}. Can be {@literal null}.
* @since 2.2
* @param timeSeriesOptions Optional time series options for creating a time series collection. Can be {@literal null}.
* @since 5.0
*/
private OutOperation(@Nullable String databaseName, String collectionName) {
private OutOperation(@Nullable String databaseName, String collectionName, @Nullable TimeSeriesOptions timeSeriesOptions) {

Assert.notNull(collectionName, "Collection name must not be null");

this.databaseName = databaseName;
this.collectionName = collectionName;
this.timeSeriesOptions = timeSeriesOptions;
}

/**
Expand All @@ -68,17 +74,81 @@ private OutOperation(@Nullable String databaseName, String collectionName) {
*/
@Contract("_ -> new")
public OutOperation in(@Nullable String database) {
return new OutOperation(database, collectionName);
return new OutOperation(database, collectionName, timeSeriesOptions);
}

/**
* Set the time series options for creating a time series collection.
*
* @param timeSeriesOptions must not be {@literal null}.
* @return new instance of {@link OutOperation}.
* @since 5.0
*/
@Contract("_ -> new")
public OutOperation timeSeries(TimeSeriesOptions timeSeriesOptions) {

Assert.notNull(timeSeriesOptions, "TimeSeriesOptions must not be null");
return new OutOperation(databaseName, collectionName, timeSeriesOptions);
}

/**
* Set the time series options for creating a time series collection with only the time field.
*
* @param timeField must not be {@literal null} or empty.
* @return new instance of {@link OutOperation}.
* @since 5.0
*/
@Contract("_ -> new")
public OutOperation timeSeries(String timeField) {

Assert.hasText(timeField, "TimeField must not be null or empty");
return timeSeries(TimeSeriesOptions.timeSeries(timeField));
}

/**
* Set the time series options for creating a time series collection with time field, meta field, and granularity.
*
* @param timeField must not be {@literal null} or empty.
* @param metaField can be {@literal null}.
* @param granularity can be {@literal null}.
* @return new instance of {@link OutOperation}.
* @since 5.0
*/
@Contract("_, _, _ -> new")
public OutOperation timeSeries(String timeField, @Nullable String metaField, @Nullable Granularity granularity) {

Assert.hasText(timeField, "TimeField must not be null or empty");
return timeSeries(TimeSeriesOptions.timeSeries(timeField).metaField(metaField).granularity(granularity));
}

@Override
public Document toDocument(AggregationOperationContext context) {

if (!StringUtils.hasText(databaseName)) {
if (!StringUtils.hasText(databaseName) && timeSeriesOptions == null) {
return new Document(getOperator(), collectionName);
}

return new Document(getOperator(), new Document("db", databaseName).append("coll", collectionName));
Document outDocument = new Document("coll", collectionName);

if (StringUtils.hasText(databaseName)) {
outDocument.put("db", databaseName);
}

if (timeSeriesOptions != null) {
Document timeSeriesDoc = new Document("timeField", timeSeriesOptions.getTimeField());

if (StringUtils.hasText(timeSeriesOptions.getMetaField())) {
timeSeriesDoc.put("metaField", timeSeriesOptions.getMetaField());
}

if (timeSeriesOptions.getGranularity() != null && timeSeriesOptions.getGranularity() != Granularity.DEFAULT) {
timeSeriesDoc.put("granularity", timeSeriesOptions.getGranularity().name().toLowerCase());
}

outDocument.put("timeseries", timeSeriesDoc);
}

return new Document(getOperator(), outDocument);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@

import org.bson.Document;
import org.junit.jupiter.api.Test;
import org.springframework.data.mongodb.core.CollectionOptions.TimeSeriesOptions;
import org.springframework.data.mongodb.core.timeseries.Granularity;

/**
* Unit tests for {@link OutOperation}.
*
* @author Nikolay Bogdanov
* @author Christoph Strobl
* @author Mark Paluch
* @author Hyunsang Han
*/
class OutOperationUnitTest {

Expand All @@ -48,4 +51,99 @@ void shouldRenderDocument() {
.containsEntry("$out.db", "database-2");
}

@Test // GH-4985
void shouldRenderTimeSeriesCollectionWithTimeFieldOnly() {

Document result = out("timeseries-col").timeSeries("timestamp").toDocument(Aggregation.DEFAULT_CONTEXT);

assertThat(result).containsEntry("$out.coll", "timeseries-col");
assertThat(result).containsEntry("$out.timeseries.timeField", "timestamp");
assertThat(result).doesNotContainKey("$out.timeseries.metaField");
assertThat(result).doesNotContainKey("$out.timeseries.granularity");
}

@Test // GH-4985
void shouldRenderTimeSeriesCollectionWithAllOptions() {

Document result = out("timeseries-col").timeSeries("timestamp", "metadata", Granularity.SECONDS)
.toDocument(Aggregation.DEFAULT_CONTEXT);

assertThat(result).containsEntry("$out.coll", "timeseries-col");
assertThat(result).containsEntry("$out.timeseries.timeField", "timestamp");
assertThat(result).containsEntry("$out.timeseries.metaField", "metadata");
assertThat(result).containsEntry("$out.timeseries.granularity", "seconds");
}

@Test // GH-4985
void shouldRenderTimeSeriesCollectionWithDatabaseAndAllOptions() {

Document result = out("timeseries-col").in("test-db").timeSeries("timestamp", "metadata", Granularity.MINUTES)
.toDocument(Aggregation.DEFAULT_CONTEXT);

assertThat(result).containsEntry("$out.coll", "timeseries-col");
assertThat(result).containsEntry("$out.db", "test-db");
assertThat(result).containsEntry("$out.timeseries.timeField", "timestamp");
assertThat(result).containsEntry("$out.timeseries.metaField", "metadata");
assertThat(result).containsEntry("$out.timeseries.granularity", "minutes");
}

@Test // GH-4985
void shouldRenderTimeSeriesCollectionWithTimeSeriesOptions() {

TimeSeriesOptions options = TimeSeriesOptions.timeSeries("timestamp").metaField("metadata").granularity(Granularity.HOURS);
Document result = out("timeseries-col").timeSeries(options).toDocument(Aggregation.DEFAULT_CONTEXT);

assertThat(result).containsEntry("$out.coll", "timeseries-col");
assertThat(result).containsEntry("$out.timeseries.timeField", "timestamp");
assertThat(result).containsEntry("$out.timeseries.metaField", "metadata");
assertThat(result).containsEntry("$out.timeseries.granularity", "hours");
}

@Test // GH-4985
void shouldRenderTimeSeriesCollectionWithPartialOptions() {

Document result = out("timeseries-col").timeSeries("timestamp", "metadata", null)
.toDocument(Aggregation.DEFAULT_CONTEXT);

assertThat(result).containsEntry("$out.coll", "timeseries-col");
assertThat(result).containsEntry("$out.timeseries.timeField", "timestamp");
assertThat(result).containsEntry("$out.timeseries.metaField", "metadata");
assertThat(result).doesNotContainKey("$out.timeseries.granularity");
}

@Test // GH-4985
void outWithTimeSeriesOptionsShouldRenderCorrectly() {

TimeSeriesOptions options = TimeSeriesOptions.timeSeries("timestamp").metaField("metadata").granularity(Granularity.SECONDS);
Document result = Aggregation.out("timeseries-col", options).toDocument(Aggregation.DEFAULT_CONTEXT);

assertThat(result).containsEntry("$out.coll", "timeseries-col");
assertThat(result).containsEntry("$out.timeseries.timeField", "timestamp");
assertThat(result).containsEntry("$out.timeseries.metaField", "metadata");
assertThat(result).containsEntry("$out.timeseries.granularity", "seconds");
}

@Test // GH-4985
void outWithTimeFieldOnlyShouldRenderCorrectly() {

Document result = Aggregation.out("timeseries-col", "timestamp").toDocument(Aggregation.DEFAULT_CONTEXT);

assertThat(result).containsEntry("$out.coll", "timeseries-col");
assertThat(result).containsEntry("$out.timeseries.timeField", "timestamp");
assertThat(result).doesNotContainKey("$out.timeseries.metaField");
assertThat(result).doesNotContainKey("$out.timeseries.granularity");
}

@Test // GH-4985
void outWithAllOptionsShouldRenderCorrectly() {

Document result = Aggregation.out("timeseries-col", "timestamp", "metadata", Granularity.MINUTES)
.toDocument(Aggregation.DEFAULT_CONTEXT);

assertThat(result).containsEntry("$out.coll", "timeseries-col");
assertThat(result).containsEntry("$out.timeseries.timeField", "timestamp");
assertThat(result).containsEntry("$out.timeseries.metaField", "metadata");
assertThat(result).containsEntry("$out.timeseries.granularity", "minutes");
}

}