@@ -42,7 +42,7 @@ use tantivy::index::SegmentId;
42
42
use tantivy:: tokenizer:: TokenizerManager ;
43
43
use tantivy:: { DateTime , Directory , Index , IndexMeta , IndexWriter , SegmentReader } ;
44
44
use tokio:: runtime:: Handle ;
45
- use tracing:: { debug, info, instrument, warn} ;
45
+ use tracing:: { debug, error , info, instrument, warn} ;
46
46
47
47
use crate :: actors:: Packager ;
48
48
use crate :: controlled_directory:: ControlledDirectory ;
@@ -90,16 +90,35 @@ impl Handler<MergeScratch> for MergeExecutor {
90
90
let start = Instant :: now ( ) ;
91
91
let merge_task = merge_scratch. merge_task ;
92
92
let indexed_split_opt: Option < IndexedSplit > = match merge_task. operation_type {
93
- MergeOperationType :: Merge => Some (
94
- self . process_merge (
95
- merge_task. merge_split_id . clone ( ) ,
96
- merge_task. splits . clone ( ) ,
97
- merge_scratch. tantivy_dirs ,
98
- merge_scratch. merge_scratch_directory ,
99
- ctx,
100
- )
101
- . await ?,
102
- ) ,
93
+ MergeOperationType :: Merge => {
94
+ let merge_res = self
95
+ . process_merge (
96
+ merge_task. merge_split_id . clone ( ) ,
97
+ merge_task. splits . clone ( ) ,
98
+ merge_scratch. tantivy_dirs ,
99
+ merge_scratch. merge_scratch_directory ,
100
+ ctx,
101
+ )
102
+ . await ;
103
+ match merge_res {
104
+ Ok ( indexed_split) => Some ( indexed_split) ,
105
+ Err ( err) => {
106
+ // A failure in a merge is a bit special.
107
+ //
108
+ // Instead of failing the pipeline, we just log it.
109
+ // The idea is to limit the risk associated with a potential split of death.
110
+ //
111
+ // Such a split is now not tracked by the merge planner and won't undergo a
112
+ // merge until the merge pipeline is restarted.
113
+ //
114
+ // With a merge policy that marks splits as mature after a day or so, this
115
+ // limits the noise associated to those failed
116
+ // merges.
117
+ error ! ( task=?merge_task, err=?err, "failed to merge splits" ) ;
118
+ return Ok ( ( ) ) ;
119
+ }
120
+ }
121
+ }
103
122
MergeOperationType :: DeleteAndMerge => {
104
123
assert_eq ! (
105
124
merge_task. splits. len( ) ,
@@ -540,7 +559,7 @@ impl MergeExecutor {
540
559
541
560
debug ! ( segment_ids=?segment_ids, "merging-segments" ) ;
542
561
// TODO it would be nice if tantivy could let us run the merge in the current thread.
543
- index_writer. merge ( & segment_ids) . wait ( ) ?;
562
+ index_writer. merge ( & segment_ids) . await ?;
544
563
545
564
Ok ( output_directory)
546
565
}
0 commit comments