Commit 3bd384c

changefeedccl: add static labels to changefeed metrics

This commit adds static labels to changefeed stage and emitted message metrics.

Resolves: #156290
Release note: None
1 parent 6d96fe6 commit 3bd384c
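At its core the change is a metadata pattern: each per-type metric keeps its own internal name but declares a shared LabeledName plus a static label, so the exported series roll up into a single labeled family. Below is a minimal sketch of that pattern, using only the metric.Metadata fields and the metric.MakeLabelPairs helper that appear in the diffs further down; the wrapping package clause and function are illustrative, not part of the commit.

package changefeedccl // illustrative placement; the real declarations live in metrics.go and timers/timers.go

import "github.com/cockroachdb/cockroach/pkg/util/metric"

// emittedRowMessagesMeta sketches the static-label pattern: the metric keeps
// its internal name (changefeed.emitted_row_messages) while exporting into the
// shared labeled family changefeed.emitted_messages{name: row}.
func emittedRowMessagesMeta() metric.Metadata {
	return metric.Metadata{
		Name:         "changefeed.emitted_row_messages",
		Help:         "Row messages emitted by all feeds",
		Measurement:  "Messages",
		Unit:         metric.Unit_COUNT,
		LabeledName:  "changefeed.emitted_messages",
		StaticLabels: metric.MakeLabelPairs(metric.LabelName, "row"),
	}
}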

File tree: 5 files changed, +109 -42 lines changed

docs/generated/metrics/metrics.yaml

Lines changed: 28 additions & 4 deletions

@@ -23,15 +23,27 @@ layers:
     derivative: NON_NEGATIVE_DERIVATIVE
     how_to_use: This metric provides a useful context when assessing the state of changefeeds. This metric characterizes the throughput bytes being streamed from the CockroachDB cluster.
     essential: true
-  - name: changefeed.emitted_messages
-    exported_name: changefeed_emitted_messages
-    description: Messages emitted by all feeds
+  - name: changefeed.emitted_resolved_messages
+    exported_name: changefeed_emitted_resolved_messages
+    labeled_name: 'changefeed.emitted_messages{name: resolved}'
+    description: Resolved timestamp messages emitted by all feeds
     y_axis_label: Messages
     type: COUNTER
     unit: COUNT
     aggregation: AVG
     derivative: NON_NEGATIVE_DERIVATIVE
-    how_to_use: This metric provides a useful context when assessing the state of changefeeds. This metric characterizes the rate of changes being streamed from the CockroachDB cluster.
+    how_to_use: This metric provides a useful context when assessing the state of changefeeds. This metric characterizes the rate of resolved timestamp messages being streamed from the CockroachDB cluster.
+    essential: true
+  - name: changefeed.emitted_row_messages
+    exported_name: changefeed_emitted_row_messages
+    labeled_name: 'changefeed.emitted_messages{name: row}'
+    description: Row messages emitted by all feeds
+    y_axis_label: Messages
+    type: COUNTER
+    unit: COUNT
+    aggregation: AVG
+    derivative: NON_NEGATIVE_DERIVATIVE
+    how_to_use: This metric provides a useful context when assessing the state of changefeeds. This metric characterizes the rate of row changes being streamed from the CockroachDB cluster.
     essential: true
   - name: changefeed.error_retries
     exported_name: changefeed_error_retries
@@ -1817,6 +1829,7 @@ layers:
     derivative: NON_NEGATIVE_DERIVATIVE
   - name: changefeed.stage.checkpoint_job_progress.latency
     exported_name: changefeed_stage_checkpoint_job_progress_latency
+    labeled_name: 'changefeed.stage.latency{name: checkpoint_job_progress}'
     description: 'Latency of the changefeed stage: checkpointing job progress'
     y_axis_label: Latency
     type: HISTOGRAM
@@ -1825,6 +1838,7 @@ layers:
     derivative: NONE
   - name: changefeed.stage.downstream_client_send.latency
    exported_name: changefeed_stage_downstream_client_send_latency
+    labeled_name: 'changefeed.stage.latency{name: downstream_client_send}'
     description: 'Latency of the changefeed stage: flushing messages from the sink''s client to its downstream. This includes sends that failed for most but not all sinks.'
     y_axis_label: Latency
     type: HISTOGRAM
@@ -1833,6 +1847,7 @@ layers:
     derivative: NONE
   - name: changefeed.stage.emit_row.latency
     exported_name: changefeed_stage_emit_row_latency
+    labeled_name: 'changefeed.stage.latency{name: emit_row}'
     description: 'Latency of the changefeed stage: emitting row to sink'
     y_axis_label: Latency
     type: HISTOGRAM
@@ -1841,6 +1856,7 @@ layers:
     derivative: NONE
   - name: changefeed.stage.encode.latency
     exported_name: changefeed_stage_encode_latency
+    labeled_name: 'changefeed.stage.latency{name: encode}'
     description: 'Latency of the changefeed stage: encoding data'
     y_axis_label: Latency
     type: HISTOGRAM
@@ -1849,6 +1865,7 @@ layers:
     derivative: NONE
   - name: changefeed.stage.frontier_persistence.latency
     exported_name: changefeed_stage_frontier_persistence_latency
+    labeled_name: 'changefeed.stage.latency{name: frontier_persistence}'
     description: 'Latency of the changefeed stage: persisting frontier to job info'
     y_axis_label: Latency
     type: HISTOGRAM
@@ -1857,6 +1874,7 @@ layers:
     derivative: NONE
   - name: changefeed.stage.kv_feed_buffer.latency
     exported_name: changefeed_stage_kv_feed_buffer_latency
+    labeled_name: 'changefeed.stage.latency{name: kv_feed_buffer}'
     description: 'Latency of the changefeed stage: waiting to buffer kv events'
     y_axis_label: Latency
     type: HISTOGRAM
@@ -1865,6 +1883,7 @@ layers:
     derivative: NONE
   - name: changefeed.stage.kv_feed_wait_for_table_event.latency
     exported_name: changefeed_stage_kv_feed_wait_for_table_event_latency
+    labeled_name: 'changefeed.stage.latency{name: kv_feed_wait_for_table_event}'
     description: 'Latency of the changefeed stage: waiting for a table schema event to join to the kv event'
     y_axis_label: Latency
     type: HISTOGRAM
@@ -1873,6 +1892,7 @@ layers:
     derivative: NONE
   - name: changefeed.stage.pts.create.latency
     exported_name: changefeed_stage_pts_create_latency
+    labeled_name: 'changefeed.stage.pts.latency{name: create}'
     description: 'Latency of the changefeed stage: Time spent creating protected timestamp records on changefeed creation'
     y_axis_label: Latency
     type: HISTOGRAM
@@ -1881,6 +1901,7 @@ layers:
     derivative: NONE
   - name: changefeed.stage.pts.manage.latency
     exported_name: changefeed_stage_pts_manage_latency
+    labeled_name: 'changefeed.stage.pts.latency{name: manage}'
     description: 'Latency of the changefeed stage: Time spent successfully managing protected timestamp records on highwater advance, including time spent creating new protected timestamps when needed'
     y_axis_label: Latency
     type: HISTOGRAM
@@ -1889,6 +1910,7 @@ layers:
     derivative: NONE
   - name: changefeed.stage.pts.manage_error.latency
     exported_name: changefeed_stage_pts_manage_error_latency
+    labeled_name: 'changefeed.stage.pts.latency{name: manage_error}'
     description: 'Latency of the changefeed stage: Time spent managing protected timestamp when we eventually error'
     y_axis_label: Latency
     type: HISTOGRAM
@@ -1897,6 +1919,7 @@ layers:
     derivative: NONE
   - name: changefeed.stage.rangefeed_buffer_checkpoint.latency
     exported_name: changefeed_stage_rangefeed_buffer_checkpoint_latency
+    labeled_name: 'changefeed.stage.latency{name: rangefeed_buffer_checkpoint}'
     description: 'Latency of the changefeed stage: buffering rangefeed checkpoint events'
     y_axis_label: Latency
     type: HISTOGRAM
@@ -1905,6 +1928,7 @@ layers:
     derivative: NONE
   - name: changefeed.stage.rangefeed_buffer_value.latency
     exported_name: changefeed_stage_rangefeed_buffer_value_latency
+    labeled_name: 'changefeed.stage.latency{name: rangefeed_buffer_value}'
     description: 'Latency of the changefeed stage: buffering rangefeed value events'
     y_axis_label: Latency
     type: HISTOGRAM

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 15 additions & 6 deletions

@@ -6543,7 +6543,10 @@ func TestChangefeedMonitoring(t *testing.T) {
 			`CREATE TABLE foo (a INT PRIMARY KEY) WITH (schema_locked=%t)`, schemaLocked))
 		sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`)

-		if c := s.Server.MustGetSQLCounter(`changefeed.emitted_messages`); c != 0 {
+		if c := s.Server.MustGetSQLCounter(`changefeed.emitted_row_messages`); c != 0 {
+			t.Errorf(`expected 0 got %d`, c)
+		}
+		if c := s.Server.MustGetSQLCounter(`changefeed.emitted_resolved_messages`); c != 0 {
 			t.Errorf(`expected 0 got %d`, c)
 		}
 		if c := s.Server.MustGetSQLCounter(`changefeed.emitted_bytes`); c != 0 {
@@ -6571,14 +6574,17 @@ func TestChangefeedMonitoring(t *testing.T) {
 			t.Errorf(`expected 0 got %d`, c)
 		}

-		foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH metrics_label='tier0'`)
+		foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH metrics_label='tier0', resolved`)
 		_, err := foo.Next()
 		require.NoError(t, err)

 		testutils.SucceedsSoon(t, func() error {
-			if c := s.Server.MustGetSQLCounter(`changefeed.emitted_messages`); c != 1 {
+			if c := s.Server.MustGetSQLCounter(`changefeed.emitted_row_messages`); c != 1 {
 				return errors.Errorf(`expected 1 got %d`, c)
 			}
+			if c := s.Server.MustGetSQLCounter(`changefeed.emitted_resolved_messages`); c <= 2 {
+				return errors.Errorf(`expected > 2 got %d`, c)
+			}
 			if c := s.Server.MustGetSQLCounter(`changefeed.emitted_bytes`); c != 22 {
 				return errors.Errorf(`expected 22 got %d`, c)
 			}
@@ -6622,14 +6628,17 @@ func TestChangefeedMonitoring(t *testing.T) {
 		// Check that two changefeeds add correctly.
 		// Set cluster settings back so we don't interfere with schema changes.
 		sysDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'`)
-		fooCopy := feed(t, f, `CREATE CHANGEFEED FOR foo`)
+		fooCopy := feed(t, f, `CREATE CHANGEFEED FOR foo with resolved`)
 		_, _ = fooCopy.Next()
 		_, _ = fooCopy.Next()
 		testutils.SucceedsSoon(t, func() error {
 			// We can't assert exactly 4 or 88 in case we get (allowed) duplicates
 			// from RangeFeed.
-			if c := s.Server.MustGetSQLCounter(`changefeed.emitted_messages`); c < 4 {
-				return errors.Errorf(`expected >= 4 got %d`, c)
+			if c := s.Server.MustGetSQLCounter(`changefeed.emitted_row_messages`); c < 3 {
+				return errors.Errorf(`expected >= 3 got %d`, c)
+			}
+			if c := s.Server.MustGetSQLCounter(`changefeed.emitted_resolved_messages`); c < 2 {
+				return errors.Errorf(`expected >= 2 got %d`, c)
 			}
 			if c := s.Server.MustGetSQLCounter(`changefeed.emitted_bytes`); c < 88 {
 				return errors.Errorf(`expected >= 88 got %d`, c)
pkg/ccl/changefeedccl/metrics.go

Lines changed: 28 additions & 14 deletions

@@ -58,7 +58,8 @@ const defaultSLIScope = "default"
 // AggMetrics are aggregated metrics keeping track of aggregated changefeed performance
 // indicators, combined with a limited number of per-changefeed indicators.
 type AggMetrics struct {
-	EmittedMessages         *aggmetric.AggCounter
+	EmittedRowMessages      *aggmetric.AggCounter
+	EmittedResolvedMessages *aggmetric.AggCounter
 	EmittedBatchSizes       *aggmetric.AggHistogram
 	FilteredMessages        *aggmetric.AggCounter
 	MessageSize             *aggmetric.AggHistogram
@@ -849,14 +850,27 @@
 )

 func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *AggMetrics {
-	metaChangefeedEmittedMessages := metric.Metadata{
-		Name:        "changefeed.emitted_messages",
-		Help:        "Messages emitted by all feeds",
-		Measurement: "Messages",
-		Unit:        metric.Unit_COUNT,
-		Essential:   true,
-		Category:    metric.Metadata_CHANGEFEEDS,
-		HowToUse:    `This metric provides a useful context when assessing the state of changefeeds. This metric characterizes the rate of changes being streamed from the CockroachDB cluster.`,
+	metaChangefeedEmittedRowMessages := metric.Metadata{
+		Name:         "changefeed.emitted_row_messages",
+		Help:         "Row messages emitted by all feeds",
+		Measurement:  "Messages",
+		Unit:         metric.Unit_COUNT,
+		Essential:    true,
+		Category:     metric.Metadata_CHANGEFEEDS,
+		HowToUse:     `This metric provides a useful context when assessing the state of changefeeds. This metric characterizes the rate of row changes being streamed from the CockroachDB cluster.`,
+		LabeledName:  "changefeed.emitted_messages",
+		StaticLabels: metric.MakeLabelPairs(metric.LabelName, "row"),
+	}
+	metaChangefeedEmittedResolvedMessages := metric.Metadata{
+		Name:         "changefeed.emitted_resolved_messages",
+		Help:         "Resolved timestamp messages emitted by all feeds",
+		Measurement:  "Messages",
+		Unit:         metric.Unit_COUNT,
+		Essential:    true,
+		Category:     metric.Metadata_CHANGEFEEDS,
+		HowToUse:     `This metric provides a useful context when assessing the state of changefeeds. This metric characterizes the rate of resolved timestamp messages being streamed from the CockroachDB cluster.`,
+		LabeledName:  "changefeed.emitted_messages",
+		StaticLabels: metric.MakeLabelPairs(metric.LabelName, "resolved"),
 	}
 	metaChangefeedEmittedBatchSizes := metric.Metadata{
 		Name:        "changefeed.emitted_batch_sizes",
@@ -1115,10 +1129,10 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
 	// NB: When adding new histograms, use sigFigs = 1. Older histograms
 	// retain significant figures of 2.
 	b := aggmetric.MakeBuilder("scope")
-	emittedMessagesBuilder := aggmetric.MakeBuilder("scope", "message_type")
 	a := &AggMetrics{
-		ErrorRetries:    b.Counter(metaChangefeedErrorRetries),
-		EmittedMessages: emittedMessagesBuilder.Counter(metaChangefeedEmittedMessages),
+		ErrorRetries:            b.Counter(metaChangefeedErrorRetries),
+		EmittedRowMessages:      b.Counter(metaChangefeedEmittedRowMessages),
+		EmittedResolvedMessages: b.Counter(metaChangefeedEmittedResolvedMessages),
 		EmittedBatchSizes: b.Histogram(metric.HistogramOptions{
 			Metadata: metaChangefeedEmittedBatchSizes,
 			Duration: histogramWindow,
@@ -1255,8 +1269,8 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
 	}

 	sm := &sliMetrics{
-		EmittedRowMessages:      a.EmittedMessages.AddChild(scope, "row"),
-		EmittedResolvedMessages: a.EmittedMessages.AddChild(scope, "resolved"),
+		EmittedRowMessages:      a.EmittedRowMessages.AddChild(scope),
+		EmittedResolvedMessages: a.EmittedResolvedMessages.AddChild(scope),
 		EmittedBatchSizes:       a.EmittedBatchSizes.AddChild(scope),
 		FilteredMessages:        a.FilteredMessages.AddChild(scope),
 		MessageSize:             a.MessageSize.AddChild(scope),
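Design note on the metrics.go change above: the old code multiplexed row and resolved counts through one AggCounter with a dynamic message_type child label; the new code declares two counters whose type is fixed by a static label, leaving scope as the only dynamic dimension. A hedged usage sketch, assuming the getOrCreateScope and child-counter APIs shown in the diff (the recordEmits wrapper itself is hypothetical):

// recordEmits is a hypothetical call site inside package changefeedccl.
func recordEmits(a *AggMetrics) error {
	sm, err := a.getOrCreateScope("tier0")
	if err != nil {
		return err
	}
	// The static name label lives in the metric metadata, so callers no longer
	// pass a message_type value when incrementing.
	sm.EmittedRowMessages.Inc(1)      // exported under changefeed.emitted_messages{name: row}
	sm.EmittedResolvedMessages.Inc(1) // exported under changefeed.emitted_messages{name: resolved}
	return nil
}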

pkg/ccl/changefeedccl/timers/timers.go

Lines changed: 36 additions & 17 deletions

@@ -6,6 +6,7 @@
 package timers

 import (
+	"fmt"
 	"time"

 	"github.com/cockroachdb/cockroach/pkg/util/metric"
@@ -34,34 +35,52 @@ func (*Timers) MetricStruct() {}
 var _ metric.Struct = &Timers{}

 func New(histogramWindow time.Duration) *Timers {
-	histogramOptsFor := func(name, desc string) metric.HistogramOptions {
+	const (
+		stagePrefix    = "changefeed.stage"
+		latencySuffix  = "latency"
+		ptsSubCategory = "pts"
+	)
+
+	histogramOptsFor := func(name, labeledName, labelName, desc string) metric.HistogramOptions {
 		return metric.HistogramOptions{
 			Metadata: metric.Metadata{
-				Name:        name,
-				Help:        desc,
-				Unit:        metric.Unit_NANOSECONDS,
-				Measurement: "Latency",
+				Name:         name,
+				Help:         desc,
+				Unit:         metric.Unit_NANOSECONDS,
+				Measurement:  "Latency",
+				LabeledName:  labeledName,
+				StaticLabels: metric.MakeLabelPairs(metric.LabelName, labelName),
 			},
 			Duration: histogramWindow,
 			Buckets:  prometheus.ExponentialBucketsRange(float64(1*time.Microsecond), float64(1*time.Hour), 60),
 			Mode:     metric.HistogramModePrometheus,
 		}
 	}

+	stageOpts := func(name, labelName, desc string) metric.HistogramOptions {
+		labeledName := fmt.Sprintf("%s.%s", stagePrefix, latencySuffix)
+		return histogramOptsFor(name, labeledName, labelName, desc)
+	}
+
+	ptsStageOpts := func(name, labelName, desc string) metric.HistogramOptions {
+		labeledName := fmt.Sprintf("%s.%s.%s", stagePrefix, ptsSubCategory, latencySuffix)
+		return histogramOptsFor(name, labeledName, labelName, desc)
+	}
+
 	b := aggmetric.MakeBuilder("scope")
 	return &Timers{
-		CheckpointJobProgress:     b.Histogram(histogramOptsFor("changefeed.stage.checkpoint_job_progress.latency", "Latency of the changefeed stage: checkpointing job progress")),
-		FrontierPersistence:       b.Histogram(histogramOptsFor("changefeed.stage.frontier_persistence.latency", "Latency of the changefeed stage: persisting frontier to job info")),
-		Encode:                    b.Histogram(histogramOptsFor("changefeed.stage.encode.latency", "Latency of the changefeed stage: encoding data")),
-		EmitRow:                   b.Histogram(histogramOptsFor("changefeed.stage.emit_row.latency", "Latency of the changefeed stage: emitting row to sink")),
-		DownstreamClientSend:      b.Histogram(histogramOptsFor("changefeed.stage.downstream_client_send.latency", "Latency of the changefeed stage: flushing messages from the sink's client to its downstream. This includes sends that failed for most but not all sinks.")),
-		KVFeedWaitForTableEvent:   b.Histogram(histogramOptsFor("changefeed.stage.kv_feed_wait_for_table_event.latency", "Latency of the changefeed stage: waiting for a table schema event to join to the kv event")),
-		KVFeedBuffer:              b.Histogram(histogramOptsFor("changefeed.stage.kv_feed_buffer.latency", "Latency of the changefeed stage: waiting to buffer kv events")),
-		RangefeedBufferValue:      b.Histogram(histogramOptsFor("changefeed.stage.rangefeed_buffer_value.latency", "Latency of the changefeed stage: buffering rangefeed value events")),
-		RangefeedBufferCheckpoint: b.Histogram(histogramOptsFor("changefeed.stage.rangefeed_buffer_checkpoint.latency", "Latency of the changefeed stage: buffering rangefeed checkpoint events")),
-		PTSManage:                 b.Histogram(histogramOptsFor("changefeed.stage.pts.manage.latency", "Latency of the changefeed stage: Time spent successfully managing protected timestamp records on highwater advance, including time spent creating new protected timestamps when needed")),
-		PTSManageError:            b.Histogram(histogramOptsFor("changefeed.stage.pts.manage_error.latency", "Latency of the changefeed stage: Time spent managing protected timestamp when we eventually error")),
-		PTSCreate:                 b.Histogram(histogramOptsFor("changefeed.stage.pts.create.latency", "Latency of the changefeed stage: Time spent creating protected timestamp records on changefeed creation")),
+		CheckpointJobProgress:     b.Histogram(stageOpts("changefeed.stage.checkpoint_job_progress.latency", "checkpoint_job_progress", "Latency of the changefeed stage: checkpointing job progress")),
+		FrontierPersistence:       b.Histogram(stageOpts("changefeed.stage.frontier_persistence.latency", "frontier_persistence", "Latency of the changefeed stage: persisting frontier to job info")),
+		Encode:                    b.Histogram(stageOpts("changefeed.stage.encode.latency", "encode", "Latency of the changefeed stage: encoding data")),
+		EmitRow:                   b.Histogram(stageOpts("changefeed.stage.emit_row.latency", "emit_row", "Latency of the changefeed stage: emitting row to sink")),
+		DownstreamClientSend:      b.Histogram(stageOpts("changefeed.stage.downstream_client_send.latency", "downstream_client_send", "Latency of the changefeed stage: flushing messages from the sink's client to its downstream. This includes sends that failed for most but not all sinks.")),
+		KVFeedWaitForTableEvent:   b.Histogram(stageOpts("changefeed.stage.kv_feed_wait_for_table_event.latency", "kv_feed_wait_for_table_event", "Latency of the changefeed stage: waiting for a table schema event to join to the kv event")),
+		KVFeedBuffer:              b.Histogram(stageOpts("changefeed.stage.kv_feed_buffer.latency", "kv_feed_buffer", "Latency of the changefeed stage: waiting to buffer kv events")),
+		RangefeedBufferValue:      b.Histogram(stageOpts("changefeed.stage.rangefeed_buffer_value.latency", "rangefeed_buffer_value", "Latency of the changefeed stage: buffering rangefeed value events")),
+		RangefeedBufferCheckpoint: b.Histogram(stageOpts("changefeed.stage.rangefeed_buffer_checkpoint.latency", "rangefeed_buffer_checkpoint", "Latency of the changefeed stage: buffering rangefeed checkpoint events")),
+		PTSManage:                 b.Histogram(ptsStageOpts("changefeed.stage.pts.manage.latency", "manage", "Latency of the changefeed stage: Time spent successfully managing protected timestamp records on highwater advance, including time spent creating new protected timestamps when needed")),
+		PTSManageError:            b.Histogram(ptsStageOpts("changefeed.stage.pts.manage_error.latency", "manage_error", "Latency of the changefeed stage: Time spent managing protected timestamp when we eventually error")),
+		PTSCreate:                 b.Histogram(ptsStageOpts("changefeed.stage.pts.create.latency", "create", "Latency of the changefeed stage: Time spent creating protected timestamp records on changefeed creation")),
 	}
 }
