@@ -179,8 +179,8 @@ func (suite *CoordinatorTestSuite) TestApplyDML() {
179
179
return streamCtx .Err () != nil
180
180
}
181
181
go func () {
182
- err = coord .StartStreaming (streamCtx , canStopStreaming )
183
- suite .Require ().Equal (context .Canceled , err )
182
+ streamErr : = coord .StartStreaming (streamCtx , canStopStreaming )
183
+ suite .Require ().Equal (context .Canceled , streamErr )
184
184
}()
185
185
186
186
// Give streamer some time to start
@@ -199,6 +199,23 @@ func (suite *CoordinatorTestSuite) TestApplyDML() {
199
199
}
200
200
201
201
fmt .Printf ("Time taken: %s\n " , time .Since (startAt ))
202
+
203
+ result , err := suite .db .Exec (`SELECT * FROM (
204
+ SELECT t1.id,
205
+ CRC32(CONCAT_WS(';',t1.id,t1.name))
206
+ AS checksum1,
207
+ CRC32(CONCAT_WS(';',t2.id,t2.name))
208
+ AS checksum2
209
+ FROM test.gh_ost_test t1
210
+ LEFT JOIN test._gh_ost_test_gho t2
211
+ ON t1.id = t2.id
212
+ ) AS checksums
213
+ WHERE checksums.checksum1 != checksums.checksum2` )
214
+ suite .Require ().NoError (err )
215
+
216
+ count , err := result .RowsAffected ()
217
+ suite .Require ().NoError (err )
218
+ suite .Require ().Zero (count )
202
219
}
203
220
204
221
func TestCoordinator (t * testing.T ) {
0 commit comments