Skip to content

Commit 9df479c

Browse files
committed
Mirror upstream elastic#137597 as single snapshot commit for AI review
BASE=afd3a426eabdfda7d4fd6b0c52d76162e3c9c47e HEAD=26abb9d1597bc46b560996f1854ea01e858f061f Branch=main
1 parent afd3a42 commit 9df479c

File tree

5 files changed

+415
-30
lines changed

5 files changed

+415
-30
lines changed

modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java

Lines changed: 125 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,32 +9,58 @@
99

1010
package org.elasticsearch.index.reindex;
1111

12+
import org.elasticsearch.ElasticsearchStatusException;
13+
import org.elasticsearch.common.bytes.BytesArray;
14+
import org.elasticsearch.common.settings.Settings;
1215
import org.elasticsearch.index.query.QueryBuilders;
1316
import org.elasticsearch.plugins.Plugin;
1417
import org.elasticsearch.plugins.PluginsService;
1518
import org.elasticsearch.reindex.BulkIndexByScrollResponseMatcher;
1619
import org.elasticsearch.reindex.ReindexPlugin;
20+
import org.elasticsearch.reindex.TransportReindexAction;
21+
import org.elasticsearch.rest.root.MainRestPlugin;
1722
import org.elasticsearch.search.sort.SortOrder;
1823
import org.elasticsearch.telemetry.Measurement;
1924
import org.elasticsearch.telemetry.TestTelemetryPlugin;
2025
import org.elasticsearch.test.ESIntegTestCase;
2126

27+
import java.net.InetSocketAddress;
2228
import java.util.Arrays;
2329
import java.util.Collection;
2430
import java.util.List;
31+
import java.util.Map;
2532

2633
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
2734
import static org.elasticsearch.reindex.DeleteByQueryMetrics.DELETE_BY_QUERY_TIME_HISTOGRAM;
35+
import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_ERROR_TYPE;
36+
import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_SOURCE;
37+
import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_LOCAL;
38+
import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_REMOTE;
39+
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_COMPLETION_HISTOGRAM;
2840
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM;
2941
import static org.elasticsearch.reindex.UpdateByQueryMetrics.UPDATE_BY_QUERY_TIME_HISTOGRAM;
3042
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
3143
import static org.hamcrest.Matchers.equalTo;
44+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
3245

3346
@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST)
3447
public class ReindexPluginMetricsIT extends ESIntegTestCase {
3548
@Override
3649
protected Collection<Class<? extends Plugin>> nodePlugins() {
37-
return Arrays.asList(ReindexPlugin.class, TestTelemetryPlugin.class);
50+
return Arrays.asList(ReindexPlugin.class, TestTelemetryPlugin.class, MainRestPlugin.class);
51+
}
52+
53+
// Returns false, i.e. (going by the hook's name) asks the test framework not to add the mock HTTP transport.
// NOTE(review): presumably required so that nodes expose real HTTP addresses, which
// testReindexFromRemoteMetrics reads through cluster().httpAddresses() — confirm against ESIntegTestCase.
@Override
54+
protected boolean addMockHttpTransport() {
55+
return false;
56+
}
57+
58+
@Override
59+
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
60+
return Settings.builder()
61+
.put(super.nodeSettings(nodeOrdinal, otherSettings))
62+
.put(TransportReindexAction.REMOTE_CLUSTER_WHITELIST.getKey(), "*:*")
63+
.build();
3864
}
3965

