1
1
/*
2
- * Copyright 2002-2016 the original author or authors.
2
+ * Copyright 2002-2018 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
16
16
17
17
package org .springframework .integration .jdbc ;
18
18
19
- import java .util . ArrayList ;
19
+ import java .sql . PreparedStatement ;
20
20
import java .util .List ;
21
+ import java .util .function .Consumer ;
21
22
22
23
import javax .sql .DataSource ;
23
24
24
25
import org .springframework .integration .context .IntegrationObjectSupport ;
25
26
import org .springframework .integration .core .MessageSource ;
26
27
import org .springframework .jdbc .core .ColumnMapRowMapper ;
27
28
import org .springframework .jdbc .core .JdbcOperations ;
28
- import org .springframework .jdbc .core .ResultSetExtractor ;
29
+ import org .springframework .jdbc .core .JdbcTemplate ;
30
+ import org .springframework .jdbc .core .PreparedStatementCreator ;
31
+ import org .springframework .jdbc .core .PreparedStatementCreatorFactory ;
29
32
import org .springframework .jdbc .core .RowMapper ;
30
- import org .springframework .jdbc .core .RowMapperResultSetExtractor ;
31
33
import org .springframework .jdbc .core .namedparam .NamedParameterJdbcOperations ;
32
34
import org .springframework .jdbc .core .namedparam .NamedParameterJdbcTemplate ;
33
35
import org .springframework .jdbc .core .namedparam .SqlParameterSource ;
34
36
import org .springframework .messaging .Message ;
37
+ import org .springframework .util .Assert ;
35
38
36
39
/**
37
40
* A polling channel adapter that creates messages from the payload returned by
@@ -50,42 +53,55 @@ public class JdbcPollingChannelAdapter extends IntegrationObjectSupport implemen
50
53
51
54
private final String selectQuery ;
52
55
53
- private volatile RowMapper <?> rowMapper ;
56
+ private RowMapper <?> rowMapper ;
54
57
55
- private volatile SqlParameterSource sqlQueryParameterSource ;
58
+ private SqlParameterSource sqlQueryParameterSource ;
56
59
57
- private volatile boolean updatePerRow = false ;
60
+ private boolean updatePerRow = false ;
58
61
59
- private volatile String updateSql ;
62
+ private String updateSql ;
60
63
61
- private volatile SqlParameterSourceFactory sqlParameterSourceFactory =
62
- new ExpressionEvaluatingSqlParameterSourceFactory ();
64
+ private SqlParameterSourceFactory sqlParameterSourceFactory = new ExpressionEvaluatingSqlParameterSourceFactory ();
63
65
64
- private volatile boolean sqlParameterSourceFactorySet ;
66
+ private boolean sqlParameterSourceFactorySet ;
65
67
66
- private volatile int maxRowsPerPoll = 0 ;
68
+ private int maxRows = 0 ;
67
69
68
70
/**
69
71
* Constructor taking {@link DataSource} from which the DB Connection can be
70
72
* obtained and the select query to execute to retrieve new rows.
71
- *
72
73
* @param dataSource Must not be null
73
74
* @param selectQuery query to execute
74
75
*/
75
76
public JdbcPollingChannelAdapter (DataSource dataSource , String selectQuery ) {
76
- this .jdbcOperations = new NamedParameterJdbcTemplate (dataSource );
77
- this .selectQuery = selectQuery ;
77
+ this (new JdbcTemplate (dataSource ), selectQuery );
78
78
}
79
79
80
80
/**
81
81
* Constructor taking {@link JdbcOperations} instance to use for query
82
82
* execution and the select query to execute to retrieve new rows.
83
- *
84
83
* @param jdbcOperations instance to use for query execution
85
84
* @param selectQuery query to execute
86
85
*/
87
86
public JdbcPollingChannelAdapter (JdbcOperations jdbcOperations , String selectQuery ) {
88
- this .jdbcOperations = new NamedParameterJdbcTemplate (jdbcOperations );
87
+ Assert .hasText (selectQuery , "'selectQuery' must be specified." );
88
+ this .jdbcOperations = new NamedParameterJdbcTemplate (jdbcOperations ) {
89
+
90
+ @ Override
91
+ protected PreparedStatementCreator getPreparedStatementCreator (String sql ,
92
+ SqlParameterSource paramSource , Consumer <PreparedStatementCreatorFactory > customizer ) {
93
+
94
+ PreparedStatementCreator preparedStatementCreator =
95
+ super .getPreparedStatementCreator (sql , paramSource , customizer );
96
+
97
+ return con -> {
98
+ PreparedStatement preparedStatement = preparedStatementCreator .createPreparedStatement (con );
99
+ preparedStatement .setMaxRows (JdbcPollingChannelAdapter .this .maxRows );
100
+ return preparedStatement ;
101
+ };
102
+ }
103
+ };
104
+
89
105
this .selectQuery = selectQuery ;
90
106
}
91
107
@@ -108,7 +124,6 @@ public void setUpdateSqlParameterSourceFactory(SqlParameterSourceFactory sqlPara
108
124
109
125
/**
110
126
* A source of parameters for the select query used for polling.
111
- *
112
127
* @param sqlQueryParameterSource the sql query parameter source to set
113
128
*/
114
129
public void setSelectSqlParameterSource (SqlParameterSource sqlQueryParameterSource ) {
@@ -119,22 +134,37 @@ public void setSelectSqlParameterSource(SqlParameterSource sqlQueryParameterSour
119
134
* The maximum number of rows to pull out of the query results per poll (if
120
135
* greater than zero, otherwise all rows will be packed into the outgoing
121
136
* message). Default is zero.
122
- *
123
137
* @param maxRows the max rows to set
138
+ * @deprecated since 5.1 in favor of {@link #setMaxRows(int)}
124
139
*/
140
+ @ Deprecated
125
141
public void setMaxRowsPerPoll (int maxRows ) {
126
- this .maxRowsPerPoll = maxRows ;
142
+ setMaxRows (maxRows );
143
+ }
144
+
145
+ /**
146
+ * The maximum number of rows to query. Default is zero - select all records.
147
+ * @param maxRows the max rows to set
148
+ * @since 5.1
149
+ */
150
+ public void setMaxRows (int maxRows ) {
151
+ this .maxRows = maxRows ;
127
152
}
128
153
129
154
@ Override
130
155
protected void onInit () throws Exception {
131
156
super .onInit ();
132
- if (!this .sqlParameterSourceFactorySet && this . getBeanFactory () != null ) {
157
+ if (!this .sqlParameterSourceFactorySet && getBeanFactory () != null ) {
133
158
((ExpressionEvaluatingSqlParameterSourceFactory ) this .sqlParameterSourceFactory )
134
- .setBeanFactory (this . getBeanFactory ());
159
+ .setBeanFactory (getBeanFactory ());
135
160
}
136
161
}
137
162
163
+ @ Override
164
+ public String getComponentType () {
165
+ return "jdbc:inbound-channel-adapter" ;
166
+ }
167
+
138
168
/**
139
169
* Execute the query. If a query result set contains one or more rows, the
140
170
* Message payload will contain either a List of Maps for each row or, if a
@@ -148,7 +178,9 @@ public Message<Object> receive() {
148
178
if (payload == null ) {
149
179
return null ;
150
180
}
151
- return this .getMessageBuilderFactory ().withPayload (payload ).build ();
181
+ return getMessageBuilderFactory ()
182
+ .withPayload (payload )
183
+ .build ();
152
184
}
153
185
154
186
/**
@@ -174,43 +206,20 @@ private Object poll() {
174
206
return payload ;
175
207
}
176
208
177
- private void executeUpdateQuery (Object obj ) {
178
- SqlParameterSource updateParameterSource = this .sqlParameterSourceFactory .createParameterSource (obj );
179
- this .jdbcOperations .update (this .updateSql , updateParameterSource );
180
- }
181
-
182
209
protected List <?> doPoll (SqlParameterSource sqlQueryParameterSource ) {
183
210
final RowMapper <?> rowMapper = this .rowMapper == null ? new ColumnMapRowMapper () : this .rowMapper ;
184
- ResultSetExtractor <List <Object >> resultSetExtractor ;
185
-
186
- if (this .maxRowsPerPoll > 0 ) {
187
- resultSetExtractor = rs -> {
188
- List <Object > results = new ArrayList <Object >(JdbcPollingChannelAdapter .this .maxRowsPerPoll );
189
- int rowNum = 0 ;
190
- while (rs .next () && rowNum < JdbcPollingChannelAdapter .this .maxRowsPerPoll ) {
191
- results .add (rowMapper .mapRow (rs , rowNum ++));
192
- }
193
- return results ;
194
- };
195
- }
196
- else {
197
- @ SuppressWarnings ("unchecked" )
198
- ResultSetExtractor <List <Object >> temp =
199
- new RowMapperResultSetExtractor <Object >((RowMapper <Object >) rowMapper );
200
- resultSetExtractor = temp ;
201
- }
202
211
203
212
if (sqlQueryParameterSource != null ) {
204
- return this .jdbcOperations .query (this .selectQuery , sqlQueryParameterSource , resultSetExtractor );
213
+ return this .jdbcOperations .query (this .selectQuery , sqlQueryParameterSource , rowMapper );
205
214
}
206
215
else {
207
- return this .jdbcOperations .getJdbcOperations (). query (this .selectQuery , resultSetExtractor );
216
+ return this .jdbcOperations .query (this .selectQuery , rowMapper );
208
217
}
209
218
}
210
219
211
- @ Override
212
- public String getComponentType () {
213
- return "jdbc:inbound-channel-adapter" ;
220
+ private void executeUpdateQuery ( Object obj ) {
221
+ SqlParameterSource updateParameterSource = this . sqlParameterSourceFactory . createParameterSource ( obj );
222
+ this . jdbcOperations . update ( this . updateSql , updateParameterSource ) ;
214
223
}
215
224
216
225
}
0 commit comments