diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index d25a243f2863..f692dc391a5d 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -2050,6 +2050,7 @@ GO_TARGETS = [ "//pkg/sql/execinfra:execinfra_test", "//pkg/sql/execinfrapb:execinfrapb", "//pkg/sql/execinfrapb:execinfrapb_test", + "//pkg/sql/execstats/execstatstypes:execstatstypes", "//pkg/sql/execstats:execstats", "//pkg/sql/execstats:execstats_test", "//pkg/sql/execversion:execversion", diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 345ac4338c76..07b0b9783654 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -432,6 +432,7 @@ go_library( "//pkg/sql/execinfra/execreleasable", "//pkg/sql/execinfrapb", "//pkg/sql/execstats", + "//pkg/sql/execstats/execstatstypes", "//pkg/sql/execversion", "//pkg/sql/exprutil", "//pkg/sql/faketreeeval", @@ -865,7 +866,7 @@ go_test( "//pkg/sql/distsql", "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", - "//pkg/sql/execstats", + "//pkg/sql/execstats/execstatstypes", "//pkg/sql/flowinfra", "//pkg/sql/gcjob", "//pkg/sql/isql", diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 009f7e3930fa..454624faa547 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -43,7 +43,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry/schematelemetrycontroller" "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" "github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache" - "github.com/cockroachdb/cockroach/pkg/sql/execstats" + "github.com/cockroachdb/cockroach/pkg/sql/execstats/execstatstypes" "github.com/cockroachdb/cockroach/pkg/sql/idxrecommendations" "github.com/cockroachdb/cockroach/pkg/sql/idxusage" "github.com/cockroachdb/cockroach/pkg/sql/opt/memo" @@ -1260,15 +1260,12 @@ func (s *Server) newConnExecutor( ex.applicationName.Store(ex.sessionData().ApplicationName) ex.applicationStats = applicationStats - // We ignore statements and transactions run by the internal executor by - // passing a nil writer. ex.statsCollector = sslocal.NewStatsCollector( s.cfg.Settings, applicationStats, s.sqlStatsIngester, ex.phaseTimes, s.localSqlStats.GetCounters(), - s.cfg.SQLStatsTestingKnobs, ) ex.dataMutatorIterator.OnApplicationNameChange = func(newName string) { ex.applicationName.Store(newName) @@ -1703,7 +1700,7 @@ type connExecutor struct { // this transaction should collect execution stats. shouldCollectTxnExecutionStats bool // accumulatedStats are the accumulated stats of all statements. - accumulatedStats execstats.QueryLevelStats + accumulatedStats execstatstypes.QueryLevelStats // idleLatency is the cumulative amount of time spent waiting for the // client to send statements while holding the transaction open. @@ -3962,7 +3959,7 @@ func (ex *connExecutor) initPlanner(ctx context.Context, p *planner) { p.cancelChecker.Reset(ctx) ex.initEvalCtx(ctx, &p.extendedEvalCtx, p) - + p.statsCollector = ex.statsCollector p.sessionDataMutatorIterator = ex.dataMutatorIterator p.noticeSender = nil p.preparedStatements = ex.getPrepStmtsAccessor() diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 97aec36f2ffd..c1eb00dc64dd 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execstats" + "github.com/cockroachdb/cockroach/pkg/sql/execstats/execstatstypes" "github.com/cockroachdb/cockroach/pkg/sql/hints" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" @@ -4187,7 +4188,7 @@ func (ex *connExecutor) onTxnRestart(ctx context.Context) { ex.extraTxnState.numRows = 0 // accumulatedStats are cleared, but shouldCollectTxnExecutionStats is // unchanged. - ex.extraTxnState.accumulatedStats = execstats.QueryLevelStats{} + ex.extraTxnState.accumulatedStats = execstatstypes.QueryLevelStats{} ex.extraTxnState.idleLatency = 0 ex.extraTxnState.rowsRead = 0 ex.extraTxnState.bytesRead = 0 @@ -4220,7 +4221,7 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) { ex.extraTxnState.transactionStatementsHash = util.MakeFNV64() ex.extraTxnState.transactionStatementFingerprintIDs = nil ex.extraTxnState.numRows = 0 - ex.extraTxnState.accumulatedStats = execstats.QueryLevelStats{} + ex.extraTxnState.accumulatedStats = execstatstypes.QueryLevelStats{} ex.extraTxnState.idleLatency = 0 ex.extraTxnState.rowsRead = 0 ex.extraTxnState.bytesRead = 0 diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index 74e6fa1d2821..56bcc68d585f 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -12,7 +12,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/execstats" + "github.com/cockroachdb/cockroach/pkg/sql/execstats/execstatstypes" "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -314,7 +314,7 @@ func (p *planner) maybeLogStatementInternal( return } - var queryLevelStats execstats.QueryLevelStats + var queryLevelStats execstatstypes.QueryLevelStats if stats, ok := p.instrumentation.GetQueryLevelStats(); ok { queryLevelStats = *stats } diff --git a/pkg/sql/execstats/BUILD.bazel b/pkg/sql/execstats/BUILD.bazel index cc1e572ab002..24b00b74b6c2 100644 --- a/pkg/sql/execstats/BUILD.bazel +++ b/pkg/sql/execstats/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "//pkg/base", "//pkg/kv/kvpb", "//pkg/sql/execinfrapb", + "//pkg/sql/execstats/execstatstypes", "//pkg/util", "//pkg/util/buildutil", "//pkg/util/optional", @@ -46,6 +47,7 @@ go_test( "//pkg/sql/execinfra", "//pkg/sql/execinfra/execopnode", "//pkg/sql/execinfrapb", + "//pkg/sql/execstats/execstatstypes", "//pkg/sql/sessiondata", "//pkg/sql/sessiondatapb", "//pkg/testutils/serverutils", diff --git a/pkg/sql/execstats/execstatstypes/BUILD.bazel b/pkg/sql/execstats/execstatstypes/BUILD.bazel new file mode 100644 index 000000000000..b2cb46fdef4e --- /dev/null +++ b/pkg/sql/execstats/execstatstypes/BUILD.bazel @@ -0,0 +1,12 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "execstatstypes", + srcs = ["types.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/sql/execstats/execstatstypes", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kv/kvpb", + "//pkg/util", + ], +) diff --git a/pkg/sql/execstats/execstatstypes/types.go b/pkg/sql/execstats/execstatstypes/types.go new file mode 100644 index 000000000000..7cbcb823a2be --- /dev/null +++ b/pkg/sql/execstats/execstatstypes/types.go @@ -0,0 +1,101 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package execstatstypes + +import ( + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/util" +) + +// QueryLevelStats returns all the query level stats that correspond to the +// given traces and flow metadata. +// NOTE: When adding fields to this struct, be sure to update Accumulate. +type QueryLevelStats struct { + DistSQLNetworkBytesSent int64 + MaxMemUsage int64 + MaxDiskUsage int64 + KVBytesRead int64 + KVPairsRead int64 + KVRowsRead int64 + KVBatchRequestsIssued int64 + KVTime time.Duration + MvccSteps int64 + MvccStepsInternal int64 + MvccSeeks int64 + MvccSeeksInternal int64 + MvccBlockBytes int64 + MvccBlockBytesInCache int64 + MvccKeyBytes int64 + MvccValueBytes int64 + MvccPointCount int64 + MvccPointsCoveredByRangeTombstones int64 + MvccRangeKeyCount int64 + MvccRangeKeyContainedPoints int64 + MvccRangeKeySkippedPoints int64 + DistSQLNetworkMessages int64 + ContentionTime time.Duration + LockWaitTime time.Duration + LatchWaitTime time.Duration + ContentionEvents []kvpb.ContentionEvent + RUEstimate float64 + CPUTime time.Duration + // SQLInstanceIDs is an ordered list of SQL instances that were involved in + // query processing. + SQLInstanceIDs []int32 + // KVNodeIDs is an ordered list of KV Node IDs that were used for KV reads + // while processing the query. + KVNodeIDs []int32 + // Regions is an ordered list of regions in which both SQL and KV nodes + // involved in query processing reside. + Regions []string + // UsedFollowerRead indicates whether at least some reads were served by the + // follower replicas. + UsedFollowerRead bool + ClientTime time.Duration +} + +// Accumulate accumulates other's stats into the receiver. +func (s *QueryLevelStats) Accumulate(other QueryLevelStats) { + s.DistSQLNetworkBytesSent += other.DistSQLNetworkBytesSent + if other.MaxMemUsage > s.MaxMemUsage { + s.MaxMemUsage = other.MaxMemUsage + } + if other.MaxDiskUsage > s.MaxDiskUsage { + s.MaxDiskUsage = other.MaxDiskUsage + } + s.KVBytesRead += other.KVBytesRead + s.KVPairsRead += other.KVPairsRead + s.KVRowsRead += other.KVRowsRead + s.KVBatchRequestsIssued += other.KVBatchRequestsIssued + s.KVTime += other.KVTime + s.MvccSteps += other.MvccSteps + s.MvccStepsInternal += other.MvccStepsInternal + s.MvccSeeks += other.MvccSeeks + s.MvccSeeksInternal += other.MvccSeeksInternal + s.MvccBlockBytes += other.MvccBlockBytes + s.MvccBlockBytesInCache += other.MvccBlockBytesInCache + s.MvccKeyBytes += other.MvccKeyBytes + s.MvccValueBytes += other.MvccValueBytes + s.MvccPointCount += other.MvccPointCount + s.MvccPointsCoveredByRangeTombstones += other.MvccPointsCoveredByRangeTombstones + s.MvccRangeKeyCount += other.MvccRangeKeyCount + s.MvccRangeKeyContainedPoints += other.MvccRangeKeyContainedPoints + s.MvccRangeKeySkippedPoints += other.MvccRangeKeySkippedPoints + s.DistSQLNetworkMessages += other.DistSQLNetworkMessages + s.ContentionTime += other.ContentionTime + s.LockWaitTime += other.LockWaitTime + s.LatchWaitTime += other.LatchWaitTime + s.ContentionEvents = append(s.ContentionEvents, other.ContentionEvents...) + s.RUEstimate += other.RUEstimate + s.CPUTime += other.CPUTime + s.SQLInstanceIDs = util.CombineUnique(s.SQLInstanceIDs, other.SQLInstanceIDs) + s.KVNodeIDs = util.CombineUnique(s.KVNodeIDs, other.KVNodeIDs) + s.Regions = util.CombineUnique(s.Regions, other.Regions) + s.UsedFollowerRead = s.UsedFollowerRead || other.UsedFollowerRead + s.ClientTime += other.ClientTime +} diff --git a/pkg/sql/execstats/traceanalyzer.go b/pkg/sql/execstats/traceanalyzer.go index 4f5619a98c85..f474a6b98a7b 100644 --- a/pkg/sql/execstats/traceanalyzer.go +++ b/pkg/sql/execstats/traceanalyzer.go @@ -11,6 +11,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/execstats/execstatstypes" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" @@ -91,115 +92,29 @@ func NewFlowsMetadata(flows map[base.SQLInstanceID]*execinfrapb.FlowSpec) *Flows return a } -// QueryLevelStats returns all the query level stats that correspond to the -// given traces and flow metadata. -// NOTE: When adding fields to this struct, be sure to update Accumulate. -type QueryLevelStats struct { - DistSQLNetworkBytesSent int64 - MaxMemUsage int64 - MaxDiskUsage int64 - KVBytesRead int64 - KVPairsRead int64 - KVRowsRead int64 - KVBatchRequestsIssued int64 - KVTime time.Duration - MvccSteps int64 - MvccStepsInternal int64 - MvccSeeks int64 - MvccSeeksInternal int64 - MvccBlockBytes int64 - MvccBlockBytesInCache int64 - MvccKeyBytes int64 - MvccValueBytes int64 - MvccPointCount int64 - MvccPointsCoveredByRangeTombstones int64 - MvccRangeKeyCount int64 - MvccRangeKeyContainedPoints int64 - MvccRangeKeySkippedPoints int64 - DistSQLNetworkMessages int64 - ContentionTime time.Duration - LockWaitTime time.Duration - LatchWaitTime time.Duration - ContentionEvents []kvpb.ContentionEvent - RUEstimate float64 - CPUTime time.Duration - // SQLInstanceIDs is an ordered list of SQL instances that were involved in - // query processing. - SQLInstanceIDs []int32 - // KVNodeIDs is an ordered list of KV Node IDs that were used for KV reads - // while processing the query. - KVNodeIDs []int32 - // Regions is an ordered list of regions in which both SQL and KV nodes - // involved in query processing reside. - Regions []string - // UsedFollowerRead indicates whether at least some reads were served by the - // follower replicas. - UsedFollowerRead bool - ClientTime time.Duration -} - // QueryLevelStatsWithErr is the same as QueryLevelStats, but also tracks // if an error occurred while getting query-level stats. type QueryLevelStatsWithErr struct { - Stats QueryLevelStats + Stats execstatstypes.QueryLevelStats Err error } // MakeQueryLevelStatsWithErr creates a QueryLevelStatsWithErr from a // QueryLevelStats and error. -func MakeQueryLevelStatsWithErr(stats QueryLevelStats, err error) QueryLevelStatsWithErr { +func MakeQueryLevelStatsWithErr( + stats execstatstypes.QueryLevelStats, err error, +) QueryLevelStatsWithErr { return QueryLevelStatsWithErr{ - stats, - err, - } -} - -// Accumulate accumulates other's stats into the receiver. -func (s *QueryLevelStats) Accumulate(other QueryLevelStats) { - s.DistSQLNetworkBytesSent += other.DistSQLNetworkBytesSent - if other.MaxMemUsage > s.MaxMemUsage { - s.MaxMemUsage = other.MaxMemUsage - } - if other.MaxDiskUsage > s.MaxDiskUsage { - s.MaxDiskUsage = other.MaxDiskUsage + Stats: stats, + Err: err, } - s.KVBytesRead += other.KVBytesRead - s.KVPairsRead += other.KVPairsRead - s.KVRowsRead += other.KVRowsRead - s.KVBatchRequestsIssued += other.KVBatchRequestsIssued - s.KVTime += other.KVTime - s.MvccSteps += other.MvccSteps - s.MvccStepsInternal += other.MvccStepsInternal - s.MvccSeeks += other.MvccSeeks - s.MvccSeeksInternal += other.MvccSeeksInternal - s.MvccBlockBytes += other.MvccBlockBytes - s.MvccBlockBytesInCache += other.MvccBlockBytesInCache - s.MvccKeyBytes += other.MvccKeyBytes - s.MvccValueBytes += other.MvccValueBytes - s.MvccPointCount += other.MvccPointCount - s.MvccPointsCoveredByRangeTombstones += other.MvccPointsCoveredByRangeTombstones - s.MvccRangeKeyCount += other.MvccRangeKeyCount - s.MvccRangeKeyContainedPoints += other.MvccRangeKeyContainedPoints - s.MvccRangeKeySkippedPoints += other.MvccRangeKeySkippedPoints - s.DistSQLNetworkMessages += other.DistSQLNetworkMessages - s.ContentionTime += other.ContentionTime - s.LockWaitTime += other.LockWaitTime - s.LatchWaitTime += other.LatchWaitTime - s.ContentionEvents = append(s.ContentionEvents, other.ContentionEvents...) - s.RUEstimate += other.RUEstimate - s.CPUTime += other.CPUTime - s.SQLInstanceIDs = util.CombineUnique(s.SQLInstanceIDs, other.SQLInstanceIDs) - s.KVNodeIDs = util.CombineUnique(s.KVNodeIDs, other.KVNodeIDs) - s.Regions = util.CombineUnique(s.Regions, other.Regions) - s.UsedFollowerRead = s.UsedFollowerRead || other.UsedFollowerRead - s.ClientTime += other.ClientTime } // TraceAnalyzer is a struct that helps calculate top-level statistics from a // flow metadata and an accompanying trace of the flows' execution. type TraceAnalyzer struct { *FlowsMetadata - queryLevelStats QueryLevelStats + queryLevelStats execstatstypes.QueryLevelStats } // NewTraceAnalyzer creates a TraceAnalyzer with the corresponding physical @@ -374,7 +289,7 @@ func getNumDistSQLNetworkMessagesFromComponentsStats(v *execinfrapb.ComponentSta // GetQueryLevelStats returns the query level stats calculated and stored in // TraceAnalyzer. -func (a *TraceAnalyzer) GetQueryLevelStats() QueryLevelStats { +func (a *TraceAnalyzer) GetQueryLevelStats() execstatstypes.QueryLevelStats { return a.queryLevelStats } @@ -403,8 +318,8 @@ func getAllContentionEvents(trace []tracingpb.RecordedSpan) []kvpb.ContentionEve // errors to the caller but continues calculating other stats. func GetQueryLevelStats( trace []tracingpb.RecordedSpan, deterministicExplainAnalyze bool, flowsMetadata []*FlowsMetadata, -) (QueryLevelStats, error) { - var queryLevelStats QueryLevelStats +) (execstatstypes.QueryLevelStats, error) { + var queryLevelStats execstatstypes.QueryLevelStats var errs error for _, metadata := range flowsMetadata { analyzer := NewTraceAnalyzer(metadata) diff --git a/pkg/sql/execstats/traceanalyzer_test.go b/pkg/sql/execstats/traceanalyzer_test.go index 4e7bc9aae281..21278b8df4fd 100644 --- a/pkg/sql/execstats/traceanalyzer_test.go +++ b/pkg/sql/execstats/traceanalyzer_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/execstats" + "github.com/cockroachdb/cockroach/pkg/sql/execstats/execstatstypes" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -221,7 +222,7 @@ func TestTraceAnalyzerProcessStats(t *testing.T) { }, ) - expected := execstats.QueryLevelStats{ + expected := execstatstypes.QueryLevelStats{ KVTime: cumulativeKVTime, ContentionTime: cumulativeContentionTime, } @@ -232,7 +233,7 @@ func TestTraceAnalyzerProcessStats(t *testing.T) { func TestQueryLevelStatsAccumulate(t *testing.T) { aEvent := kvpb.ContentionEvent{Duration: 7 * time.Second} - a := execstats.QueryLevelStats{ + a := execstatstypes.QueryLevelStats{ DistSQLNetworkBytesSent: 1, MaxMemUsage: 2, KVBytesRead: 3, @@ -268,7 +269,7 @@ func TestQueryLevelStatsAccumulate(t *testing.T) { ClientTime: time.Second, } bEvent := kvpb.ContentionEvent{Duration: 14 * time.Second} - b := execstats.QueryLevelStats{ + b := execstatstypes.QueryLevelStats{ DistSQLNetworkBytesSent: 8, MaxMemUsage: 9, KVBytesRead: 10, @@ -303,7 +304,7 @@ func TestQueryLevelStatsAccumulate(t *testing.T) { UsedFollowerRead: true, ClientTime: 2 * time.Second, } - expected := execstats.QueryLevelStats{ + expected := execstatstypes.QueryLevelStats{ DistSQLNetworkBytesSent: 9, MaxMemUsage: 9, KVBytesRead: 13, diff --git a/pkg/sql/executor_statement_metrics.go b/pkg/sql/executor_statement_metrics.go index 92ee3eb327da..f78cc2fe1ec7 100644 --- a/pkg/sql/executor_statement_metrics.go +++ b/pkg/sql/executor_statement_metrics.go @@ -13,8 +13,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/sql/idxrecommendations" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sessionphase" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" + "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/sslocal" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric" @@ -167,47 +167,16 @@ func (ex *connExecutor) recordStatementSummary( stmtErr error, stats topLevelQueryStats, ) appstatspb.StmtFingerprintID { - phaseTimes := ex.statsCollector.PhaseTimes() - - // Collect the statistics. - idleLatRaw := phaseTimes.GetIdleLatency(ex.statsCollector.PreviousPhaseTimes()) - idleLatSec := idleLatRaw.Seconds() - runLatRaw := phaseTimes.GetRunLatency() - runLatSec := runLatRaw.Seconds() - parseLatSec := phaseTimes.GetParsingLatency().Seconds() - planLatSec := phaseTimes.GetPlanningLatency().Seconds() - // We want to exclude any overhead to reduce possible confusion. - svcLatRaw := phaseTimes.GetServiceLatencyNoOverhead() - svcLatSec := svcLatRaw.Seconds() - - // processing latency: contributing towards SQL results. - processingLatSec := parseLatSec + planLatSec + runLatSec - - // overhead latency: txn/retry management, error checking, etc - execOverheadSec := svcLatSec - processingLatSec stmt := &planner.stmt flags := planner.curPlan.flags ex.recordStatementLatencyMetrics( - stmt, flags, automaticRetryTxnCount+automaticRetryStmtCount, runLatRaw, svcLatRaw, + stmt, flags, automaticRetryTxnCount+automaticRetryStmtCount, ex.statsCollector.RunLatency(), ex.statsCollector.ServiceLatency(), ) - fullScan := flags.IsSet(planFlagContainsFullIndexScan) || flags.IsSet(planFlagContainsFullTableScan) - idxRecommendations := idxrecommendations.FormatIdxRecommendations(planner.instrumentation.indexRecs) queryLevelStats, queryLevelStatsOk := planner.instrumentation.GetQueryLevelStats() - var sqlInstanceIDs []int64 - var kvNodeIDs []int32 - if queryLevelStatsOk { - sqlInstanceIDs = make([]int64, 0, len(queryLevelStats.SQLInstanceIDs)) - for _, sqlInstanceID := range queryLevelStats.SQLInstanceIDs { - sqlInstanceIDs = append(sqlInstanceIDs, int64(sqlInstanceID)) - } - kvNodeIDs = queryLevelStats.KVNodeIDs - } - startTime := phaseTimes.GetSessionPhaseTime(sessionphase.PlannerStartExecStmt).ToUTC() - implicitTxn := flags.IsSet(planFlagImplicitTxn) stmtFingerprintID := planner.instrumentation.fingerprintId autoRetryReason := ex.state.mu.autoRetryReason if automaticRetryStmtCount > 0 { @@ -221,50 +190,41 @@ func (ex *connExecutor) recordStatementSummary( ex.metrics.EngineMetrics.StatementIndexBytesWritten.Inc(stats.indexBytesWritten) if ex.statsCollector.EnabledForTransaction() { - recordedStmtStats := &sqlstats.RecordedStmtStats{ - FingerprintID: stmtFingerprintID, - QuerySummary: stmt.StmtSummary, - Generic: flags.IsSet(planFlagGeneric), - DistSQL: flags.ShouldBeDistributed(), - Vec: flags.IsSet(planFlagVectorized), - ImplicitTxn: implicitTxn, - PlanHash: planner.instrumentation.planGist.Hash(), - SessionID: ex.planner.extendedEvalCtx.SessionID, - StatementID: stmt.QueryID, - AutoRetryCount: automaticRetryTxnCount + automaticRetryStmtCount, - Failed: stmtErr != nil, - AutoRetryReason: autoRetryReason, - RowsAffected: rowsAffected, - IdleLatencySec: idleLatSec, - ParseLatencySec: parseLatSec, - PlanLatencySec: planLatSec, - RunLatencySec: runLatSec, - ServiceLatencySec: svcLatSec, - OverheadLatencySec: execOverheadSec, - BytesRead: stats.bytesRead, - RowsRead: stats.rowsRead, - RowsWritten: stats.rowsWritten, - Nodes: sqlInstanceIDs, - KVNodeIDs: kvNodeIDs, - StatementType: stmt.AST.StatementType(), - PlanGist: planner.instrumentation.planGist.String(), - StatementError: stmtErr, - IndexRecommendations: idxRecommendations, - Query: stmt.StmtNoConstants, - StartTime: startTime, - EndTime: startTime.Add(svcLatRaw), - FullScan: fullScan, - ExecStats: queryLevelStats, + b := sqlstats.NewRecordedStatementStatsBuilder[*sslocal.StatsCollector]( + stmtFingerprintID, + planner.SessionData().Database, + stmt.StmtNoConstants, + stmt.StmtSummary, + stmt.AST.StatementType().String(), + ex.statsCollector.CurrentApplicationName(), + ). + QueryID(stmt.QueryID). + SessionID(ex.planner.extendedEvalCtx.SessionID). + PlanMetadata( + flags.IsSet(planFlagGeneric), + flags.ShouldBeDistributed(), + flags.IsSet(planFlagVectorized), + flags.IsSet(planFlagImplicitTxn), + flags.IsSet(planFlagContainsFullIndexScan) || flags.IsSet(planFlagContainsFullTableScan), + ). + PlanGist(planner.instrumentation.planGist.String(), planner.instrumentation.planGist.Hash()). + LatencyRecorder(ex.statsCollector). + QueryLevelStats(stats.bytesRead, stats.rowsRead, stats.rowsWritten). + ExecStats(queryLevelStats). // TODO(mgartner): Use a slice of struct{uint64, uint64} instead of // converting to strings. - Indexes: planner.instrumentation.indexesUsed.Strings(), - Database: planner.SessionData().Database, - QueryTags: stmt.QueryTags, - App: ex.statsCollector.CurrentApplicationName(), - UnderOuterTxn: ex.extraTxnState.underOuterTxn, + Indexes(planner.instrumentation.indexesUsed.Strings()). + AutoRetry(automaticRetryTxnCount+automaticRetryStmtCount, autoRetryReason). + RowsAffected(rowsAffected). + IndexRecommendations(idxRecommendations). + QueryTags(stmt.QueryTags). + StatementError(stmtErr) + + if ex.extraTxnState.underOuterTxn { + b = b.UnderOuterTxn() } - ex.statsCollector.RecordStatement(ctx, recordedStmtStats) + ex.statsCollector.RecordStatement(ctx, b.Build()) } // Record statement execution statistics if span is recorded and no error was @@ -313,11 +273,16 @@ func (ex *connExecutor) recordStatementSummary( ex.extraTxnState.transactionStatementsHash.Add(uint64(stmtFingerprintID)) } ex.extraTxnState.numRows += rowsAffected - ex.extraTxnState.idleLatency += idleLatRaw + ex.extraTxnState.idleLatency += ex.statsCollector.IdleLatency() if log.V(2) { // ages since significant epochs - sessionAge := phaseTimes.GetSessionAge().Seconds() + sessionAge := ex.statsCollector.PhaseTimes().GetSessionAge().Seconds() + parseLatSec := ex.statsCollector.ParsingLatency().Seconds() + planLatSec := ex.statsCollector.PlanningLatency().Seconds() + runLatSec := ex.statsCollector.RunLatency().Seconds() + svcLatSec := ex.statsCollector.ServiceLatency().Seconds() + execOverheadSec := ex.statsCollector.ExecOverheadLatency().Seconds() log.Dev.Infof(ctx, "query stats: %d rows, %d retries, "+ diff --git a/pkg/sql/explain_tree_test.go b/pkg/sql/explain_tree_test.go index ff99f4e4019a..4170a807a419 100644 --- a/pkg/sql/explain_tree_test.go +++ b/pkg/sql/explain_tree_test.go @@ -13,7 +13,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" - "github.com/cockroachdb/cockroach/pkg/sql/execstats" + "github.com/cockroachdb/cockroach/pkg/sql/execstats/execstatstypes" "github.com/cockroachdb/cockroach/pkg/sql/opt/exec/explain" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -85,7 +85,7 @@ func TestPlanToTreeAndPlanToString(t *testing.T) { ctx, explain.Flags{Verbose: true, ShowTypes: true}, sessionphase.NewTimes(), - &execstats.QueryLevelStats{}, + &execstatstypes.QueryLevelStats{}, ) return ob.BuildString() default: diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index 09947c7bd974..184e68e07d73 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/execstats" + "github.com/cockroachdb/cockroach/pkg/sql/execstats/execstatstypes" "github.com/cockroachdb/cockroach/pkg/sql/idxrecommendations" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" @@ -245,7 +246,10 @@ const ( // GetQueryLevelStats gets the QueryLevelStats if they are available. // The query level stats are only available if tracing is enabled. -func (ih *instrumentationHelper) GetQueryLevelStats() (stats *execstats.QueryLevelStats, ok bool) { +func (ih *instrumentationHelper) GetQueryLevelStats() ( + stats *execstatstypes.QueryLevelStats, + ok bool, +) { statsWithErr := ih.queryLevelStatsWithErr if statsWithErr == nil || statsWithErr.Err != nil { @@ -849,7 +853,7 @@ func (ih *instrumentationHelper) emitExplainAnalyzePlanToOutputBuilder( ctx context.Context, flags explain.Flags, phaseTimes *sessionphase.Times, - queryStats *execstats.QueryLevelStats, + queryStats *execstatstypes.QueryLevelStats, ) *explain.OutputBuilder { ob := explain.NewOutputBuilder(flags) if ih.explainPlan == nil { @@ -954,7 +958,7 @@ func (ih *instrumentationHelper) setExplainAnalyzeResult( ctx context.Context, res RestrictedCommandResult, phaseTimes *sessionphase.Times, - queryLevelStats *execstats.QueryLevelStats, + queryLevelStats *execstatstypes.QueryLevelStats, distSQLFlowInfos []flowInfo, trace tracingpb.Recording, ) (commErr error) { diff --git a/pkg/sql/opt/exec/execbuilder/BUILD.bazel b/pkg/sql/opt/exec/execbuilder/BUILD.bazel index 9735e2ec70e8..165aa68b6ac0 100644 --- a/pkg/sql/opt/exec/execbuilder/BUILD.bazel +++ b/pkg/sql/opt/exec/execbuilder/BUILD.bazel @@ -17,6 +17,7 @@ go_library( deps = [ "//pkg/kv/kvserver/concurrency/isolation", "//pkg/server/telemetry", + "//pkg/sql/appstatspb", "//pkg/sql/catalog/colinfo", "//pkg/sql/catalog/descpb", "//pkg/sql/lexbase", @@ -45,6 +46,7 @@ go_library( "//pkg/sql/sem/tree/treewindow", "//pkg/sql/sem/volatility", "//pkg/sql/sqlerrors", + "//pkg/sql/sqlstats", "//pkg/sql/sqltelemetry", "//pkg/sql/types", "//pkg/util/buildutil", @@ -56,6 +58,7 @@ go_library( "//pkg/util/metamorphic", "//pkg/util/timeutil", "//pkg/util/treeprinter", + "@com_github_cockroachdb_crlib//crtime", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", ], diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index de6d5203c5f8..52ffe62d5dc2 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -3731,6 +3731,7 @@ func (b *Builder) buildCall(c *memo.CallExpr) (_ execPlan, outputCols colOrdMap, udf.Def.BodyProps, udf.Def.BodyStmts, udf.Def.BodyTags, + udf.Def.BodyASTs, false, /* allowOuterWithRefs */ nil, /* wrapRootExpr */ 0, /* resultBufferID */ diff --git a/pkg/sql/opt/exec/execbuilder/scalar.go b/pkg/sql/opt/exec/execbuilder/scalar.go index 683ef733d225..91f4002d841d 100644 --- a/pkg/sql/opt/exec/execbuilder/scalar.go +++ b/pkg/sql/opt/exec/execbuilder/scalar.go @@ -8,6 +8,7 @@ package execbuilder import ( "context" + "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" "github.com/cockroachdb/cockroach/pkg/sql/opt/memo" @@ -24,9 +25,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree/treecmp" "github.com/cockroachdb/cockroach/pkg/sql/sem/volatility" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" + "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/errorutil" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/crlib/crtime" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" ) @@ -689,16 +692,7 @@ func (b *Builder) buildExistsSubquery( // Create a plan generator that can plan the single statement // representing the subquery, and wrap the routine in a COALESCE. - planGen := b.buildRoutinePlanGenerator( - params, - stmts, - stmtProps, - nil, /* stmtStr */ - make([]string, len(stmts)), - true, /* allowOuterWithRefs */ - wrapRootExpr, - 0, /* resultBufferID */ - ) + planGen := b.buildRoutinePlanGenerator(params, stmts, stmtProps, nil, make([]string, len(stmts)), nil, true, wrapRootExpr, 0) return tree.NewTypedCoalesceExpr(tree.TypedExprs{ tree.NewTypedRoutineExpr( "exists", @@ -816,16 +810,7 @@ func (b *Builder) buildSubquery( // Create a tree.RoutinePlanFn that can plan the single statement // representing the subquery. - planGen := b.buildRoutinePlanGenerator( - params, - stmts, - stmtProps, - nil, /* stmtStr */ - make([]string, len(stmts)), - true, /* allowOuterWithRefs */ - nil, /* wrapRootExpr */ - 0, /* resultBufferID */ - ) + planGen := b.buildRoutinePlanGenerator(params, stmts, stmtProps, nil, make([]string, len(stmts)), nil, true, nil, 0) _, tailCall := b.tailCalls[subquery] return tree.NewTypedRoutineExpr( "subquery", @@ -900,7 +885,7 @@ func (b *Builder) buildSubquery( if err != nil { return err } - err = fn(plan, "" /* stmtForDistSQLDiagram */, true /* isFinalPlan */) + err = fn(plan, nil, "" /* stmtForDistSQLDiagram */, true /* isFinalPlan */) if err != nil { return err } @@ -1011,16 +996,7 @@ func (b *Builder) buildUDF(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.Typ } // Create a tree.RoutinePlanFn that can plan the statements in the UDF body. - planGen := b.buildRoutinePlanGenerator( - udf.Def.Params, - udf.Def.Body, - udf.Def.BodyProps, - udf.Def.BodyStmts, - udf.Def.BodyTags, - false, /* allowOuterWithRefs */ - nil, /* wrapRootExpr */ - udf.Def.ResultBufferID, - ) + planGen := b.buildRoutinePlanGenerator(udf.Def.Params, udf.Def.Body, udf.Def.BodyProps, udf.Def.BodyStmts, udf.Def.BodyTags, udf.Def.BodyASTs, false, nil, udf.Def.ResultBufferID) // Enable stepping for volatile functions so that statements within the UDF // see mutations made by the invoking statement and by previously executed @@ -1085,16 +1061,7 @@ func (b *Builder) initRoutineExceptionHandler( Actions: make([]*tree.RoutineExpr, len(exceptionBlock.Actions)), } for i, action := range exceptionBlock.Actions { - actionPlanGen := b.buildRoutinePlanGenerator( - action.Params, - action.Body, - action.BodyProps, - action.BodyStmts, - action.BodyTags, - false, /* allowOuterWithRefs */ - nil, /* wrapRootExpr */ - 0, /* resultBufferID */ - ) + actionPlanGen := b.buildRoutinePlanGenerator(action.Params, action.Body, action.BodyProps, action.BodyStmts, action.BodyTags, nil, false, nil, 0) // Build a routine with no arguments for the exception handler. The actual // arguments will be supplied when (if) the handler is invoked. exceptionHandler.Actions[i] = tree.NewTypedRoutineExpr( @@ -1141,6 +1108,7 @@ func (b *Builder) buildRoutinePlanGenerator( stmtProps []*physical.Required, stmtStr []string, stmtTags []string, + stmtASTs []tree.Statement, allowOuterWithRefs bool, wrapRootExpr wrapRootExprFn, resultBufferID memo.RoutineResultBufferID, @@ -1207,7 +1175,12 @@ func (b *Builder) buildRoutinePlanGenerator( dbName := b.evalCtx.SessionData().Database appName := b.evalCtx.SessionData().ApplicationName + format := tree.FmtHideConstants | tree.FmtFlags(tree.QueryFormattingForFingerprintsMask.Get(&b.evalCtx.Settings.SV)) for i := range stmts { + var statsBuilder *sqlstats.RecordedStatementStatsBuilder[*sqlstats.LatencyRecorder] + latencyRecorder := sqlstats.NewStatementLatencyRecorder() + latencyRecorder.RecordPhase(sqlstats.StatementStarted, crtime.NowMono()) + latencyRecorder.RecordPhase(sqlstats.StatementStartParsing, crtime.NowMono()) stmt := stmts[i] props := stmtProps[i] var tag string @@ -1216,6 +1189,18 @@ func (b *Builder) buildRoutinePlanGenerator( if i < len(stmtTags) { tag = stmtTags[i] } + if i < len(stmtASTs) { + fingerprint := tree.FormatStatementHideConstants(stmtASTs[i], format) + fpId := appstatspb.ConstructStatementFingerprintID(fingerprint, b.evalCtx.TxnImplicit, dbName) + summary := tree.FormatStatementSummary(stmtASTs[i], format) + stmtType := stmtASTs[i].StatementType().String() + statsBuilder = sqlstats.NewRecordedStatementStatsBuilder[*sqlstats.LatencyRecorder]( + fpId, dbName, fingerprint, summary, stmtType, appName, + ).WithLatencyRecorder(latencyRecorder) + } + + latencyRecorder.RecordPhase(sqlstats.StatementEndParsing, crtime.NowMono()) + latencyRecorder.RecordPhase(sqlstats.StatementStartPlanning, crtime.NowMono()) o.Init(ctx, b.evalCtx, b.catalog) f := o.Factory() @@ -1324,7 +1309,8 @@ func (b *Builder) buildRoutinePlanGenerator( stmtForDistSQLDiagram = stmtStr[i] } incrementRoutineStmtCounter(b.evalCtx.StartedRoutineStatementCounters, dbName, appName, tag) - err = fn(plan, stmtForDistSQLDiagram, isFinalPlan) + latencyRecorder.RecordPhase(sqlstats.StatementEndPlanning, crtime.NowMono()) + err = fn(plan, statsBuilder, stmtForDistSQLDiagram, isFinalPlan) if err != nil { return err } diff --git a/pkg/sql/opt/memo/expr.go b/pkg/sql/opt/memo/expr.go index bcb57c6e2483..f520e39c944b 100644 --- a/pkg/sql/opt/memo/expr.go +++ b/pkg/sql/opt/memo/expr.go @@ -742,6 +742,8 @@ type UDFDefinition struct { // at the same position in Body. BodyProps []*physical.Required + BodyASTs []tree.Statement + // BodyStmts, if set, is the string representation of each statement in // Body. It is only populated when verbose tracing is enabled. BodyStmts []string diff --git a/pkg/sql/opt/optbuilder/plpgsql.go b/pkg/sql/opt/optbuilder/plpgsql.go index bbaa8b695b4c..1ce66fb6fa81 100644 --- a/pkg/sql/opt/optbuilder/plpgsql.go +++ b/pkg/sql/opt/optbuilder/plpgsql.go @@ -678,7 +678,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope) if b.options.insideDataSource && b.setReturnType.Family() == types.TupleFamily { retNextScope = b.ob.expandRoutineTupleIntoCols(retNextScope) } - b.appendBodyStmtFromScope(&retCon, retNextScope, "" /* stmtTag */) + b.appendBodyStmtFromScope(&retCon, retNextScope, nil /* stmt */) b.appendPlpgSQLStmts(&retCon, stmts[i+1:]) return b.callContinuation(&retCon, s) @@ -720,7 +720,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope) if !b.options.insideDataSource && b.setReturnType.Family() == types.TupleFamily { retQueryScope = b.ob.combineRoutineColsIntoTuple(retQueryScope) } - b.appendBodyStmtFromScope(&retCon, retQueryScope, t.SqlStmt.StatementTag()) + b.appendBodyStmtFromScope(&retCon, retQueryScope, t.SqlStmt) b.appendPlpgSQLStmts(&retCon, stmts[i+1:]) return b.callContinuation(&retCon, s) @@ -949,7 +949,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope) // crdb_internal.plpgsql_raise builtin function. con := b.makeContinuation("_stmt_raise") con.def.Volatility = volatility.Volatile - b.appendBodyStmtFromScope(&con, b.buildPLpgSQLRaise(con.s, b.getRaiseArgs(con.s, t)), "" /* stmtTag */) + b.appendBodyStmtFromScope(&con, b.buildPLpgSQLRaise(con.s, b.getRaiseArgs(con.s, t)), nil /* stmt */) b.appendPlpgSQLStmts(&con, stmts[i+1:]) return b.callContinuation(&con, s) @@ -970,7 +970,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope) if len(t.Target) == 0 { // When there is no INTO target, build the SQL statement into a body // statement that is only executed for its side effects. - b.appendBodyStmtFromScope(&execCon, stmtScope, t.SqlStmt.StatementTag()) + b.appendBodyStmtFromScope(&execCon, stmtScope, t.SqlStmt) b.appendPlpgSQLStmts(&execCon, stmts[i+1:]) return b.callContinuation(&execCon, s) } @@ -1028,7 +1028,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope) intoScope = b.callContinuation(&retCon, intoScope) // Step 3: call the INTO continuation from the parent scope. - b.appendBodyStmtFromScope(&execCon, intoScope, t.SqlStmt.StatementTag()) + b.appendBodyStmtFromScope(&execCon, intoScope, t.SqlStmt) return b.callContinuation(&execCon, s) case *ast.Open: @@ -1068,7 +1068,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope) // Cursors with mutations are invalid. panic(cursorMutationErr) } - b.appendBodyStmtFromScope(&openCon, openScope, query.StatementTag()) + b.appendBodyStmtFromScope(&openCon, openScope, query) b.appendPlpgSQLStmts(&openCon, stmts[i+1:]) // Build a statement to generate a unique name for the cursor if one @@ -1078,7 +1078,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope) nameCon := b.makeContinuation("_gen_cursor_name") nameCon.def.Volatility = volatility.Volatile nameScope := b.buildCursorNameGen(&nameCon, t.CurVar) - b.appendBodyStmtFromScope(&nameCon, b.callContinuation(&openCon, nameScope), "" /* stmtTag */) + b.appendBodyStmtFromScope(&nameCon, b.callContinuation(&openCon, nameScope), nil /* stmt */) return b.callContinuation(&nameCon, s) case *ast.Close: @@ -1118,7 +1118,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope) closeScope := closeCon.s.push() b.ob.synthesizeColumn(closeScope, closeColName, types.Int, nil /* expr */, closeCall) b.ob.constructProjectForScope(closeCon.s, closeScope) - b.appendBodyStmtFromScope(&closeCon, closeScope, "" /* stmtTag */) + b.appendBodyStmtFromScope(&closeCon, closeScope, nil /* stmt */) b.appendPlpgSQLStmts(&closeCon, stmts[i+1:]) return b.callContinuation(&closeCon, s) @@ -1142,7 +1142,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope) fetchCon.def.Volatility = volatility.Volatile fetchScope := b.buildFetch(fetchCon.s, t) if t.IsMove { - b.appendBodyStmtFromScope(&fetchCon, fetchScope, "" /* stmtTag */) + b.appendBodyStmtFromScope(&fetchCon, fetchScope, nil /* stmt */) b.appendPlpgSQLStmts(&fetchCon, stmts[i+1:]) return b.callContinuation(&fetchCon, s) } @@ -1173,7 +1173,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope) intoScope = b.callContinuation(&retCon, intoScope) // Add the built statement to the FETCH continuation. - b.appendBodyStmtFromScope(&fetchCon, intoScope, "" /* stmtTag */) + b.appendBodyStmtFromScope(&fetchCon, intoScope, nil /* stmt */) return b.callContinuation(&fetchCon, s) case *ast.Null: @@ -1275,7 +1275,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope) if len(target) == 0 { // When there is no INTO target, build the nested procedure call into a // body statement that is only executed for its side effects. - b.appendBodyStmtFromScope(&callCon, callScope, "" /* stmtTag */) + b.appendBodyStmtFromScope(&callCon, callScope, nil /* stmt */) b.appendPlpgSQLStmts(&callCon, stmts[i+1:]) return b.callContinuation(&callCon, s) } @@ -1290,7 +1290,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope) intoScope = b.callContinuation(&retCon, intoScope) // Add the built statement to the CALL continuation. - b.appendBodyStmtFromScope(&callCon, intoScope, "" /* stmtTag */) + b.appendBodyStmtFromScope(&callCon, intoScope, nil /* stmt */) return b.callContinuation(&callCon, s) case *ast.DoBlock: @@ -1306,7 +1306,7 @@ func (b *plpgsqlBuilder) buildPLpgSQLStatements(stmts []ast.Statement, s *scope) doCon := b.makeContinuation("_stmt_do") doCon.def.Volatility = volatility.Volatile bodyScope := b.ob.buildPLpgSQLDoBody(t) - b.appendBodyStmtFromScope(&doCon, bodyScope, "" /* stmtTag */) + b.appendBodyStmtFromScope(&doCon, bodyScope, nil /* stmt */) b.appendPlpgSQLStmts(&doCon, stmts[i+1:]) return b.callContinuation(&doCon, s) @@ -1456,7 +1456,7 @@ func (b *plpgsqlBuilder) handleIntForLoop( ) // Call recursively into the loop body continuation. incScope = b.callContinuation(&loopCon, incScope) - b.appendBodyStmtFromScope(&incrementCon, incScope, "" /* stmtTag */) + b.appendBodyStmtFromScope(&incrementCon, incScope, nil /* stmt */) // Notably, we call the loop body continuation here, rather than the // increment continuation, because the counter should not be incremented @@ -2020,7 +2020,7 @@ func (b *plpgsqlBuilder) buildEndOfFunctionRaise(con *continuation) { pgcode.RoutineExceptionFunctionExecutedNoReturnStatement.String(), /* code */ ) con.def.Volatility = volatility.Volatile - b.appendBodyStmtFromScope(con, b.buildPLpgSQLRaise(con.s, args), "" /* stmtTag */) + b.appendBodyStmtFromScope(con, b.buildPLpgSQLRaise(con.s, args), nil /* stmt */) // Build a dummy statement that returns NULL. It won't be executed, but // ensures that the continuation routine's return type is correct. @@ -2029,7 +2029,7 @@ func (b *plpgsqlBuilder) buildEndOfFunctionRaise(con *continuation) { typedNull := b.ob.factory.ConstructNull(b.returnType) b.ob.synthesizeColumn(eofScope, eofColName, b.returnType, nil /* expr */, typedNull) b.ob.constructProjectForScope(con.s, eofScope) - b.appendBodyStmtFromScope(con, eofScope, "" /* stmtTag */) + b.appendBodyStmtFromScope(con, eofScope, nil /* stmt */) } // addOneRowCheck handles INTO STRICT, where a SQL statement is required to @@ -2280,7 +2280,7 @@ func (b *plpgsqlBuilder) makeContinuationWithTyp( // routine definitions, which need to push the continuation before it is // finished. The separation also allows for appending multiple body statements. func (b *plpgsqlBuilder) appendBodyStmtFromScope( - con *continuation, bodyScope *scope, stmtTag string, + con *continuation, bodyScope *scope, stmt tree.Statement, ) { // Set the volatility of the continuation routine to the least restrictive // volatility level in the Relational properties of the body statements. @@ -2290,8 +2290,11 @@ func (b *plpgsqlBuilder) appendBodyStmtFromScope( con.def.Volatility = vol } con.def.Body = append(con.def.Body, bodyExpr) - con.def.BodyTags = append(con.def.BodyTags, stmtTag) con.def.BodyProps = append(con.def.BodyProps, bodyScope.makePhysicalProps()) + if stmt != nil { + con.def.BodyTags = append(con.def.BodyTags, stmt.StatementTag()) + con.def.BodyASTs = append(con.def.BodyASTs, stmt) + } } // appendPlpgSQLStmts builds the given PLpgSQL statements into a relational @@ -2301,7 +2304,7 @@ func (b *plpgsqlBuilder) appendPlpgSQLStmts(con *continuation, stmts []ast.State // Make sure to push s before constructing the continuation scope to ensure // that the parameter columns are not projected. continuationScope := b.buildPLpgSQLStatements(stmts, con.s.push()) - b.appendBodyStmtFromScope(con, continuationScope, "" /* stmtTag */) + b.appendBodyStmtFromScope(con, continuationScope, nil /* stmt */) } // callContinuation adds a column that projects the result of calling the diff --git a/pkg/sql/opt/optbuilder/routine.go b/pkg/sql/opt/optbuilder/routine.go index b249afbc0a94..1b11a19f6422 100644 --- a/pkg/sql/opt/optbuilder/routine.go +++ b/pkg/sql/opt/optbuilder/routine.go @@ -420,10 +420,15 @@ func (b *Builder) buildRoutine( var bodyProps []*physical.Required var bodyStmts []string var bodyTags []string + var bodyASTs []tree.Statement switch o.Language { case tree.RoutineLangSQL: // Parse the function body. stmts, err := parser.Parse(o.Body) + bodyASTs = make([]tree.Statement, len(stmts)) + for i := range stmts { + bodyASTs[i] = stmts[i].AST + } if err != nil { panic(err) } @@ -539,6 +544,7 @@ func (b *Builder) buildRoutine( BodyProps: bodyProps, BodyStmts: bodyStmts, BodyTags: bodyTags, + BodyASTs: bodyASTs, Params: params, ResultBufferID: resultBufferID, }, diff --git a/pkg/sql/pgwire/BUILD.bazel b/pkg/sql/pgwire/BUILD.bazel index fa58f0236d43..6fc210ef2574 100644 --- a/pkg/sql/pgwire/BUILD.bazel +++ b/pkg/sql/pgwire/BUILD.bazel @@ -61,7 +61,7 @@ go_library( "//pkg/sql/sem/catconstants", "//pkg/sql/sem/tree", "//pkg/sql/sessiondatapb", - "//pkg/sql/sqlcommenter", + "//pkg/sql/sqlclustersettings", "//pkg/sql/sqltelemetry", "//pkg/sql/types", "//pkg/util", diff --git a/pkg/sql/pgwire/conn.go b/pkg/sql/pgwire/conn.go index 258bc07fe8fc..4973aa8a5c54 100644 --- a/pkg/sql/pgwire/conn.go +++ b/pkg/sql/pgwire/conn.go @@ -34,7 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirecancel" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" - "github.com/cockroachdb/cockroach/pkg/sql/sqlcommenter" + "github.com/cockroachdb/cockroach/pkg/sql/sqlclustersettings" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" @@ -381,7 +381,7 @@ func (c *conn) handleSimpleQuery( }, ) } - stmts, err := c.parser.ParseWithOptions(query, sqlcommenter.MaybeRetainComments(c.sv).WithIntType(unqualifiedIntSize)) + stmts, err := c.parser.ParseWithOptions(query, sqlclustersettings.MaybeRetainComments(c.sv).WithIntType(unqualifiedIntSize)) if err != nil { log.SqlExec.Infof(ctx, "could not parse simple query: %s", query) return c.stmtBuf.Push(ctx, sql.SendError{Err: err}) @@ -517,7 +517,7 @@ func (c *conn) handleParse(ctx context.Context, nakedIntSize *types.T) error { } startParse := crtime.NowMono() - stmts, err := c.parser.ParseWithOptions(query, sqlcommenter.MaybeRetainComments(c.sv).WithIntType(nakedIntSize)) + stmts, err := c.parser.ParseWithOptions(query, sqlclustersettings.MaybeRetainComments(c.sv).WithIntType(nakedIntSize)) if err != nil { log.SqlExec.Infof(ctx, "could not parse: %s", query) return c.stmtBuf.Push(ctx, sql.SendError{Err: err}) diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index 2bd5d90bcb58..44a88fb3e08c 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -225,6 +225,8 @@ type planner struct { instrumentation instrumentationHelper + statsCollector *sslocal.StatsCollector + // Contexts for different stages of planning and execution. semaCtx tree.SemaContext extendedEvalCtx extendedEvalContext diff --git a/pkg/sql/routine.go b/pkg/sql/routine.go index 97b2966c109b..61a1afbcf4ac 100644 --- a/pkg/sql/routine.go +++ b/pkg/sql/routine.go @@ -19,13 +19,16 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metamorphic" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/crlib/crtime" "github.com/cockroachdb/errors" ) @@ -325,7 +328,7 @@ func (g *routineGenerator) startInternal(ctx context.Context, txn *kv.Txn) (err rrw := NewRowResultWriter(&g.rch) var cursorHelper *plpgsqlCursorHelper err = g.expr.ForEachPlan(ctx, ef, rrw, g.args, - func(plan tree.RoutinePlan, stmtForDistSQLDiagram string, isFinalPlan bool) error { + func(plan tree.RoutinePlan, statsBuilder *sqlstats.RecordedStatementStatsBuilder[*sqlstats.LatencyRecorder], stmtForDistSQLDiagram string, isFinalPlan bool) error { stmtIdx++ opName := "routine-stmt-" + g.expr.Name + "-" + strconv.Itoa(stmtIdx) ctx, sp := tracing.ChildSpan(ctx, opName) @@ -365,13 +368,50 @@ func (g *routineGenerator) startInternal(ctx context.Context, txn *kv.Txn) (err return err } } - + var latencyRecorder *sqlstats.LatencyRecorder // Run the plan. params := runParams{ctx, g.p.ExtendedEvalContext(), g.p} + if statsBuilder != nil { + latencyRecorder = statsBuilder.GetLatencyRecorder() + defer func() { + flags := plan.(*planComponents).flags + latencyRecorder.RecordPhase(sqlstats.StatementEnd, crtime.NowMono()) + if g.p.statsCollector == nil { + if buildutil.CrdbTestBuild { + panic("No stats collector exists on the planner, cannot record statement stats") + } + log.Dev.Error(ctx, "No stats collector exists on the planner, cannot record statement stats") + return + } + stmtStats := statsBuilder. + SessionID(g.p.ExtendedEvalContext().SessionID). + LatencyRecorder(latencyRecorder). + PlanMetadata( + flags.IsSet(planFlagGeneric), + flags.ShouldBeDistributed(), + flags.IsSet(planFlagVectorized), + flags.IsSet(planFlagImplicitTxn), + flags.IsSet(planFlagContainsFullIndexScan) || flags.IsSet(planFlagContainsFullTableScan), + ). + PlanGist("", 0). // TODO: is there a gist we can put here? I don't think so + Build() + g.p.statsCollector.RecordStatement(ctx, + stmtStats, + ) + }() + } + if latencyRecorder != nil { + latencyRecorder.RecordPhase(sqlstats.StatementStartExec, crtime.NowMono()) + } queryStats, err := runPlanInsidePlan(ctx, params, plan.(*planComponents), w, g, stmtForDistSQLDiagram) + if latencyRecorder != nil { + latencyRecorder.RecordPhase(sqlstats.StatementEndExec, crtime.NowMono()) + } if err != nil { + statsBuilder.StatementError(err) return err } + statsBuilder.QueryLevelStats(queryStats.bytesRead, queryStats.rowsRead, queryStats.rowsWritten) forwardInnerQueryStats(g.p.routineMetadataForwarder, queryStats) if openCursor { return cursorHelper.createCursor(g.p) diff --git a/pkg/sql/sem/tree/BUILD.bazel b/pkg/sql/sem/tree/BUILD.bazel index 459d7631153a..ddebe9b25e59 100644 --- a/pkg/sql/sem/tree/BUILD.bazel +++ b/pkg/sql/sem/tree/BUILD.bazel @@ -160,6 +160,7 @@ go_library( "//pkg/sql/sem/tree/treewindow", "//pkg/sql/sem/volatility", "//pkg/sql/sessiondatapb", + "//pkg/sql/sqlstats", "//pkg/sql/types", "//pkg/util", "//pkg/util/bitarray", diff --git a/pkg/sql/sem/tree/routine.go b/pkg/sql/sem/tree/routine.go index 4590c129d679..98b79ee29ea2 100644 --- a/pkg/sql/sem/tree/routine.go +++ b/pkg/sql/sem/tree/routine.go @@ -10,6 +10,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/errors" @@ -35,7 +36,7 @@ type RoutinePlanGenerator func( // to specify the SQL stmt corresponding to the plan. // - isFinalPlan is true if no more plans will be generated after the current // plan. -type RoutinePlanGeneratedFunc func(plan RoutinePlan, stmtForDistSQLDiagram string, isFinalPlan bool) error +type RoutinePlanGeneratedFunc func(plan RoutinePlan, builder *sqlstats.RecordedStatementStatsBuilder[*sqlstats.LatencyRecorder], stmtForDistSQLDiagram string, isFinalPlan bool) error // RoutinePlan represents a plan for a statement in a routine. It currently maps // to exec.Plan. We use the empty interface here rather than exec.Plan to avoid diff --git a/pkg/sql/sqlclustersettings/BUILD.bazel b/pkg/sql/sqlclustersettings/BUILD.bazel index 2e7adb479999..aba2b29f0fb8 100644 --- a/pkg/sql/sqlclustersettings/BUILD.bazel +++ b/pkg/sql/sqlclustersettings/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//pkg/keys", "//pkg/settings", "//pkg/settings/cluster", + "//pkg/sql/parser", "//pkg/util/metamorphic", "@com_github_cockroachdb_errors//:errors", ], diff --git a/pkg/sql/sqlclustersettings/clustersettings.go b/pkg/sql/sqlclustersettings/clustersettings.go index 4dcf3934a223..d7677d1365f1 100644 --- a/pkg/sql/sqlclustersettings/clustersettings.go +++ b/pkg/sql/sqlclustersettings/clustersettings.go @@ -9,6 +9,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/util/metamorphic" "github.com/cockroachdb/errors" ) @@ -152,3 +153,19 @@ var LDRImmediateModeWriter = settings.RegisterStringSetting( return nil }), ) + +var sqlCommenterEnabled = settings.RegisterBoolSetting( + settings.ApplicationLevel, + "sql.sqlcommenter.enabled", + "enables support for sqlcommenter. Key value parsed from sqlcommenter "+ + "comments will be included in sql insights and sql logs. "+ + "See https://google.github.io/sqlcommenter/ for more details.", + false, + settings.WithPublic) + +func MaybeRetainComments(sv *settings.Values) parser.ParseOptions { + if sqlCommenterEnabled.Get(sv) { + return parser.DefaultParseOptions.RetainComments() + } + return parser.DefaultParseOptions +} diff --git a/pkg/sql/sqlcommenter/BUILD.bazel b/pkg/sql/sqlcommenter/BUILD.bazel index 5e97e3b496df..f51895058e48 100644 --- a/pkg/sql/sqlcommenter/BUILD.bazel +++ b/pkg/sql/sqlcommenter/BUILD.bazel @@ -5,11 +5,7 @@ go_library( srcs = ["sql_commenter.go"], importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlcommenter", visibility = ["//visibility:public"], - deps = [ - "//pkg/settings", - "//pkg/sql/parser", - "@com_github_cockroachdb_redact//:redact", - ], + deps = ["@com_github_cockroachdb_redact//:redact"], ) go_test( diff --git a/pkg/sql/sqlcommenter/sql_commenter.go b/pkg/sql/sqlcommenter/sql_commenter.go index 6b134524f9e7..ced0ff52ce94 100644 --- a/pkg/sql/sqlcommenter/sql_commenter.go +++ b/pkg/sql/sqlcommenter/sql_commenter.go @@ -9,20 +9,9 @@ import ( "net/url" "strings" - "github.com/cockroachdb/cockroach/pkg/settings" - "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/redact" ) -var sqlCommenterEnabled = settings.RegisterBoolSetting( - settings.ApplicationLevel, - "sql.sqlcommenter.enabled", - "enables support for sqlcommenter. Key value parsed from sqlcommenter "+ - "comments will be included in sql insights and sql logs. "+ - "See https://google.github.io/sqlcommenter/ for more details.", - false, - settings.WithPublic) - type QueryTag struct { Key string Value redact.SafeString @@ -74,13 +63,6 @@ func ExtractQueryTags(comment string) []QueryTag { return tags } -func MaybeRetainComments(sv *settings.Values) parser.ParseOptions { - if sqlCommenterEnabled.Get(sv) { - return parser.DefaultParseOptions.RetainComments() - } - return parser.DefaultParseOptions -} - func normalize(s string) string { s = strings.Replace(s, "\\'", "'", -1) s, _ = url.QueryUnescape(s) diff --git a/pkg/sql/sqlstats/BUILD.bazel b/pkg/sql/sqlstats/BUILD.bazel index ef5ee682c3aa..d9dd66596943 100644 --- a/pkg/sql/sqlstats/BUILD.bazel +++ b/pkg/sql/sqlstats/BUILD.bazel @@ -3,7 +3,6 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "sqlstats", srcs = [ - "aggregate.go", "cluster_settings.go", "ssprovider.go", "test_utils.go", @@ -11,23 +10,17 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlstats", visibility = ["//visibility:public"], deps = [ - "//pkg/base", - "//pkg/obs/eventagg", - "//pkg/obs/logstream", "//pkg/roachpb", "//pkg/settings", "//pkg/sql/appstatspb", "//pkg/sql/clusterunique", - "//pkg/sql/execstats", - "//pkg/sql/sem/tree", + "//pkg/sql/execstats/execstatstypes", "//pkg/sql/sqlcommenter", - "//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil", + "//pkg/util/buildutil", "//pkg/util/log", - "//pkg/util/metric", "//pkg/util/stop", - "//pkg/util/timeutil", + "//pkg/util/uint128", "//pkg/util/uuid", - "@com_github_cockroachdb_errors//:errors", - "@com_github_cockroachdb_redact//:redact", + "@com_github_cockroachdb_crlib//crtime", ], ) diff --git a/pkg/sql/sqlstats/aggregate.go b/pkg/sql/sqlstats/aggregate.go deleted file mode 100644 index 224b707dbe51..000000000000 --- a/pkg/sql/sqlstats/aggregate.go +++ /dev/null @@ -1,121 +0,0 @@ -// Copyright 2024 The Cockroach Authors. -// -// Use of this software is governed by the CockroachDB Software License -// included in the /LICENSE file. - -package sqlstats - -import ( - "context" - "encoding/hex" - "fmt" - "time" - - "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/obs/eventagg" - "github.com/cockroachdb/cockroach/pkg/obs/logstream" - "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" - "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/metric" - "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/cockroachdb/errors" - "github.com/cockroachdb/redact" -) - -type Stmt struct { - // StmtFingerprintID is the fingerprint ID for the statement. - // TODO(abarganier): StmtFingerprintID is included in the GroupingKey for this type. - // Ideally, we would have some sort of struct tag that would allow us to leverage code - // generation to derive the GroupingKey implementation. However, for now, we punt this - // problem to avoid slowing down the prototyping process. - StmtFingerprintID appstatspb.StmtFingerprintID - // Statement is the redactable statement string. - Statement redact.RedactableString - // ServiceLatency is the latency of serving the query, excluding miscellaneous - // sources of the overhead (e.g. internal retries). - ServiceLatency time.Duration -} - -type StmtStatistics struct { - // ExecCount is the count of total executions for the statement. - // TODO(abarganier): Ideally, we find a code generation mechanism here to define - // how an event is merged into the aggregate type via things like struct tags. However, - // for now, we punt this problem to avoid slowing down the prototyping process. - ExecCount int - // ServiceLatency is a histogram used to aggregate service latencies of the - // various Stmt's recorded into this StmtStatistics instance. - ServiceLatency metric.IHistogram - ServiceLatencyP99 float64 -} - -func mergeStmt(s *Stmt, stats *StmtStatistics) { - stats.ExecCount++ - stats.ServiceLatency.RecordValue(s.ServiceLatency.Nanoseconds()) -} - -func mapStmt(s *Stmt) appstatspb.StmtFingerprintID { - return s.StmtFingerprintID -} - -// NewStmtStatsAggregator leverages the generic MapReduceAggregator to instantiate -// and return a new aggregator for StmtStatistic's.. -// -// It is an example of a system that'd use the eventagg library to easily define aggregation -// rules/criteria, and how to process those results. -func NewStmtStatsAggregator( - stopper *stop.Stopper, -) *eventagg.MapReduceAggregator[*Stmt, appstatspb.StmtFingerprintID, *StmtStatistics] { - return eventagg.NewMapReduceAggregator[*Stmt, appstatspb.StmtFingerprintID, *StmtStatistics]( - stopper, - func() *StmtStatistics { - return &StmtStatistics{ - ServiceLatency: metric.NewHistogram(metric.HistogramOptions{ - Metadata: metric.Metadata{ - Name: "stmt.svc.latency", - Measurement: "Aggregate service latency of statement executions", - Unit: metric.Unit_NANOSECONDS, - }, - Duration: base.DefaultHistogramWindowInterval(), - BucketConfig: metric.IOLatencyBuckets, - }), - } - }, - mapStmt, - mergeStmt, - eventagg.NewWindowedFlush(10*time.Second, timeutil.Now), // Let's limit our aggregation windows to clock-aligned 10-second intervals. - eventagg.NewLogWriteConsumer[appstatspb.StmtFingerprintID, *StmtStatistics](log.STATEMENT_STATS), // We'd like to log all the aggregated results, as-is. - ) -} - -// SQLStatsLogProcessor is an example logstream.Processor implementation. -type SQLStatsLogProcessor struct { - // Processors can be stateful! This is why we opt for using interface implementations as - // opposed to anonymous functions. -} - -// Process implements the logstream.Processor interface. -func (S *SQLStatsLogProcessor) Process(ctx context.Context, event any) error { - e, ok := event.(eventagg.KeyValueLog[appstatspb.StmtFingerprintID, *StmtStatistics]) - if !ok { - panic(errors.AssertionFailedf("Unexpected event type provided to SQLStatsLogProcessor: %v", event)) - } - fmt.Printf("FingerprintID: '%s'\tExec Count: %d\tP99 Latency (ms): %f\tStart: %s End %s\n", - hex.EncodeToString(sqlstatsutil.EncodeUint64ToBytes(uint64(e.Key))), - e.Value.ExecCount, - e.Value.ServiceLatencyP99/1000000, - timeutil.Unix(0, e.AggInfo.StartTime).UTC().String(), - timeutil.Unix(0, e.AggInfo.EndTime).UTC().String()) - return nil -} - -var _ logstream.Processor = (*SQLStatsLogProcessor)(nil) - -// InitStmtStatsProcessor initializes & registers a new logstream.Processor for the processing of statement statistics -// for the tenant associated with the given ctx. -// -// It consumes streams of events flushed & logged from NewStmtStatsAggregator instances belonging to the same tenant. -func InitStmtStatsProcessor(ctx context.Context, stopper *stop.Stopper) { - logstream.RegisterProcessor(ctx, stopper, log.STATEMENT_STATS, &SQLStatsLogProcessor{}) -} diff --git a/pkg/sql/sqlstats/insights/BUILD.bazel b/pkg/sql/sqlstats/insights/BUILD.bazel index 09921135245d..60ef9bd402d9 100644 --- a/pkg/sql/sqlstats/insights/BUILD.bazel +++ b/pkg/sql/sqlstats/insights/BUILD.bazel @@ -28,6 +28,7 @@ go_library( "//pkg/util/metric", "//pkg/util/quantile", "//pkg/util/syncutil", + "//pkg/util/uint128", "//pkg/util/uuid", "@com_github_cockroachdb_redact//:redact", "@com_github_prometheus_client_model//go", @@ -49,7 +50,7 @@ go_test( "//pkg/settings/cluster", "//pkg/sql/appstatspb", "//pkg/sql/clusterunique", - "//pkg/sql/execstats", + "//pkg/sql/execstats/execstatstypes", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/sql/sqlstats", diff --git a/pkg/sql/sqlstats/insights/detector.go b/pkg/sql/sqlstats/insights/detector.go index 6b8179aa9165..7d50ee297828 100644 --- a/pkg/sql/sqlstats/insights/detector.go +++ b/pkg/sql/sqlstats/insights/detector.go @@ -14,6 +14,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/util/quantile" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/uint128" ) type detector interface { @@ -172,7 +173,7 @@ var prefixesToIgnore = []string{"SET ", "EXPLAIN "} // shouldIgnoreStatement returns true if we don't want to analyze the statement. func shouldIgnoreStatement(s *sqlstats.RecordedStmtStats) bool { for _, start := range prefixesToIgnore { - if strings.HasPrefix(s.Query, start) { + if strings.HasPrefix(s.Query, start) || s.StatementID.Equal(uint128.Uint128{}) { return true } } diff --git a/pkg/sql/sqlstats/insights/registry_test.go b/pkg/sql/sqlstats/insights/registry_test.go index d018e0221ff3..7879ce188ef6 100644 --- a/pkg/sql/sqlstats/insights/registry_test.go +++ b/pkg/sql/sqlstats/insights/registry_test.go @@ -15,7 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" - "github.com/cockroachdb/cockroach/pkg/sql/execstats" + "github.com/cockroachdb/cockroach/pkg/sql/execstats/execstatstypes" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" @@ -356,7 +356,7 @@ func TestRegistry(t *testing.T) { Committed: true, SessionID: session.ID, TransactionID: uuid.MakeV4(), - ExecStats: execstats.QueryLevelStats{ + ExecStats: execstatstypes.QueryLevelStats{ ContentionTime: contentionDuration, }, } diff --git a/pkg/sql/sqlstats/sslocal/sql_stats_ingester_test.go b/pkg/sql/sqlstats/sslocal/sql_stats_ingester_test.go index ad01f1b697d8..31f33f15bd0a 100644 --- a/pkg/sql/sqlstats/sslocal/sql_stats_ingester_test.go +++ b/pkg/sql/sqlstats/sslocal/sql_stats_ingester_test.go @@ -412,7 +412,6 @@ func TestStatsCollectorIngester(t *testing.T) { ingester, phaseTimes, uniqueServerCounts, - nil, // knobs ) sessionID := clusterunique.IDFromBytes([]byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")) diff --git a/pkg/sql/sqlstats/sslocal/sql_stats_ingestor.go b/pkg/sql/sqlstats/sslocal/sql_stats_ingestor.go index 21dda9f37a08..cd8fb8f500de 100644 --- a/pkg/sql/sqlstats/sslocal/sql_stats_ingestor.go +++ b/pkg/sql/sqlstats/sslocal/sql_stats_ingestor.go @@ -265,6 +265,25 @@ func (i *SQLStatsIngester) ingest(ctx context.Context, events *eventBuffer) { } } +func (i *SQLStatsIngester) RecordStatement(statement *sqlstats.RecordedStmtStats) { + i.BufferStatement(statement) + if i.testingKnobs != nil && i.testingKnobs.SynchronousSQLStats { + // Flush buffer and wait for the stats ingester to finish writing. + i.guard.ForceSync() + <-i.syncStatsTestingCh + } +} + +func (i *SQLStatsIngester) RecordTransaction(transaction *sqlstats.RecordedTxnStats) { + i.BufferTransaction(transaction) + + if i.testingKnobs != nil && i.testingKnobs.SynchronousSQLStats { + // Flush buffer and wait for the stats ingester to finish writing. + i.guard.ForceSync() + <-i.syncStatsTestingCh + } +} + func (i *SQLStatsIngester) BufferStatement(statement *sqlstats.RecordedStmtStats) { if i.testingKnobs != nil && i.testingKnobs.IngesterStmtInterceptor != nil { i.testingKnobs.IngesterStmtInterceptor(statement.SessionID, statement) diff --git a/pkg/sql/sqlstats/sslocal/sql_stats_test.go b/pkg/sql/sqlstats/sslocal/sql_stats_test.go index 1122f50f7fa1..a960969a2b21 100644 --- a/pkg/sql/sqlstats/sslocal/sql_stats_test.go +++ b/pkg/sql/sqlstats/sslocal/sql_stats_test.go @@ -471,7 +471,6 @@ func TestExplicitTxnFingerprintAccounting(t *testing.T) { ingester, sessionphase.NewTimes(), sqlStats.GetCounters(), - nil, /* knobs */ ) recordStats := func(testCase *tc) { @@ -595,7 +594,6 @@ func TestAssociatingStmtStatsWithTxnFingerprint(t *testing.T) { ingester, sessionphase.NewTimes(), sqlStats.GetCounters(), - nil, /* knobs */ ) ingester.Start(ctx, stopper) diff --git a/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go b/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go index a60082bda772..0d9026a82b0b 100644 --- a/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go +++ b/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go @@ -7,9 +7,9 @@ package sslocal import ( "context" + "time" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" "github.com/cockroachdb/cockroach/pkg/sql/sessionphase" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" @@ -35,12 +35,6 @@ import ( // persist statement and transaction insights to an in-memory cache. // Events are sent to the insights subsystem for async processing. type StatsCollector struct { - // stmtFingerprintID is the fingerprint ID of the current statement we are - // recording. Note that we don't observe sql stats for all statements (e.g. COMMIT). - // If no stats have been attempted to be recorded yet for the current statement, - // this value will be 0. - stmtFingerprintID appstatspb.StmtFingerprintID - // phaseTimes tracks session-level phase times. phaseTimes sessionphase.Times @@ -67,8 +61,7 @@ type StatsCollector struct { statsIngester *SQLStatsIngester - st *cluster.Settings - knobs *sqlstats.TestingKnobs + st *cluster.Settings } // NewStatsCollector returns an instance of StatsCollector. @@ -78,7 +71,6 @@ func NewStatsCollector( ingester *SQLStatsIngester, phaseTime *sessionphase.Times, uniqueServerCounts *ssmemstorage.SQLStatsAtomicCounters, - knobs *sqlstats.TestingKnobs, ) *StatsCollector { s := &StatsCollector{ flushTarget: appStats, @@ -86,7 +78,6 @@ func NewStatsCollector( uniqueServerCounts: uniqueServerCounts, statsIngester: ingester, st: st, - knobs: knobs, } s.sendStats = s.enabled() @@ -94,11 +85,6 @@ func NewStatsCollector( return s } -// StatementFingerprintID returns the fingerprint ID for the current statement. -func (s *StatsCollector) StatementFingerprintID() appstatspb.StmtFingerprintID { - return s.stmtFingerprintID -} - // PhaseTimes returns the sessionphase.Times that this StatsCollector is // currently tracking. func (s *StatsCollector) PhaseTimes() *sessionphase.Times { @@ -118,7 +104,6 @@ func (s *StatsCollector) PreviousPhaseTimes() *sessionphase.Times { // Found a bug again? Consider refactoring. func (s *StatsCollector) Reset(appStats *ssmemstorage.Container, phaseTime *sessionphase.Times) { s.flushTarget = appStats - s.stmtFingerprintID = 0 s.previousPhaseTimes = s.phaseTimes s.phaseTimes = *phaseTime } @@ -166,14 +151,8 @@ func (s *StatsCollector) RecordStatement(_ctx context.Context, value *sqlstats.R if !s.sendStats { return } - s.stmtFingerprintID = value.FingerprintID - s.statsIngester.BufferStatement(value) - if s.knobs != nil && s.knobs.SynchronousSQLStats { - // Flush buffer and wait for the stats ingester to finish writing. - s.statsIngester.guard.ForceSync() - <-s.statsIngester.syncStatsTestingCh - } + s.statsIngester.RecordStatement(value) } // RecordTransaction sends the transaction statistics to the stats ingester. @@ -182,13 +161,7 @@ func (s *StatsCollector) RecordTransaction(_ctx context.Context, value *sqlstats return } - s.statsIngester.BufferTransaction(value) - - if s.knobs != nil && s.knobs.SynchronousSQLStats { - // Flush buffer and wait for the stats ingester to finish writing. - s.statsIngester.guard.ForceSync() - <-s.statsIngester.syncStatsTestingCh - } + s.statsIngester.RecordTransaction(value) } func (s *StatsCollector) EnabledForTransaction() bool { @@ -205,3 +178,41 @@ func (s *StatsCollector) EnabledForTransaction() bool { func (s *StatsCollector) CurrentApplicationName() string { return s.flushTarget.ApplicationName() } + +func (s *StatsCollector) RunLatency() time.Duration { + return s.PhaseTimes().GetRunLatency() +} + +func (s *StatsCollector) IdleLatency() time.Duration { + return s.PhaseTimes().GetIdleLatency(s.PreviousPhaseTimes()) +} + +func (s *StatsCollector) ServiceLatency() time.Duration { + return s.PhaseTimes().GetServiceLatencyNoOverhead() +} + +func (s *StatsCollector) ParsingLatency() time.Duration { + return s.PhaseTimes().GetParsingLatency() +} + +func (s *StatsCollector) PlanningLatency() time.Duration { + return s.PhaseTimes().GetPlanningLatency() +} + +func (s *StatsCollector) ProcessingLatency() time.Duration { + return s.ParsingLatency() + s.PlanningLatency() + s.RunLatency() +} + +func (s *StatsCollector) ExecOverheadLatency() time.Duration { + return s.ServiceLatency() - s.ProcessingLatency() +} + +func (s *StatsCollector) StartTime() time.Time { + return s.PhaseTimes().GetSessionPhaseTime(sessionphase.PlannerStartExecStmt).ToUTC() +} + +func (s *StatsCollector) EndTime() time.Time { + return s.StartTime().Add(s.ServiceLatency()) +} + +var _ sqlstats.StatementLatencyRecorder = &StatsCollector{} diff --git a/pkg/sql/sqlstats/ssmemstorage/BUILD.bazel b/pkg/sql/sqlstats/ssmemstorage/BUILD.bazel index 6d91b139bcf1..15e108b42d02 100644 --- a/pkg/sql/sqlstats/ssmemstorage/BUILD.bazel +++ b/pkg/sql/sqlstats/ssmemstorage/BUILD.bazel @@ -16,7 +16,7 @@ go_library( "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/appstatspb", - "//pkg/sql/execstats", + "//pkg/sql/execstats/execstatstypes", "//pkg/sql/pgwire/pgerror", "//pkg/sql/sqlstats", "//pkg/util", diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go index d4e5bc401a03..7d29d3af8176 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go @@ -19,7 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" - "github.com/cockroachdb/cockroach/pkg/sql/execstats" + "github.com/cockroachdb/cockroach/pkg/sql/execstats/execstatstypes" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" @@ -373,7 +373,7 @@ func (s *stmtStats) sizeUnsafeLocked() int64 { return stmtStatsShallowSize + metaFieldsSize + dataSize } -func (s *stmtStats) recordExecStatsLocked(stats execstats.QueryLevelStats) { +func (s *stmtStats) recordExecStatsLocked(stats execstatstypes.QueryLevelStats) { s.mu.data.ExecStats.Count++ count := s.mu.data.ExecStats.Count s.mu.data.ExecStats.NetworkBytes.Record(count, float64(stats.DistSQLNetworkBytesSent)) diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go index 7c4cc2ed69dc..68f11d0b76d6 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go @@ -86,7 +86,7 @@ func (s *Container) RecordStatement(ctx context.Context, value *sqlstats.Recorde stats.mu.data.MaxRetries = int64(value.AutoRetryCount) } - stats.mu.data.SQLType = value.StatementType.String() + stats.mu.data.SQLType = value.StatementType stats.mu.data.NumRows.Record(stats.mu.data.Count, float64(value.RowsAffected)) stats.mu.data.IdleLat.Record(stats.mu.data.Count, value.IdleLatencySec) stats.mu.data.ParseLat.Record(stats.mu.data.Count, value.ParseLatencySec) diff --git a/pkg/sql/sqlstats/ssprovider.go b/pkg/sql/sqlstats/ssprovider.go index 84d76115f062..1157e4bf06cd 100644 --- a/pkg/sql/sqlstats/ssprovider.go +++ b/pkg/sql/sqlstats/ssprovider.go @@ -15,10 +15,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" - "github.com/cockroachdb/cockroach/pkg/sql/execstats" - "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/execstats/execstatstypes" "github.com/cockroachdb/cockroach/pkg/sql/sqlcommenter" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/uint128" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/crlib/crtime" ) // IteratorOptions provides the ability to the caller to change how it iterates @@ -84,14 +87,14 @@ type RecordedStmtStats struct { RowsWritten int64 Nodes []int64 KVNodeIDs []int32 - StatementType tree.StatementType + StatementType string Plan *appstatspb.ExplainTreePlanNode PlanGist string StatementError error IndexRecommendations []string StartTime time.Time EndTime time.Time - ExecStats *execstats.QueryLevelStats + ExecStats *execstatstypes.QueryLevelStats Indexes []string QueryTags []sqlcommenter.QueryTag UnderOuterTxn bool @@ -116,7 +119,7 @@ type RecordedTxnStats struct { IdleLatency time.Duration RowsAffected int CollectedExecStats bool - ExecStats execstats.QueryLevelStats + ExecStats execstatstypes.QueryLevelStats RowsRead int64 RowsWritten int64 BytesRead int64 @@ -143,3 +146,308 @@ type SSDrainer interface { // be lost. Reset(ctx context.Context) error } + +type StatementLatencyRecorder interface { + RunLatency() time.Duration + IdleLatency() time.Duration + ServiceLatency() time.Duration + ParsingLatency() time.Duration + PlanningLatency() time.Duration + ExecOverheadLatency() time.Duration + StartTime() time.Time + EndTime() time.Time +} + +type RecordedStatementStatsBuilder[L StatementLatencyRecorder] struct { + stmtStats *RecordedStmtStats + latencyRecorder L + recorderSet bool +} + +func NewRecordedStatementStatsBuilder[L StatementLatencyRecorder]( + fingerprintId appstatspb.StmtFingerprintID, + database string, + fingerprint string, + summary string, + stmtType string, + appName string, +) *RecordedStatementStatsBuilder[L] { + return &RecordedStatementStatsBuilder[L]{ + stmtStats: &RecordedStmtStats{ + FingerprintID: fingerprintId, + QuerySummary: summary, + StatementType: stmtType, + Query: fingerprint, + Database: database, + App: appName, + }, + } +} + +func (b *RecordedStatementStatsBuilder[L]) WithLatencyRecorder( + recorder L, +) *RecordedStatementStatsBuilder[L] { + if b == nil { + return b + } + b.recorderSet = true + b.latencyRecorder = recorder + return b +} + +func (b *RecordedStatementStatsBuilder[L]) GetLatencyRecorder() L { + return b.latencyRecorder +} + +func (b *RecordedStatementStatsBuilder[L]) PlanMetadata( + generic bool, distSQL bool, vectorized bool, implicitTxn bool, fullScan bool, +) *RecordedStatementStatsBuilder[L] { + if b == nil { + return b + } + b.stmtStats.Generic = generic + b.stmtStats.DistSQL = distSQL + b.stmtStats.Vec = vectorized + b.stmtStats.ImplicitTxn = implicitTxn + b.stmtStats.FullScan = fullScan + return b +} + +func (b *RecordedStatementStatsBuilder[L]) PlanGist( + gist string, hash uint64, +) *RecordedStatementStatsBuilder[L] { + if b == nil { + return b + } + b.stmtStats.PlanGist = gist + b.stmtStats.PlanHash = hash + return b +} + +func (b *RecordedStatementStatsBuilder[L]) LatencyRecorder( + recorder L, +) *RecordedStatementStatsBuilder[L] { + if b == nil { + return b + } + b.stmtStats.RunLatencySec = recorder.RunLatency().Seconds() + b.stmtStats.IdleLatencySec = recorder.IdleLatency().Seconds() + b.stmtStats.ServiceLatencySec = recorder.ServiceLatency().Seconds() + b.stmtStats.ParseLatencySec = recorder.ParsingLatency().Seconds() + b.stmtStats.PlanLatencySec = recorder.PlanningLatency().Seconds() + b.stmtStats.OverheadLatencySec = recorder.ExecOverheadLatency().Seconds() + b.stmtStats.StartTime = recorder.StartTime() + b.stmtStats.EndTime = recorder.EndTime() + return b +} + +func (b *RecordedStatementStatsBuilder[L]) QueryLevelStats( + bytesRead int64, rowsRead int64, rowsWritten int64, +) *RecordedStatementStatsBuilder[L] { + if b == nil { + return b + } + b.stmtStats.BytesRead = bytesRead + b.stmtStats.RowsRead = rowsRead + b.stmtStats.RowsWritten = rowsWritten + return b +} + +func (b *RecordedStatementStatsBuilder[L]) ExecStats( + execStats *execstatstypes.QueryLevelStats, +) *RecordedStatementStatsBuilder[L] { + if b == nil { + return b + } + if execStats == nil { + return b + } + + var sqlInstanceIDs []int64 + var kvNodeIDs []int32 + sqlInstanceIDs = make([]int64, 0, len(execStats.SQLInstanceIDs)) + for _, sqlInstanceID := range execStats.SQLInstanceIDs { + sqlInstanceIDs = append(sqlInstanceIDs, int64(sqlInstanceID)) + } + kvNodeIDs = execStats.KVNodeIDs + b.stmtStats.KVNodeIDs = kvNodeIDs + b.stmtStats.Nodes = sqlInstanceIDs + b.stmtStats.ExecStats = execStats + return b +} + +func (b *RecordedStatementStatsBuilder[L]) Indexes( + indexes []string, +) *RecordedStatementStatsBuilder[L] { + if b == nil { + return b + } + b.stmtStats.Indexes = indexes + return b +} + +func (b *RecordedStatementStatsBuilder[L]) StatementError( + stmtErr error, +) *RecordedStatementStatsBuilder[L] { + if b == nil { + return b + } + if stmtErr == nil { + return b + } + b.stmtStats.StatementError = stmtErr + b.stmtStats.Failed = true + return b +} + +func (b *RecordedStatementStatsBuilder[L]) AutoRetry( + autoRetryCount int, autoRetryReason error, +) *RecordedStatementStatsBuilder[L] { + if b == nil { + return b + } + b.stmtStats.AutoRetryCount = autoRetryCount + b.stmtStats.AutoRetryReason = autoRetryReason + return b +} + +func (b *RecordedStatementStatsBuilder[L]) RowsAffected( + rowsAffected int, +) *RecordedStatementStatsBuilder[L] { + if b == nil { + return b + } + b.stmtStats.RowsAffected = rowsAffected + return b +} + +func (b *RecordedStatementStatsBuilder[L]) IndexRecommendations( + idxRecommendations []string, +) *RecordedStatementStatsBuilder[L] { + if b == nil { + return b + } + b.stmtStats.IndexRecommendations = idxRecommendations + return b +} + +func (b *RecordedStatementStatsBuilder[L]) UnderOuterTxn() *RecordedStatementStatsBuilder[L] { + if b == nil { + return b + } + b.stmtStats.UnderOuterTxn = true + return b +} + +func (b *RecordedStatementStatsBuilder[L]) QueryTags( + queryTags []sqlcommenter.QueryTag, +) *RecordedStatementStatsBuilder[L] { + if b == nil { + return b + } + b.stmtStats.QueryTags = queryTags + return b +} + +func (b *RecordedStatementStatsBuilder[L]) QueryID( + queryID clusterunique.ID, +) *RecordedStatementStatsBuilder[L] { + if b == nil { + return b + } + b.stmtStats.StatementID = queryID + return b +} + +func (b *RecordedStatementStatsBuilder[L]) SessionID( + sessionID clusterunique.ID, +) *RecordedStatementStatsBuilder[L] { + if b == nil { + return b + } + b.stmtStats.SessionID = sessionID + return b +} + +func (b *RecordedStatementStatsBuilder[L]) Build() *RecordedStmtStats { + // Validate that Session id has been set + if b == nil { + return nil + } + + if b.recorderSet { + b.LatencyRecorder(b.latencyRecorder) + } + + if b.stmtStats.SessionID.Equal(uint128.Uint128{}) { + if buildutil.CrdbTestBuild { + panic("SessionID must be set before building RecordedStmtStats") + } else { + // Log with depth 1 so we can see where this is being called with no SessionID. + log.Dev.ErrorfDepth(context.Background(), 1, "SessionID must be set before building RecordedStmtStats") + } + } + return b.stmtStats +} + +type RoutineStatementPhase int + +const ( + StatementStarted RoutineStatementPhase = iota + StatementStartParsing + StatementEndParsing + StatementStartPlanning + StatementEndPlanning + StatementStartExec + StatementEndExec + StatementEnd + StatementNumPhases +) + +type LatencyRecorder struct { + times [StatementNumPhases]crtime.Mono +} + +var _ StatementLatencyRecorder = &LatencyRecorder{} + +func NewStatementLatencyRecorder() *LatencyRecorder { + return &LatencyRecorder{} +} + +func (r *LatencyRecorder) RecordPhase(phase RoutineStatementPhase, time crtime.Mono) { + r.times[phase] = time +} + +func (r *LatencyRecorder) RunLatency() time.Duration { + return r.times[StatementEndExec].Sub(r.times[StatementStartExec]) +} + +func (r *LatencyRecorder) IdleLatency() time.Duration { + return 0 +} + +func (r *LatencyRecorder) ServiceLatency() time.Duration { + return r.times[StatementEndExec].Sub(r.times[StatementStarted]) +} +func (r *LatencyRecorder) ParsingLatency() time.Duration { + return r.times[StatementEndParsing].Sub(r.times[StatementStartParsing]) +} +func (r *LatencyRecorder) PlanningLatency() time.Duration { + return r.times[StatementEndPlanning].Sub(r.times[StatementStartPlanning]) +} + +func (r *LatencyRecorder) ProcessingLatency() time.Duration { + return r.ParsingLatency() + r.PlanningLatency() + r.RunLatency() +} + +func (r *LatencyRecorder) ExecOverheadLatency() time.Duration { + return r.ServiceLatency() - r.ProcessingLatency() +} + +func (r *LatencyRecorder) StartTime() time.Time { + return r.times[StatementStarted].ToUTC() +} + +func (r *LatencyRecorder) EndTime() time.Time { + return r.times[StatementEnd].ToUTC() +} diff --git a/pkg/sql/telemetry_logging.go b/pkg/sql/telemetry_logging.go index c30fea8c87ea..f2b5a966ca3a 100644 --- a/pkg/sql/telemetry_logging.go +++ b/pkg/sql/telemetry_logging.go @@ -13,7 +13,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/sql/execstats" + "github.com/cockroachdb/cockroach/pkg/sql/execstats/execstatstypes" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -178,7 +178,7 @@ type TelemetryLoggingTestingKnobs struct { // when updating rolling query counts. getTimeNow func() time.Time // getQueryLevelMetrics allows tests to override the recorded query level stats. - getQueryLevelStats func() execstats.QueryLevelStats + getQueryLevelStats func() execstatstypes.QueryLevelStats // getTracingStatus allows tests to override whether the current query has tracing // enabled or not. Queries with tracing enabled are always sampled to telemetry. getTracingStatus func() bool @@ -186,7 +186,7 @@ type TelemetryLoggingTestingKnobs struct { func NewTelemetryLoggingTestingKnobs( getTimeNowFunc func() time.Time, - getQueryLevelStatsFunc func() execstats.QueryLevelStats, + getQueryLevelStatsFunc func() execstatstypes.QueryLevelStats, getTracingStatusFunc func() bool, ) *TelemetryLoggingTestingKnobs { return &TelemetryLoggingTestingKnobs{ @@ -358,8 +358,8 @@ func (t *telemetryLoggingMetrics) shouldEmitStatementLog( } func (t *telemetryLoggingMetrics) getQueryLevelStats( - queryLevelStats execstats.QueryLevelStats, -) execstats.QueryLevelStats { + queryLevelStats execstatstypes.QueryLevelStats, +) execstatstypes.QueryLevelStats { if t.Knobs != nil && t.Knobs.getQueryLevelStats != nil { return t.Knobs.getQueryLevelStats() } diff --git a/pkg/sql/telemetry_logging_test.go b/pkg/sql/telemetry_logging_test.go index c3ecb643133d..6efa2d7d5372 100644 --- a/pkg/sql/telemetry_logging_test.go +++ b/pkg/sql/telemetry_logging_test.go @@ -17,7 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" - "github.com/cockroachdb/cockroach/pkg/sql/execstats" + "github.com/cockroachdb/cockroach/pkg/sql/execstats/execstatstypes" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/sslocal" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -125,7 +125,7 @@ func TestTelemetryLogging(t *testing.T) { expectedWrite bool expectedIndexes bool expectedErr string // Empty string means no error is expected. - queryLevelStats execstats.QueryLevelStats + queryLevelStats execstatstypes.QueryLevelStats enableTracing bool enableInjectTxErrors bool expectedStatsCollector *sslocal.StatsCollector @@ -149,7 +149,7 @@ func TestTelemetryLogging(t *testing.T) { expectedRead: false, expectedWrite: false, expectedIndexes: false, - queryLevelStats: execstats.QueryLevelStats{ + queryLevelStats: execstatstypes.QueryLevelStats{ ContentionTime: 0 * time.Nanosecond, DistSQLNetworkBytesSent: 1, MaxMemUsage: 2, @@ -194,7 +194,7 @@ func TestTelemetryLogging(t *testing.T) { expectedRead: false, expectedWrite: false, expectedIndexes: true, - queryLevelStats: execstats.QueryLevelStats{ + queryLevelStats: execstatstypes.QueryLevelStats{ ContentionTime: 1 * time.Nanosecond, }, enableTracing: false, @@ -217,7 +217,7 @@ func TestTelemetryLogging(t *testing.T) { expectedRead: true, expectedWrite: false, expectedIndexes: true, - queryLevelStats: execstats.QueryLevelStats{ + queryLevelStats: execstatstypes.QueryLevelStats{ ContentionTime: 2 * time.Nanosecond, DistSQLNetworkBytesSent: 1, MaxMemUsage: 2, @@ -242,7 +242,7 @@ func TestTelemetryLogging(t *testing.T) { expectedRead: true, expectedWrite: false, expectedIndexes: true, - queryLevelStats: execstats.QueryLevelStats{ + queryLevelStats: execstatstypes.QueryLevelStats{ ContentionTime: 3 * time.Nanosecond, DistSQLNetworkBytesSent: 1124, MaxMemUsage: 132, @@ -270,7 +270,7 @@ func TestTelemetryLogging(t *testing.T) { expectedRead: true, expectedWrite: false, expectedIndexes: true, - queryLevelStats: execstats.QueryLevelStats{ + queryLevelStats: execstatstypes.QueryLevelStats{ ContentionTime: 0 * time.Nanosecond, DistSQLNetworkBytesSent: 124235, MaxMemUsage: 12412, @@ -297,7 +297,7 @@ func TestTelemetryLogging(t *testing.T) { expectedRead: true, expectedWrite: true, expectedIndexes: true, - queryLevelStats: execstats.QueryLevelStats{ + queryLevelStats: execstatstypes.QueryLevelStats{ ContentionTime: 0 * time.Nanosecond, DistSQLNetworkBytesSent: 1, KVBytesRead: 4, @@ -343,7 +343,7 @@ func TestTelemetryLogging(t *testing.T) { expectedRead: true, expectedWrite: false, expectedIndexes: true, - queryLevelStats: execstats.QueryLevelStats{ + queryLevelStats: execstatstypes.QueryLevelStats{ ContentionTime: 2 * time.Nanosecond, DistSQLNetworkBytesSent: 10, MaxMemUsage: 20, @@ -389,7 +389,7 @@ func TestTelemetryLogging(t *testing.T) { expectedRead: true, expectedWrite: false, expectedIndexes: true, - queryLevelStats: execstats.QueryLevelStats{ + queryLevelStats: execstatstypes.QueryLevelStats{ ContentionTime: 9223372036854775807 * time.Nanosecond, DistSQLNetworkBytesSent: 9223372036854775807, MaxMemUsage: 9223372036854775807, diff --git a/pkg/util/log/logtestutils/BUILD.bazel b/pkg/util/log/logtestutils/BUILD.bazel index 6dbbda181c96..b3b291f04328 100644 --- a/pkg/util/log/logtestutils/BUILD.bazel +++ b/pkg/util/log/logtestutils/BUILD.bazel @@ -11,7 +11,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/util/log/logtestutils", visibility = ["//visibility:public"], deps = [ - "//pkg/sql/execstats", + "//pkg/sql/execstats/execstatstypes", "//pkg/util/log", "//pkg/util/log/logconfig", "//pkg/util/log/logpb", diff --git a/pkg/util/log/logtestutils/telemetry_logging_test_utils.go b/pkg/util/log/logtestutils/telemetry_logging_test_utils.go index 3bceb25ee4a6..10f6156d8563 100644 --- a/pkg/util/log/logtestutils/telemetry_logging_test_utils.go +++ b/pkg/util/log/logtestutils/telemetry_logging_test_utils.go @@ -8,7 +8,7 @@ package logtestutils import ( "time" - "github.com/cockroachdb/cockroach/pkg/sql/execstats" + "github.com/cockroachdb/cockroach/pkg/sql/execstats/execstatstypes" "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) @@ -35,18 +35,18 @@ func (s *StubTime) TimeNow() time.Time { // StubQueryStats is a helper struct to stub query level stats. type StubQueryStats struct { syncutil.RWMutex - stats execstats.QueryLevelStats + stats execstatstypes.QueryLevelStats } // SetQueryLevelStats sets the stubbed query level stats. -func (s *StubQueryStats) SetQueryLevelStats(stats execstats.QueryLevelStats) { +func (s *StubQueryStats) SetQueryLevelStats(stats execstatstypes.QueryLevelStats) { s.RWMutex.Lock() defer s.RWMutex.Unlock() s.stats = stats } // QueryLevelStats returns the current stubbed query level stats. -func (s *StubQueryStats) QueryLevelStats() execstats.QueryLevelStats { +func (s *StubQueryStats) QueryLevelStats() execstatstypes.QueryLevelStats { s.RWMutex.RLock() defer s.RWMutex.RUnlock() return s.stats