@@ -1000,134 +1000,176 @@ impl CanSplitDoBetter {
10001000 }
10011001 }
10021002
1003- /// Optimize the order in which splits will get processed based on how it can skip the most
1004- /// splits.
1005- ///
1006- /// The leaf search code contains some logic that makes it possible to skip entire splits
1007- /// when we are confident they won't make it into top K.
1008- /// To make this optimization as potent as possible, we sort the splits so that the first splits
1009- /// are the most likely to fill our Top K.
1010- /// In the future, as split get more metadata per column, we may be able to do this more than
1011- /// just for timestamp and "unsorted" request.
1012- fn optimize_split_order ( & self , splits : & mut [ SplitIdAndFooterOffsets ] ) {
1013- match self {
1014- CanSplitDoBetter :: SplitIdHigher ( _) => {
1015- splits. sort_unstable_by ( |a, b| b. split_id . cmp ( & a. split_id ) )
1016- }
1017- CanSplitDoBetter :: SplitTimestampHigher ( _)
1018- | CanSplitDoBetter :: FindTraceIdsAggregation ( _) => {
1019- splits. sort_unstable_by_key ( |split| std:: cmp:: Reverse ( split. timestamp_end ( ) ) )
1020- }
1021- CanSplitDoBetter :: SplitTimestampLower ( _) => {
1022- splits. sort_unstable_by_key ( |split| split. timestamp_start ( ) )
1023- }
1024- CanSplitDoBetter :: Uninformative => ( ) ,
1003+ fn to_splits_with_request (
1004+ splits : Vec < SplitIdAndFooterOffsets > ,
1005+ request : Arc < SearchRequest > ,
1006+ ) -> Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > {
1007+ // TODO: we maybe want here some deduplication + Cow logic
1008+ splits
1009+ . into_iter ( )
1010+ . map ( |split| ( split, ( * request) . clone ( ) ) )
1011+ . collect :: < Vec < _ > > ( )
1012+ }
1013+
1014+ /// Calculate the number of splits which are guaranteed to deliver enough documents.
1015+ fn get_min_required_splits (
1016+ splits : & [ SplitIdAndFooterOffsets ] ,
1017+ request : & SearchRequest ,
1018+ ) -> usize {
1019+ let num_requested_docs = request. start_offset + request. max_hits ;
1020+
1021+ splits
1022+ . into_iter ( )
1023+ . map ( |split| split. num_docs )
1024+ // computing the partial sum
1025+ . scan ( 0u64 , |partial_sum : & mut u64 , num_docs_in_split : u64 | {
1026+ * partial_sum += num_docs_in_split;
1027+ Some ( * partial_sum)
1028+ } )
1029+ . take_while ( |partial_sum| * partial_sum < num_requested_docs)
1030+ . count ( )
1031+ + 1
1032+ }
1033+
1034+ fn optimize_split_id_higher (
1035+ request : Arc < SearchRequest > ,
1036+ mut splits : Vec < SplitIdAndFooterOffsets > ,
1037+ ) -> Result < Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > , SearchError > {
1038+ splits. sort_unstable_by ( |a, b| b. split_id . cmp ( & a. split_id ) ) ;
1039+
1040+ if !is_simple_all_query ( & request) {
1041+ // no optimization opportunity here.
1042+ return Ok ( Self :: to_splits_with_request ( splits, request) ) ;
10251043 }
1044+
1045+ let min_required_splits = Self :: get_min_required_splits ( & splits, & request) ;
1046+ let mut split_with_req = Self :: to_splits_with_request ( splits, request) ;
1047+
1048+ // In this case there is no sort order, we order by split id.
1049+ // If the the first split has enough documents, we can convert the other queries to
1050+ // count only queries.
1051+ for ( _split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1052+ disable_search_request_hits ( request) ;
1053+ }
1054+
1055+ Ok ( split_with_req)
10261056 }
10271057
1028- /// This function tries to detect upfront which splits contain the top n hits and convert other
1029- /// split searches to count only searches. It also optimizes split order.
1030- ///
1031- /// Returns the search_requests with their split.
1032- fn optimize (
1033- & self ,
1058+ fn optimize_split_timestamp_higher (
10341059 request : Arc < SearchRequest > ,
10351060 mut splits : Vec < SplitIdAndFooterOffsets > ,
10361061 ) -> Result < Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > , SearchError > {
1037- self . optimize_split_order ( & mut splits ) ;
1062+ splits . sort_unstable_by_key ( |split| std :: cmp :: Reverse ( split . timestamp_end ( ) ) ) ;
10381063
10391064 if !is_simple_all_query ( & request) {
10401065 // no optimization opportunity here.
1041- return Ok ( splits
1042- . into_iter ( )
1043- . map ( |split| ( split, ( * request) . clone ( ) ) )
1044- . collect :: < Vec < _ > > ( ) ) ;
1066+ return Ok ( Self :: to_splits_with_request ( splits, request) ) ;
10451067 }
10461068
1047- let num_requested_docs = request. start_offset + request. max_hits ;
1069+ let min_required_splits = Self :: get_min_required_splits ( & splits, & request) ;
1070+ let mut split_with_req = Self :: to_splits_with_request ( splits, request) ;
10481071
1049- // Calculate the number of splits which are guaranteed to deliver enough documents.
1050- let min_required_splits = splits
1072+ // We order by timestamp desc. split_with_req is sorted by timestamp_end desc.
1073+ //
1074+ // We have the number of splits we need to search to get enough docs, now we need to
1075+ // find the splits that don't overlap.
1076+ //
1077+ // Let's get the smallest timestamp_start of the first num_splits splits
1078+ let smallest_start_timestamp = split_with_req
10511079 . iter ( )
1052- . map ( |split| split. num_docs )
1053- // computing the partial sum
1054- . scan ( 0u64 , |partial_sum : & mut u64 , num_docs_in_split : u64 | {
1055- * partial_sum += num_docs_in_split;
1056- Some ( * partial_sum)
1057- } )
1058- . take_while ( |partial_sum| * partial_sum < num_requested_docs)
1059- . count ( )
1060- + 1 ;
1080+ . take ( min_required_splits)
1081+ . map ( |( split, _) | split. timestamp_start ( ) )
1082+ . min ( )
1083+ // if min_required_splits is 0, we choose a value that disables all splits
1084+ . unwrap_or ( i64:: MAX ) ;
1085+ for ( split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1086+ if split. timestamp_end ( ) < smallest_start_timestamp {
1087+ disable_search_request_hits ( request) ;
1088+ }
1089+ }
10611090
1062- // TODO: we maybe want here some deduplication + Cow logic
1063- let mut split_with_req = splits
1064- . into_iter ( )
1065- . map ( |split| ( split, ( * request) . clone ( ) ) )
1066- . collect :: < Vec < _ > > ( ) ;
1091+ Ok ( split_with_req)
1092+ }
1093+
1094+ fn optimize_split_timestamp_lower (
1095+ request : Arc < SearchRequest > ,
1096+ mut splits : Vec < SplitIdAndFooterOffsets > ,
1097+ ) -> Result < Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > , SearchError > {
1098+ splits. sort_unstable_by_key ( |split| split. timestamp_start ( ) ) ;
10671099
1068- // reuse the detected sort order in split_filter
1069- // we want to detect cases where we can convert some split queries to count only queries
1100+ if !is_simple_all_query ( & request) {
1101+ // no optimization opportunity here.
1102+ return Ok ( Self :: to_splits_with_request ( splits, request) ) ;
1103+ }
1104+
1105+ let min_required_splits = Self :: get_min_required_splits ( & splits, & request) ;
1106+ let mut split_with_req = Self :: to_splits_with_request ( splits, request) ;
1107+
1108+ // We order by timestamp asc. split_with_req is sorted by timestamp_start.
1109+ //
1110+ // If we know that some splits will deliver enough documents, we can convert the
1111+ // others to count only queries.
1112+ // Since we only have start and end ranges and don't know the distribution we make
1113+ // sure the splits dont' overlap, since the distribution of two
1114+ // splits could be like this (dot is a timestamp doc on a x axis), for top 2
1115+ // queries.
1116+ // ```
1117+ // [. .] Split1 has enough docs, but last doc is not in top 2
1118+ // [.. .] Split2 first doc is in top2
1119+ // ```
1120+ // Let's get the biggest timestamp_end of the first num_splits splits
1121+ let biggest_end_timestamp = split_with_req
1122+ . iter ( )
1123+ . take ( min_required_splits)
1124+ . map ( |( split, _) | split. timestamp_end ( ) )
1125+ . max ( )
1126+ // if min_required_splits is 0, we choose a value that disables all splits
1127+ . unwrap_or ( i64:: MIN ) ;
1128+ for ( split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1129+ if split. timestamp_start ( ) > biggest_end_timestamp {
1130+ disable_search_request_hits ( request) ;
1131+ }
1132+ }
1133+
1134+ Ok ( split_with_req)
1135+ }
1136+
1137+ fn optimize_find_trace_ids_aggregation (
1138+ request : Arc < SearchRequest > ,
1139+ mut splits : Vec < SplitIdAndFooterOffsets > ,
1140+ ) -> Result < Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > , SearchError > {
1141+ splits. sort_unstable_by_key ( |split| std:: cmp:: Reverse ( split. timestamp_end ( ) ) ) ;
1142+
1143+ if !is_simple_all_query ( & request) {
1144+ // no optimization opportunity here.
1145+ return Ok ( Self :: to_splits_with_request ( splits, request) ) ;
1146+ }
1147+
1148+ Ok ( Self :: to_splits_with_request ( splits, request) )
1149+ }
1150+
1151+ /// This function tries to detect upfront which splits contain the top n hits and convert other
1152+ /// split searches to count only searches. It also optimizes split order.
1153+ ///
1154+ /// Returns the search_requests with their split.
1155+ fn optimize (
1156+ & self ,
1157+ request : Arc < SearchRequest > ,
1158+ splits : Vec < SplitIdAndFooterOffsets > ,
1159+ ) -> Result < Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > , SearchError > {
10701160 match self {
1071- CanSplitDoBetter :: SplitIdHigher ( _) => {
1072- // In this case there is no sort order, we order by split id.
1073- // If the the first split has enough documents, we can convert the other queries to
1074- // count only queries
1075- for ( _split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1076- disable_search_request_hits ( request) ;
1077- }
1161+ CanSplitDoBetter :: SplitIdHigher ( _) => Self :: optimize_split_id_higher ( request, splits) ,
1162+ CanSplitDoBetter :: SplitTimestampHigher ( _) => {
1163+ Self :: optimize_split_timestamp_higher ( request, splits)
10781164 }
1079- CanSplitDoBetter :: Uninformative => { }
10801165 CanSplitDoBetter :: SplitTimestampLower ( _) => {
1081- // We order by timestamp asc. split_with_req is sorted by timestamp_start.
1082- //
1083- // If we know that some splits will deliver enough documents, we can convert the
1084- // others to count only queries.
1085- // Since we only have start and end ranges and don't know the distribution we make
1086- // sure the splits dont' overlap, since the distribution of two
1087- // splits could be like this (dot is a timestamp doc on a x axis), for top 2
1088- // queries.
1089- // ```
1090- // [. .] Split1 has enough docs, but last doc is not in top 2
1091- // [.. .] Split2 first doc is in top2
1092- // ```
1093- // Let's get the biggest timestamp_end of the first num_splits splits
1094- let biggest_end_timestamp = split_with_req
1095- . iter ( )
1096- . take ( min_required_splits)
1097- . map ( |( split, _) | split. timestamp_end ( ) )
1098- . max ( )
1099- // if min_required_splits is 0, we choose a value that disables all splits
1100- . unwrap_or ( i64:: MIN ) ;
1101- for ( split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1102- if split. timestamp_start ( ) > biggest_end_timestamp {
1103- disable_search_request_hits ( request) ;
1104- }
1105- }
1166+ Self :: optimize_split_timestamp_lower ( request, splits)
11061167 }
1107- CanSplitDoBetter :: SplitTimestampHigher ( _) => {
1108- // We order by timestamp desc. split_with_req is sorted by timestamp_end desc.
1109- //
1110- // We have the number of splits we need to search to get enough docs, now we need to
1111- // find the splits that don't overlap.
1112- //
1113- // Let's get the smallest timestamp_start of the first num_splits splits
1114- let smallest_start_timestamp = split_with_req
1115- . iter ( )
1116- . take ( min_required_splits)
1117- . map ( |( split, _) | split. timestamp_start ( ) )
1118- . min ( )
1119- // if min_required_splits is 0, we choose a value that disables all splits
1120- . unwrap_or ( i64:: MAX ) ;
1121- for ( split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1122- if split. timestamp_end ( ) < smallest_start_timestamp {
1123- disable_search_request_hits ( request) ;
1124- }
1125- }
1168+ CanSplitDoBetter :: FindTraceIdsAggregation ( _) => {
1169+ Self :: optimize_find_trace_ids_aggregation ( request, splits)
11261170 }
1127- CanSplitDoBetter :: FindTraceIdsAggregation ( _ ) => { }
1171+ CanSplitDoBetter :: Uninformative => Ok ( Self :: to_splits_with_request ( splits , request ) ) ,
11281172 }
1129-
1130- Ok ( split_with_req)
11311173 }
11321174
11331175 /// Returns whether the given split can possibly give documents better than the one already
0 commit comments