@@ -1000,134 +1000,180 @@ impl CanSplitDoBetter {
10001000 }
10011001 }
10021002
1003- /// Optimize the order in which splits will get processed based on how it can skip the most
1004- /// splits.
1005- ///
1006- /// The leaf search code contains some logic that makes it possible to skip entire splits
1007- /// when we are confident they won't make it into top K.
1008- /// To make this optimization as potent as possible, we sort the splits so that the first splits
1009- /// are the most likely to fill our Top K.
1010- /// In the future, as split get more metadata per column, we may be able to do this more than
1011- /// just for timestamp and "unsorted" request.
1012- fn optimize_split_order ( & self , splits : & mut [ SplitIdAndFooterOffsets ] ) {
1013- match self {
1014- CanSplitDoBetter :: SplitIdHigher ( _) => {
1015- splits. sort_unstable_by ( |a, b| b. split_id . cmp ( & a. split_id ) )
1016- }
1017- CanSplitDoBetter :: SplitTimestampHigher ( _)
1018- | CanSplitDoBetter :: FindTraceIdsAggregation ( _) => {
1019- splits. sort_unstable_by_key ( |split| std:: cmp:: Reverse ( split. timestamp_end ( ) ) )
1020- }
1021- CanSplitDoBetter :: SplitTimestampLower ( _) => {
1022- splits. sort_unstable_by_key ( |split| split. timestamp_start ( ) )
1023- }
1024- CanSplitDoBetter :: Uninformative => ( ) ,
1003+ fn to_splits_with_request (
1004+ splits : Vec < SplitIdAndFooterOffsets > ,
1005+ request : Arc < SearchRequest > ,
1006+ ) -> Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > {
1007+ // TODO: we maybe want here some deduplication + Cow logic
1008+ splits
1009+ . into_iter ( )
1010+ . map ( |split| ( split, ( * request) . clone ( ) ) )
1011+ . collect :: < Vec < _ > > ( )
1012+ }
1013+
1014+ /// Calculate the number of splits which are guaranteed to deliver enough documents.
1015+ fn get_min_required_splits (
1016+ splits : & [ SplitIdAndFooterOffsets ] ,
1017+ request : & SearchRequest ,
1018+ ) -> usize {
1019+ let num_requested_docs = request. start_offset + request. max_hits ;
1020+
1021+ splits
1022+ . into_iter ( )
1023+ . map ( |split| split. num_docs )
1024+ // computing the partial sum
1025+ . scan ( 0u64 , |partial_sum : & mut u64 , num_docs_in_split : u64 | {
1026+ * partial_sum += num_docs_in_split;
1027+ Some ( * partial_sum)
1028+ } )
1029+ . take_while ( |partial_sum| * partial_sum < num_requested_docs)
1030+ . count ( )
1031+ + 1
1032+ }
1033+
1034+ fn optimize_split_id_higher (
1035+ & self ,
1036+ request : Arc < SearchRequest > ,
1037+ mut splits : Vec < SplitIdAndFooterOffsets > ,
1038+ ) -> Result < Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > , SearchError > {
1039+ splits. sort_unstable_by ( |a, b| b. split_id . cmp ( & a. split_id ) ) ;
1040+
1041+ if !is_simple_all_query ( & request) {
1042+ // no optimization opportunity here.
1043+ return Ok ( Self :: to_splits_with_request ( splits, request) ) ;
1044+ }
1045+
1046+ let min_required_splits = Self :: get_min_required_splits ( & splits, & request) ;
1047+ let mut split_with_req = Self :: to_splits_with_request ( splits, request) ;
1048+
1049+ // In this case there is no sort order, we order by split id.
1050+ // If the the first split has enough documents, we can convert the other queries to
1051+ // count only queries.
1052+ for ( _split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1053+ disable_search_request_hits ( request) ;
10251054 }
1055+
1056+ Ok ( split_with_req)
10261057 }
10271058
1028- /// This function tries to detect upfront which splits contain the top n hits and convert other
1029- /// split searches to count only searches. It also optimizes split order.
1030- ///
1031- /// Returns the search_requests with their split.
1032- fn optimize (
1059+ fn optimize_split_timestamp_higher (
10331060 & self ,
10341061 request : Arc < SearchRequest > ,
10351062 mut splits : Vec < SplitIdAndFooterOffsets > ,
10361063 ) -> Result < Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > , SearchError > {
1037- self . optimize_split_order ( & mut splits ) ;
1064+ splits . sort_unstable_by_key ( |split| std :: cmp :: Reverse ( split . timestamp_end ( ) ) ) ;
10381065
10391066 if !is_simple_all_query ( & request) {
10401067 // no optimization opportunity here.
1041- return Ok ( splits
1042- . into_iter ( )
1043- . map ( |split| ( split, ( * request) . clone ( ) ) )
1044- . collect :: < Vec < _ > > ( ) ) ;
1068+ return Ok ( Self :: to_splits_with_request ( splits, request) ) ;
10451069 }
10461070
1047- let num_requested_docs = request. start_offset + request. max_hits ;
1071+ let min_required_splits = Self :: get_min_required_splits ( & splits, & request) ;
1072+ let mut split_with_req = Self :: to_splits_with_request ( splits, request) ;
10481073
1049- // Calculate the number of splits which are guaranteed to deliver enough documents.
1050- let min_required_splits = splits
1074+ // We order by timestamp desc. split_with_req is sorted by timestamp_end desc.
1075+ //
1076+ // We have the number of splits we need to search to get enough docs, now we need to
1077+ // find the splits that don't overlap.
1078+ //
1079+ // Let's get the smallest timestamp_start of the first num_splits splits
1080+ let smallest_start_timestamp = split_with_req
10511081 . iter ( )
1052- . map ( |split| split. num_docs )
1053- // computing the partial sum
1054- . scan ( 0u64 , |partial_sum : & mut u64 , num_docs_in_split : u64 | {
1055- * partial_sum += num_docs_in_split;
1056- Some ( * partial_sum)
1057- } )
1058- . take_while ( |partial_sum| * partial_sum < num_requested_docs)
1059- . count ( )
1060- + 1 ;
1082+ . take ( min_required_splits)
1083+ . map ( |( split, _) | split. timestamp_start ( ) )
1084+ . min ( )
1085+ // if min_required_splits is 0, we choose a value that disables all splits
1086+ . unwrap_or ( i64:: MAX ) ;
1087+ for ( split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1088+ if split. timestamp_end ( ) < smallest_start_timestamp {
1089+ disable_search_request_hits ( request) ;
1090+ }
1091+ }
10611092
1062- // TODO: we maybe want here some deduplication + Cow logic
1063- let mut split_with_req = splits
1064- . into_iter ( )
1065- . map ( |split| ( split, ( * request) . clone ( ) ) )
1066- . collect :: < Vec < _ > > ( ) ;
1093+ Ok ( split_with_req)
1094+ }
1095+
1096+ fn optimize_split_timestamp_lower (
1097+ & self ,
1098+ request : Arc < SearchRequest > ,
1099+ mut splits : Vec < SplitIdAndFooterOffsets > ,
1100+ ) -> Result < Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > , SearchError > {
1101+ splits. sort_unstable_by_key ( |split| split. timestamp_start ( ) ) ;
1102+
1103+ if !is_simple_all_query ( & request) {
1104+ // no optimization opportunity here.
1105+ return Ok ( Self :: to_splits_with_request ( splits, request) ) ;
1106+ }
1107+
1108+ let min_required_splits = Self :: get_min_required_splits ( & splits, & request) ;
1109+ let mut split_with_req = Self :: to_splits_with_request ( splits, request) ;
1110+
1111+ // We order by timestamp asc. split_with_req is sorted by timestamp_start.
1112+ //
1113+ // If we know that some splits will deliver enough documents, we can convert the
1114+ // others to count only queries.
1115+ // Since we only have start and end ranges and don't know the distribution we make
1116+ // sure the splits dont' overlap, since the distribution of two
1117+ // splits could be like this (dot is a timestamp doc on a x axis), for top 2
1118+ // queries.
1119+ // ```
1120+ // [. .] Split1 has enough docs, but last doc is not in top 2
1121+ // [.. .] Split2 first doc is in top2
1122+ // ```
1123+ // Let's get the biggest timestamp_end of the first num_splits splits
1124+ let biggest_end_timestamp = split_with_req
1125+ . iter ( )
1126+ . take ( min_required_splits)
1127+ . map ( |( split, _) | split. timestamp_end ( ) )
1128+ . max ( )
1129+ // if min_required_splits is 0, we choose a value that disables all splits
1130+ . unwrap_or ( i64:: MIN ) ;
1131+ for ( split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1132+ if split. timestamp_start ( ) > biggest_end_timestamp {
1133+ disable_search_request_hits ( request) ;
1134+ }
1135+ }
1136+
1137+ Ok ( split_with_req)
1138+ }
1139+
1140+ fn optimize_find_trace_ids_aggregation (
1141+ & self ,
1142+ request : Arc < SearchRequest > ,
1143+ mut splits : Vec < SplitIdAndFooterOffsets > ,
1144+ ) -> Result < Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > , SearchError > {
1145+ splits. sort_unstable_by_key ( |split| std:: cmp:: Reverse ( split. timestamp_end ( ) ) ) ;
1146+
1147+ if !is_simple_all_query ( & request) {
1148+ // no optimization opportunity here.
1149+ return Ok ( Self :: to_splits_with_request ( splits, request) ) ;
1150+ }
10671151
1068- // reuse the detected sort order in split_filter
1069- // we want to detect cases where we can convert some split queries to count only queries
1152+ Ok ( Self :: to_splits_with_request ( splits, request) )
1153+ }
1154+
1155+ /// This function tries to detect upfront which splits contain the top n hits and convert other
1156+ /// split searches to count only searches. It also optimizes split order.
1157+ ///
1158+ /// Returns the search_requests with their split.
1159+ fn optimize (
1160+ & self ,
1161+ request : Arc < SearchRequest > ,
1162+ splits : Vec < SplitIdAndFooterOffsets > ,
1163+ ) -> Result < Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > , SearchError > {
10701164 match self {
1071- CanSplitDoBetter :: SplitIdHigher ( _) => {
1072- // In this case there is no sort order, we order by split id.
1073- // If the the first split has enough documents, we can convert the other queries to
1074- // count only queries
1075- for ( _split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1076- disable_search_request_hits ( request) ;
1077- }
1165+ CanSplitDoBetter :: SplitIdHigher ( _) => self . optimize_split_id_higher ( request, splits) ,
1166+ CanSplitDoBetter :: SplitTimestampHigher ( _) => {
1167+ self . optimize_split_timestamp_higher ( request, splits)
10781168 }
1079- CanSplitDoBetter :: Uninformative => { }
10801169 CanSplitDoBetter :: SplitTimestampLower ( _) => {
1081- // We order by timestamp asc. split_with_req is sorted by timestamp_start.
1082- //
1083- // If we know that some splits will deliver enough documents, we can convert the
1084- // others to count only queries.
1085- // Since we only have start and end ranges and don't know the distribution we make
1086- // sure the splits dont' overlap, since the distribution of two
1087- // splits could be like this (dot is a timestamp doc on a x axis), for top 2
1088- // queries.
1089- // ```
1090- // [. .] Split1 has enough docs, but last doc is not in top 2
1091- // [.. .] Split2 first doc is in top2
1092- // ```
1093- // Let's get the biggest timestamp_end of the first num_splits splits
1094- let biggest_end_timestamp = split_with_req
1095- . iter ( )
1096- . take ( min_required_splits)
1097- . map ( |( split, _) | split. timestamp_end ( ) )
1098- . max ( )
1099- // if min_required_splits is 0, we choose a value that disables all splits
1100- . unwrap_or ( i64:: MIN ) ;
1101- for ( split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1102- if split. timestamp_start ( ) > biggest_end_timestamp {
1103- disable_search_request_hits ( request) ;
1104- }
1105- }
1170+ self . optimize_split_timestamp_lower ( request, splits)
11061171 }
1107- CanSplitDoBetter :: SplitTimestampHigher ( _) => {
1108- // We order by timestamp desc. split_with_req is sorted by timestamp_end desc.
1109- //
1110- // We have the number of splits we need to search to get enough docs, now we need to
1111- // find the splits that don't overlap.
1112- //
1113- // Let's get the smallest timestamp_start of the first num_splits splits
1114- let smallest_start_timestamp = split_with_req
1115- . iter ( )
1116- . take ( min_required_splits)
1117- . map ( |( split, _) | split. timestamp_start ( ) )
1118- . min ( )
1119- // if min_required_splits is 0, we choose a value that disables all splits
1120- . unwrap_or ( i64:: MAX ) ;
1121- for ( split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1122- if split. timestamp_end ( ) < smallest_start_timestamp {
1123- disable_search_request_hits ( request) ;
1124- }
1125- }
1172+ CanSplitDoBetter :: FindTraceIdsAggregation ( _) => {
1173+ self . optimize_find_trace_ids_aggregation ( request, splits)
11261174 }
1127- CanSplitDoBetter :: FindTraceIdsAggregation ( _ ) => { }
1175+ CanSplitDoBetter :: Uninformative => Ok ( Self :: to_splits_with_request ( splits , request ) ) ,
11281176 }
1129-
1130- Ok ( split_with_req)
11311177 }
11321178
11331179 /// Returns whether the given split can possibly give documents better than the one already
0 commit comments