Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2050,6 +2050,7 @@ GO_TARGETS = [
"//pkg/sql/execinfra:execinfra_test",
"//pkg/sql/execinfrapb:execinfrapb",
"//pkg/sql/execinfrapb:execinfrapb_test",
"//pkg/sql/execstats/execstatstypes:execstatstypes",
"//pkg/sql/execstats:execstats",
"//pkg/sql/execstats:execstats_test",
"//pkg/sql/execversion:execversion",
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ go_library(
"//pkg/sql/execinfra/execreleasable",
"//pkg/sql/execinfrapb",
"//pkg/sql/execstats",
"//pkg/sql/execstats/execstatstypes",
"//pkg/sql/execversion",
"//pkg/sql/exprutil",
"//pkg/sql/faketreeeval",
Expand Down Expand Up @@ -865,7 +866,7 @@ go_test(
"//pkg/sql/distsql",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/execstats",
"//pkg/sql/execstats/execstatstypes",
"//pkg/sql/flowinfra",
"//pkg/sql/gcjob",
"//pkg/sql/isql",
Expand Down
9 changes: 3 additions & 6 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry/schematelemetrycontroller"
"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
"github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/execstats/execstatstypes"
"github.com/cockroachdb/cockroach/pkg/sql/idxrecommendations"
"github.com/cockroachdb/cockroach/pkg/sql/idxusage"
"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
Expand Down Expand Up @@ -1260,15 +1260,12 @@ func (s *Server) newConnExecutor(

ex.applicationName.Store(ex.sessionData().ApplicationName)
ex.applicationStats = applicationStats
// We ignore statements and transactions run by the internal executor by
// passing a nil writer.
ex.statsCollector = sslocal.NewStatsCollector(
s.cfg.Settings,
applicationStats,
s.sqlStatsIngester,
ex.phaseTimes,
s.localSqlStats.GetCounters(),
s.cfg.SQLStatsTestingKnobs,
)
ex.dataMutatorIterator.OnApplicationNameChange = func(newName string) {
ex.applicationName.Store(newName)
Expand Down Expand Up @@ -1703,7 +1700,7 @@ type connExecutor struct {
// this transaction should collect execution stats.
shouldCollectTxnExecutionStats bool
// accumulatedStats are the accumulated stats of all statements.
accumulatedStats execstats.QueryLevelStats
accumulatedStats execstatstypes.QueryLevelStats

// idleLatency is the cumulative amount of time spent waiting for the
// client to send statements while holding the transaction open.
Expand Down Expand Up @@ -3962,7 +3959,7 @@ func (ex *connExecutor) initPlanner(ctx context.Context, p *planner) {
p.cancelChecker.Reset(ctx)

ex.initEvalCtx(ctx, &p.extendedEvalCtx, p)

p.statsCollector = ex.statsCollector
p.sessionDataMutatorIterator = ex.dataMutatorIterator
p.noticeSender = nil
p.preparedStatements = ex.getPrepStmtsAccessor()
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/execstats/execstatstypes"
"github.com/cockroachdb/cockroach/pkg/sql/hints"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/opt/cat"
Expand Down Expand Up @@ -4187,7 +4188,7 @@ func (ex *connExecutor) onTxnRestart(ctx context.Context) {
ex.extraTxnState.numRows = 0
// accumulatedStats are cleared, but shouldCollectTxnExecutionStats is
// unchanged.
ex.extraTxnState.accumulatedStats = execstats.QueryLevelStats{}
ex.extraTxnState.accumulatedStats = execstatstypes.QueryLevelStats{}
ex.extraTxnState.idleLatency = 0
ex.extraTxnState.rowsRead = 0
ex.extraTxnState.bytesRead = 0
Expand Down Expand Up @@ -4220,7 +4221,7 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) {
ex.extraTxnState.transactionStatementsHash = util.MakeFNV64()
ex.extraTxnState.transactionStatementFingerprintIDs = nil
ex.extraTxnState.numRows = 0
ex.extraTxnState.accumulatedStats = execstats.QueryLevelStats{}
ex.extraTxnState.accumulatedStats = execstatstypes.QueryLevelStats{}
ex.extraTxnState.idleLatency = 0
ex.extraTxnState.rowsRead = 0
ex.extraTxnState.bytesRead = 0
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/exec_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/execstats/execstatstypes"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -314,7 +314,7 @@ func (p *planner) maybeLogStatementInternal(
return
}

var queryLevelStats execstats.QueryLevelStats
var queryLevelStats execstatstypes.QueryLevelStats
if stats, ok := p.instrumentation.GetQueryLevelStats(); ok {
queryLevelStats = *stats
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/execstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//pkg/base",
"//pkg/kv/kvpb",
"//pkg/sql/execinfrapb",
"//pkg/sql/execstats/execstatstypes",
"//pkg/util",
"//pkg/util/buildutil",
"//pkg/util/optional",
Expand Down Expand Up @@ -46,6 +47,7 @@ go_test(
"//pkg/sql/execinfra",
"//pkg/sql/execinfra/execopnode",
"//pkg/sql/execinfrapb",
"//pkg/sql/execstats/execstatstypes",
"//pkg/sql/sessiondata",
"//pkg/sql/sessiondatapb",
"//pkg/testutils/serverutils",
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/execstats/execstatstypes/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "execstatstypes",
srcs = ["types.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/execstats/execstatstypes",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvpb",
"//pkg/util",
],
)
101 changes: 101 additions & 0 deletions pkg/sql/execstats/execstatstypes/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2025 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package execstatstypes

import (
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/util"
)

// QueryLevelStats returns all the query level stats that correspond to the
// given traces and flow metadata.
// NOTE: When adding fields to this struct, be sure to update Accumulate.
type QueryLevelStats struct {
DistSQLNetworkBytesSent int64
MaxMemUsage int64
MaxDiskUsage int64
KVBytesRead int64
KVPairsRead int64
KVRowsRead int64
KVBatchRequestsIssued int64
KVTime time.Duration
MvccSteps int64
MvccStepsInternal int64
MvccSeeks int64
MvccSeeksInternal int64
MvccBlockBytes int64
MvccBlockBytesInCache int64
MvccKeyBytes int64
MvccValueBytes int64
MvccPointCount int64
MvccPointsCoveredByRangeTombstones int64
MvccRangeKeyCount int64
MvccRangeKeyContainedPoints int64
MvccRangeKeySkippedPoints int64
DistSQLNetworkMessages int64
ContentionTime time.Duration
LockWaitTime time.Duration
LatchWaitTime time.Duration
ContentionEvents []kvpb.ContentionEvent
RUEstimate float64
CPUTime time.Duration
// SQLInstanceIDs is an ordered list of SQL instances that were involved in
// query processing.
SQLInstanceIDs []int32
// KVNodeIDs is an ordered list of KV Node IDs that were used for KV reads
// while processing the query.
KVNodeIDs []int32
// Regions is an ordered list of regions in which both SQL and KV nodes
// involved in query processing reside.
Regions []string
// UsedFollowerRead indicates whether at least some reads were served by the
// follower replicas.
UsedFollowerRead bool
ClientTime time.Duration
}

// Accumulate accumulates other's stats into the receiver.
func (s *QueryLevelStats) Accumulate(other QueryLevelStats) {
s.DistSQLNetworkBytesSent += other.DistSQLNetworkBytesSent
if other.MaxMemUsage > s.MaxMemUsage {
s.MaxMemUsage = other.MaxMemUsage
}
if other.MaxDiskUsage > s.MaxDiskUsage {
s.MaxDiskUsage = other.MaxDiskUsage
}
s.KVBytesRead += other.KVBytesRead
s.KVPairsRead += other.KVPairsRead
s.KVRowsRead += other.KVRowsRead
s.KVBatchRequestsIssued += other.KVBatchRequestsIssued
s.KVTime += other.KVTime
s.MvccSteps += other.MvccSteps
s.MvccStepsInternal += other.MvccStepsInternal
s.MvccSeeks += other.MvccSeeks
s.MvccSeeksInternal += other.MvccSeeksInternal
s.MvccBlockBytes += other.MvccBlockBytes
s.MvccBlockBytesInCache += other.MvccBlockBytesInCache
s.MvccKeyBytes += other.MvccKeyBytes
s.MvccValueBytes += other.MvccValueBytes
s.MvccPointCount += other.MvccPointCount
s.MvccPointsCoveredByRangeTombstones += other.MvccPointsCoveredByRangeTombstones
s.MvccRangeKeyCount += other.MvccRangeKeyCount
s.MvccRangeKeyContainedPoints += other.MvccRangeKeyContainedPoints
s.MvccRangeKeySkippedPoints += other.MvccRangeKeySkippedPoints
s.DistSQLNetworkMessages += other.DistSQLNetworkMessages
s.ContentionTime += other.ContentionTime
s.LockWaitTime += other.LockWaitTime
s.LatchWaitTime += other.LatchWaitTime
s.ContentionEvents = append(s.ContentionEvents, other.ContentionEvents...)
s.RUEstimate += other.RUEstimate
s.CPUTime += other.CPUTime
s.SQLInstanceIDs = util.CombineUnique(s.SQLInstanceIDs, other.SQLInstanceIDs)
s.KVNodeIDs = util.CombineUnique(s.KVNodeIDs, other.KVNodeIDs)
s.Regions = util.CombineUnique(s.Regions, other.Regions)
s.UsedFollowerRead = s.UsedFollowerRead || other.UsedFollowerRead
s.ClientTime += other.ClientTime
}
107 changes: 11 additions & 96 deletions pkg/sql/execstats/traceanalyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats/execstatstypes"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
Expand Down Expand Up @@ -91,115 +92,29 @@ func NewFlowsMetadata(flows map[base.SQLInstanceID]*execinfrapb.FlowSpec) *Flows
return a
}

// QueryLevelStats returns all the query level stats that correspond to the
// given traces and flow metadata.
// NOTE: When adding fields to this struct, be sure to update Accumulate.
type QueryLevelStats struct {
DistSQLNetworkBytesSent int64
MaxMemUsage int64
MaxDiskUsage int64
KVBytesRead int64
KVPairsRead int64
KVRowsRead int64
KVBatchRequestsIssued int64
KVTime time.Duration
MvccSteps int64
MvccStepsInternal int64
MvccSeeks int64
MvccSeeksInternal int64
MvccBlockBytes int64
MvccBlockBytesInCache int64
MvccKeyBytes int64
MvccValueBytes int64
MvccPointCount int64
MvccPointsCoveredByRangeTombstones int64
MvccRangeKeyCount int64
MvccRangeKeyContainedPoints int64
MvccRangeKeySkippedPoints int64
DistSQLNetworkMessages int64
ContentionTime time.Duration
LockWaitTime time.Duration
LatchWaitTime time.Duration
ContentionEvents []kvpb.ContentionEvent
RUEstimate float64
CPUTime time.Duration
// SQLInstanceIDs is an ordered list of SQL instances that were involved in
// query processing.
SQLInstanceIDs []int32
// KVNodeIDs is an ordered list of KV Node IDs that were used for KV reads
// while processing the query.
KVNodeIDs []int32
// Regions is an ordered list of regions in which both SQL and KV nodes
// involved in query processing reside.
Regions []string
// UsedFollowerRead indicates whether at least some reads were served by the
// follower replicas.
UsedFollowerRead bool
ClientTime time.Duration
}

// QueryLevelStatsWithErr is the same as QueryLevelStats, but also tracks
// if an error occurred while getting query-level stats.
type QueryLevelStatsWithErr struct {
Stats QueryLevelStats
Stats execstatstypes.QueryLevelStats
Err error
}

// MakeQueryLevelStatsWithErr creates a QueryLevelStatsWithErr from a
// QueryLevelStats and error.
func MakeQueryLevelStatsWithErr(stats QueryLevelStats, err error) QueryLevelStatsWithErr {
func MakeQueryLevelStatsWithErr(
stats execstatstypes.QueryLevelStats, err error,
) QueryLevelStatsWithErr {
return QueryLevelStatsWithErr{
stats,
err,
}
}

// Accumulate accumulates other's stats into the receiver.
func (s *QueryLevelStats) Accumulate(other QueryLevelStats) {
s.DistSQLNetworkBytesSent += other.DistSQLNetworkBytesSent
if other.MaxMemUsage > s.MaxMemUsage {
s.MaxMemUsage = other.MaxMemUsage
}
if other.MaxDiskUsage > s.MaxDiskUsage {
s.MaxDiskUsage = other.MaxDiskUsage
Stats: stats,
Err: err,
}
s.KVBytesRead += other.KVBytesRead
s.KVPairsRead += other.KVPairsRead
s.KVRowsRead += other.KVRowsRead
s.KVBatchRequestsIssued += other.KVBatchRequestsIssued
s.KVTime += other.KVTime
s.MvccSteps += other.MvccSteps
s.MvccStepsInternal += other.MvccStepsInternal
s.MvccSeeks += other.MvccSeeks
s.MvccSeeksInternal += other.MvccSeeksInternal
s.MvccBlockBytes += other.MvccBlockBytes
s.MvccBlockBytesInCache += other.MvccBlockBytesInCache
s.MvccKeyBytes += other.MvccKeyBytes
s.MvccValueBytes += other.MvccValueBytes
s.MvccPointCount += other.MvccPointCount
s.MvccPointsCoveredByRangeTombstones += other.MvccPointsCoveredByRangeTombstones
s.MvccRangeKeyCount += other.MvccRangeKeyCount
s.MvccRangeKeyContainedPoints += other.MvccRangeKeyContainedPoints
s.MvccRangeKeySkippedPoints += other.MvccRangeKeySkippedPoints
s.DistSQLNetworkMessages += other.DistSQLNetworkMessages
s.ContentionTime += other.ContentionTime
s.LockWaitTime += other.LockWaitTime
s.LatchWaitTime += other.LatchWaitTime
s.ContentionEvents = append(s.ContentionEvents, other.ContentionEvents...)
s.RUEstimate += other.RUEstimate
s.CPUTime += other.CPUTime
s.SQLInstanceIDs = util.CombineUnique(s.SQLInstanceIDs, other.SQLInstanceIDs)
s.KVNodeIDs = util.CombineUnique(s.KVNodeIDs, other.KVNodeIDs)
s.Regions = util.CombineUnique(s.Regions, other.Regions)
s.UsedFollowerRead = s.UsedFollowerRead || other.UsedFollowerRead
s.ClientTime += other.ClientTime
}

// TraceAnalyzer is a struct that helps calculate top-level statistics from a
// flow metadata and an accompanying trace of the flows' execution.
type TraceAnalyzer struct {
*FlowsMetadata
queryLevelStats QueryLevelStats
queryLevelStats execstatstypes.QueryLevelStats
}

// NewTraceAnalyzer creates a TraceAnalyzer with the corresponding physical
Expand Down Expand Up @@ -374,7 +289,7 @@ func getNumDistSQLNetworkMessagesFromComponentsStats(v *execinfrapb.ComponentSta

// GetQueryLevelStats returns the query level stats calculated and stored in
// TraceAnalyzer.
func (a *TraceAnalyzer) GetQueryLevelStats() QueryLevelStats {
func (a *TraceAnalyzer) GetQueryLevelStats() execstatstypes.QueryLevelStats {
return a.queryLevelStats
}

Expand Down Expand Up @@ -403,8 +318,8 @@ func getAllContentionEvents(trace []tracingpb.RecordedSpan) []kvpb.ContentionEve
// errors to the caller but continues calculating other stats.
func GetQueryLevelStats(
trace []tracingpb.RecordedSpan, deterministicExplainAnalyze bool, flowsMetadata []*FlowsMetadata,
) (QueryLevelStats, error) {
var queryLevelStats QueryLevelStats
) (execstatstypes.QueryLevelStats, error) {
var queryLevelStats execstatstypes.QueryLevelStats
var errs error
for _, metadata := range flowsMetadata {
analyzer := NewTraceAnalyzer(metadata)
Expand Down
Loading
Loading