4066
protected ReindexRequestBuilder reindex() {
@@ -53,6 +79,62 @@ public static BulkIndexByScrollResponseMatcher matcher() {
5379
return new BulkIndexByScrollResponseMatcher();
5480
}
5581

82+
public void testReindexFromRemoteMetrics() throws Exception {
83+
final String dataNodeName = internalCluster().startNode();
84+
85+
InetSocketAddress remoteAddress = randomFrom(cluster().httpAddresses());
86+
RemoteInfo remote = new RemoteInfo(
87+
"http",
88+
remoteAddress.getHostString(),
89+
remoteAddress.getPort(),
90+
null,
91+
new BytesArray("{\"match_all\":{}}"),
92+
null,
93+
null,
94+
Map.of(),
95+
RemoteInfo.DEFAULT_SOCKET_TIMEOUT,
96+
RemoteInfo.DEFAULT_CONNECT_TIMEOUT
97+
);
98+
99+
final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName)
100+
.filterPlugins(TestTelemetryPlugin.class)
101+
.findFirst()
102+
.orElseThrow();
103+
104+
var expectedException = assertThrows(
105+
"Source index not created yet, should throw not found exception",
106+
ElasticsearchStatusException.class,
107+
() -> reindex().source("source").setRemoteInfo(remote).destination("dest").get()
108+
);
109+
110+
// assert failure metrics
111+
assertBusy(() -> {
112+
testTelemetryPlugin.collect();
113+
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1));
114+
List<Measurement> completions = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM);
115+
assertThat(completions.size(), equalTo(1));
116+
assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE), equalTo(expectedException.status().name()));
117+
assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_REMOTE));
118+
});
119+
120+
// now create the source index
121+
indexRandom(true, prepareIndex("source").setId("1").setSource("foo", "a"));
122+
assertHitCount(prepareSearch("source").setSize(0), 1);
123+
124+
reindex().source("source").setRemoteInfo(remote).destination("dest").get();
125+
126+
// assert success metrics
127+
assertBusy(() -> {
128+
testTelemetryPlugin.collect();
129+
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(2));
130+
List<Measurement> completions = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM);
131+
assertThat(completions.size(), equalTo(2));
132+
assertNull(completions.get(1).attributes().get(ATTRIBUTE_NAME_ERROR_TYPE));
133+
assertThat(completions.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_REMOTE));
134+
});
135+
136+
}
137+
56138
public void testReindexMetrics() throws Exception {
57139
final String dataNodeName = internalCluster().startNode();
58140

@@ -75,33 +157,67 @@ public void testReindexMetrics() throws Exception {
75157
// Use assertBusy to wait for all threads to complete so we get deterministic results
76158
assertBusy(() -> {
77159
testTelemetryPlugin.collect();
78-
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
79-
assertThat(measurements.size(), equalTo(1));
160+
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1));
161+
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(1));
80162
});
81163

82164
// Now none of them
83165
createIndex("none");
84166
reindex().source("source").destination("none").filter(termQuery("foo", "no_match")).get();
85167
assertBusy(() -> {
86168
testTelemetryPlugin.collect();
87-
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
88-
assertThat(measurements.size(), equalTo(2));
169+
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(2));
170+
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(2));
89171
});
90172

91173
// Now half of them
92174
reindex().source("source").destination("dest_half").filter(termQuery("foo", "a")).get();
93175
assertBusy(() -> {
94176
testTelemetryPlugin.collect();
95-
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
96-
assertThat(measurements.size(), equalTo(3));
177+
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(3));
178+
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(3));
97179
});
98180

99181
// Limit with maxDocs
100182
reindex().source("source").destination("dest_size_one").maxDocs(1).get();
101183
assertBusy(() -> {
102184
testTelemetryPlugin.collect();
103-
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
104-
assertThat(measurements.size(), equalTo(4));
185+
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(4));
186+
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(4));
187+
188+
// assert all metric attributes are correct
189+
testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).forEach(m -> {
190+
assertNull(m.attributes().get(ATTRIBUTE_NAME_ERROR_TYPE));
191+
assertThat(m.attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_LOCAL));
192+
});
193+
});
194+
}
195+
196+
public void testReindexMetricsWithBulkFailure() throws Exception {
197+
final String dataNodeName = internalCluster().startNode();
198+
final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName)
199+
.filterPlugins(TestTelemetryPlugin.class)
200+
.findFirst()
201+
.orElseThrow();
202+
203+
// source and destination have conflicting mappings to cause bulk failures
204+
indexRandom(true, prepareIndex("source").setId("2").setSource("test", "words words"));
205+
indexRandom(true, prepareIndex("dest").setId("1").setSource("test", 10));
206+
207+
var response = reindex().source("source").destination("dest").get();
208+
assertThat(response.getBulkFailures().size(), greaterThanOrEqualTo(1));
209+
210+
assertBusy(() -> {
211+
testTelemetryPlugin.collect();
212+
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1));
213+
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(1));
214+
assertThat(
215+
testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM)
216+
.getFirst()
217+
.attributes()
218+
.get(ATTRIBUTE_NAME_ERROR_TYPE),
219+
equalTo("org.elasticsearch.index.mapper.DocumentParsingException")
220+
);
105221
});
106222
}
107223

modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java

Lines changed: 55 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,25 +9,74 @@
99

1010
package org.elasticsearch.reindex;
1111

12+
import org.elasticsearch.ElasticsearchStatusException;
1213
import org.elasticsearch.telemetry.metric.LongHistogram;
1314
import org.elasticsearch.telemetry.metric.MeterRegistry;
1415

16+
import java.util.HashMap;
17+
import java.util.Map;
18+
1519
public class ReindexMetrics {
1620

1721
public static final String REINDEX_TIME_HISTOGRAM = "es.reindex.duration.histogram";
22+
public static final String REINDEX_COMPLETION_HISTOGRAM = "es.reindex.completion.histogram";
23+
24+
// refers to https://opentelemetry.io/docs/specs/semconv/registry/attributes/error/#error-type
25+
public static final String ATTRIBUTE_NAME_ERROR_TYPE = "error.type";
26+
27+
public static final String ATTRIBUTE_NAME_SOURCE = "reindex.source";
28+
public static final String ATTRIBUTE_VALUE_SOURCE_LOCAL = "local";
29+
public static final String ATTRIBUTE_VALUE_SOURCE_REMOTE = "remote";
1830

1931
private final LongHistogram reindexTimeSecsHistogram;
32+
private final LongHistogram reindexCompletionHistogram;
2033

2134
public ReindexMetrics(MeterRegistry meterRegistry) {
22-
this(meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "seconds"));
35+
this.reindexTimeSecsHistogram = meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "seconds");
36+
this.reindexCompletionHistogram = meterRegistry.registerLongHistogram(
37+
REINDEX_COMPLETION_HISTOGRAM,
38+
"Number of completed reindex operations",
39+
"unit"
40+
);
2341
}
2442

25-
private ReindexMetrics(LongHistogram reindexTimeSecsHistogram) {
26-
this.reindexTimeSecsHistogram = reindexTimeSecsHistogram;
27-
}
43+
public long recordTookTime(long tookTime, boolean remote) {
44+
Map<String, Object> attributes = getAttributes(remote);
2845

29-
public long recordTookTime(long tookTime) {
30-
reindexTimeSecsHistogram.record(tookTime);
46+
reindexTimeSecsHistogram.record(tookTime, attributes);
3147
return tookTime;
3248
}
49+
50+
public void recordSuccess(boolean remote) {
51+
Map<String, Object> attributes = getAttributes(remote);
52+
// attribute ATTRIBUTE_ERROR_TYPE being absent indicates success
53+
assert attributes.get(ATTRIBUTE_NAME_ERROR_TYPE) == null : "error.type attribute must not be present for successes";
54+
55+
reindexCompletionHistogram.record(1L, attributes);
56+
}
57+
58+
public void recordFailure(boolean remote, Throwable e) {
59+
Map<String, Object> attributes = getAttributes(remote);
60+
// best effort to extract useful error type if possible
61+
String errorType;
62+
if (e instanceof ElasticsearchStatusException ese) {
63+
errorType = ese.status().name();
64+
} else {
65+
errorType = e.getClass().getTypeName();
66+
}
67+
attributes.put(ATTRIBUTE_NAME_ERROR_TYPE, errorType);
68+
69+
// attribute ATTRIBUTE_ERROR_TYPE being present indicates failure
70+
// https://opentelemetry.io/docs/specs/semconv/general/recording-errors/#recording-errors-on-metrics
71+
assert attributes.get(ATTRIBUTE_NAME_ERROR_TYPE) != null : "error.type attribute must be present for failures";
72+
73+
reindexCompletionHistogram.record(1L, attributes);
74+
}
75+
76+
private Map<String, Object> getAttributes(boolean remote) {
77+
Map<String, Object> attributes = new HashMap<>();
78+
attributes.put(ATTRIBUTE_NAME_SOURCE, remote ? ATTRIBUTE_VALUE_SOURCE_REMOTE : ATTRIBUTE_VALUE_SOURCE_LOCAL);
79+
80+
return attributes;
81+
}
3382
}

modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import java.util.ArrayList;
7070
import java.util.List;
7171
import java.util.Map;
72+
import java.util.Optional;
7273
import java.util.concurrent.TimeUnit;
7374
import java.util.concurrent.atomic.AtomicInteger;
7475
import java.util.function.BiFunction;
@@ -135,18 +136,63 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl
135136
projectResolver.getProjectState(clusterService.state()),
136137
reindexSslConfig,
137138
request,
138-
ActionListener.runAfter(listener, () -> {
139-
long elapsedTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime);
140-
if (reindexMetrics != null) {
141-
reindexMetrics.recordTookTime(elapsedTime);
142-
}
143-
})
139+
wrapWithMetrics(listener, reindexMetrics, startTime, request.getRemoteInfo() != null)
144140
);
145141
searchAction.start();
146142
}
147143
);
148144
}
149145

146+
// Visible for testing
147+
static ActionListener<BulkByScrollResponse> wrapWithMetrics(
148+
ActionListener<BulkByScrollResponse> listener,
149+
@Nullable ReindexMetrics metrics,
150+
long startTime,
151+
boolean isRemote
152+
) {
153+
if (metrics == null) {
154+
return listener;
155+
}
156+
157+
// add completion metrics
158+
var withCompletionMetrics = new ActionListener<BulkByScrollResponse>() {
159+
@Override
160+
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
161+
var searchExceptionSample = Optional.ofNullable(bulkByScrollResponse.getSearchFailures())
162+
.stream()
163+
.flatMap(List::stream)
164+
.map(ScrollableHitSource.SearchFailure::getReason)
165+
.findFirst();
166+
var bulkExceptionSample = Optional.ofNullable(bulkByScrollResponse.getBulkFailures())
167+
.stream()
168+
.flatMap(List::stream)
169+
.map(BulkItemResponse.Failure::getCause)
170+
.findFirst();
171+
if (searchExceptionSample.isPresent() || bulkExceptionSample.isPresent()) {
172+
// record only the first sample error in metric
173+
Throwable e = searchExceptionSample.orElseGet(bulkExceptionSample::get);
174+
metrics.recordFailure(isRemote, e);
175+
listener.onResponse(bulkByScrollResponse);
176+
} else {
177+
metrics.recordSuccess(isRemote);
178+
listener.onResponse(bulkByScrollResponse);
179+
}
180+
}
181+
182+
@Override
183+
public void onFailure(Exception e) {
184+
metrics.recordFailure(isRemote, e);
185+
listener.onFailure(e);
186+
}
187+
};
188+
189+
// add duration metric
190+
return ActionListener.runAfter(withCompletionMetrics, () -> {
191+
long elapsedTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime);
192+
metrics.recordTookTime(elapsedTime, isRemote);
193+
});
194+
}
195+
150196
/**
151197
* Build the {@link RestClient} used for reindexing from remote clusters.
152198
* @param remoteInfo connection information for the remote cluster

0 commit comments

Comments
 (0)