From f11a9c72a0646f0b4d6b97a552700534db3d928a Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Tue, 4 Nov 2025 18:46:47 -0500 Subject: [PATCH 1/3] Add reindex from remote metrics --- .../index/reindex/ReindexPluginMetricsIT.java | 63 +++++++++++++-- .../elasticsearch/reindex/ReindexMetrics.java | 57 +++++++++++-- .../org/elasticsearch/reindex/Reindexer.java | 43 ++++++++-- .../reindex/ReindexMetricsTests.java | 80 ++++++++++++++++--- .../elasticsearch/reindex/ReindexerTests.java | 66 +++++++++++++++ 5 files changed, 281 insertions(+), 28 deletions(-) create mode 100644 modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java diff --git a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java index d9580c13eada6..6e451486342b8 100644 --- a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java +++ b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java @@ -25,10 +25,16 @@ import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.reindex.DeleteByQueryMetrics.DELETE_BY_QUERY_TIME_HISTOGRAM; +import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_FAILURE_HISTOGRAM; +import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_FAILURE_HISTOGRAM_REMOTE; +import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_SUCCESS_HISTOGRAM; +import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_SUCCESS_HISTOGRAM_REMOTE; import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM; +import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM_REMOTE; import static org.elasticsearch.reindex.UpdateByQueryMetrics.UPDATE_BY_QUERY_TIME_HISTOGRAM; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; @ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST) public class ReindexPluginMetricsIT extends ESIntegTestCase { @@ -75,8 +81,12 @@ public void testReindexMetrics() throws Exception { // Use assertBusy to wait for all threads to complete so we get deterministic results assertBusy(() -> { testTelemetryPlugin.collect(); - List measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM); - assertThat(measurements.size(), equalTo(1)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(1)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(0)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0)); }); // Now none of them @@ -84,24 +94,61 @@ public void testReindexMetrics() throws Exception { reindex().source("source").destination("none").filter(termQuery("foo", "no_match")).get(); assertBusy(() -> { testTelemetryPlugin.collect(); - List measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM); - assertThat(measurements.size(), equalTo(2)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(2)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(2)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(0)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0)); }); // Now half of them reindex().source("source").destination("dest_half").filter(termQuery("foo", "a")).get(); assertBusy(() -> { testTelemetryPlugin.collect(); - List measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM); - assertThat(measurements.size(), equalTo(3)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(3)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(3)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(0)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0)); }); // Limit with maxDocs reindex().source("source").destination("dest_size_one").maxDocs(1).get(); assertBusy(() -> { testTelemetryPlugin.collect(); - List measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM); - assertThat(measurements.size(), equalTo(4)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(4)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(4)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(0)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0)); + }); + } + + public void testReindexMetricsWithBulkFailure() throws Exception { + final String dataNodeName = internalCluster().startNode(); + final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName) + .filterPlugins(TestTelemetryPlugin.class) + .findFirst() + .orElseThrow(); + + // source and destination have conflicting mappings to cause bulk failures + indexRandom(true, prepareIndex("source").setId("2").setSource("test", "words words")); + indexRandom(true, prepareIndex("dest").setId("1").setSource("test", 10)); + + var response = reindex().source("source").destination("dest").get(); + assertThat(response.getBulkFailures().size(), greaterThanOrEqualTo(1)); + + assertBusy(() -> { + testTelemetryPlugin.collect(); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(0)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(1)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0)); }); } diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java index f7975120d9fc8..59ae2fe2f4446 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java @@ -15,19 +15,66 @@ public class ReindexMetrics { public static final String REINDEX_TIME_HISTOGRAM = "es.reindex.duration.histogram"; + // metrics for remote reindex should be a subset of the all metrics + public static final String REINDEX_TIME_HISTOGRAM_REMOTE = "es.reindex.duration.histogram.remote"; + public static final String REINDEX_SUCCESS_HISTOGRAM = "es.reindex.completion.success"; + public static final String REINDEX_SUCCESS_HISTOGRAM_REMOTE = "es.reindex.completion.success.remote"; + public static final String REINDEX_FAILURE_HISTOGRAM = "es.reindex.completion.failure"; + public static final String REINDEX_FAILURE_HISTOGRAM_REMOTE = "es.reindex.completion.failure.remote"; private final LongHistogram reindexTimeSecsHistogram; + private final LongHistogram reindexTimeSecsHistogramRemote; + private final LongHistogram reindexSuccessHistogram; + private final LongHistogram reindexSuccessHistogramRemote; + private final LongHistogram reindexFailureHistogram; + private final LongHistogram reindexFailureHistogramRemote; public ReindexMetrics(MeterRegistry meterRegistry) { - this(meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "seconds")); - } + this.reindexTimeSecsHistogram = meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "seconds"); + this.reindexTimeSecsHistogramRemote = meterRegistry.registerLongHistogram( + REINDEX_TIME_HISTOGRAM_REMOTE, + "Time to reindex by search from remote cluster", + "seconds" + ); + + this.reindexSuccessHistogram = meterRegistry.registerLongHistogram( + REINDEX_SUCCESS_HISTOGRAM, + "Number of successful reindex", + "unit" + ); + this.reindexSuccessHistogramRemote = meterRegistry.registerLongHistogram( + REINDEX_SUCCESS_HISTOGRAM_REMOTE, + "Number of successful reindex from remote cluster", + "unit" + ); - private ReindexMetrics(LongHistogram reindexTimeSecsHistogram) { - this.reindexTimeSecsHistogram = reindexTimeSecsHistogram; + this.reindexFailureHistogram = meterRegistry.registerLongHistogram(REINDEX_FAILURE_HISTOGRAM, "Number of failed reindex", "unit"); + this.reindexFailureHistogramRemote = meterRegistry.registerLongHistogram( + REINDEX_FAILURE_HISTOGRAM_REMOTE, + "Number of failed reindex from remote cluster", + "unit" + ); } - public long recordTookTime(long tookTime) { + public long recordTookTime(long tookTime, boolean remote) { reindexTimeSecsHistogram.record(tookTime); + if (remote) { + reindexTimeSecsHistogramRemote.record(tookTime); + } return tookTime; } + + public void recordSuccess(boolean remote) { + reindexSuccessHistogram.record(1L); + if (remote) { + reindexSuccessHistogramRemote.record(1L); + } + } + + public void recordFailure(boolean remote) { + reindexFailureHistogram.record(1L); + if (remote) { + reindexFailureHistogramRemote.record(1L); + } + } } diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java index c073e7c1a83ad..03f4f2b611385 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java @@ -135,18 +135,49 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl projectResolver.getProjectState(clusterService.state()), reindexSslConfig, request, - ActionListener.runAfter(listener, () -> { - long elapsedTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime); - if (reindexMetrics != null) { - reindexMetrics.recordTookTime(elapsedTime); - } - }) + wrapWithMetrics(listener, reindexMetrics, startTime, request.getRemoteInfo() != null) ); searchAction.start(); } ); } + // Visible for testing + static ActionListener wrapWithMetrics( + ActionListener listener, + @Nullable ReindexMetrics metrics, + long startTime, + boolean isRemote + ) { + if (metrics == null) { + return listener; + } + var withCompletionMetrics = new ActionListener() { + @Override + public void onResponse(BulkByScrollResponse bulkByScrollResponse) { + if ((bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().isEmpty() == false) + || (bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().isEmpty() == false)) { + metrics.recordFailure(isRemote); + listener.onResponse(bulkByScrollResponse); + } else { + metrics.recordSuccess(isRemote); + listener.onResponse(bulkByScrollResponse); + } + } + + @Override + public void onFailure(Exception e) { + metrics.recordFailure(isRemote); + listener.onFailure(e); + } + }; + + return ActionListener.runAfter(withCompletionMetrics, () -> { + long elapsedTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime); + metrics.recordTookTime(elapsedTime, isRemote); + }); + } + /** * Build the {@link RestClient} used for reindexing from remote clusters. * @param remoteInfo connection information for the remote cluster diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java index 1356dc50dadb0..aa56327ef5328 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java @@ -17,25 +17,87 @@ import java.util.List; +import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_FAILURE_HISTOGRAM; +import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_FAILURE_HISTOGRAM_REMOTE; +import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_SUCCESS_HISTOGRAM; +import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_SUCCESS_HISTOGRAM_REMOTE; import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM; +import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM_REMOTE; public class ReindexMetricsTests extends ESTestCase { - private RecordingMeterRegistry recordingMeterRegistry; + private RecordingMeterRegistry registry; private ReindexMetrics metrics; @Before public void createMetrics() { - recordingMeterRegistry = new RecordingMeterRegistry(); - metrics = new ReindexMetrics(recordingMeterRegistry); + registry = new RecordingMeterRegistry(); + metrics = new ReindexMetrics(registry); } public void testRecordTookTime() { - int secondsTaken = randomIntBetween(1, 50); - metrics.recordTookTime(secondsTaken); - List measurements = recordingMeterRegistry.getRecorder() - .getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_TIME_HISTOGRAM); - assertEquals(measurements.size(), 1); - assertEquals(measurements.get(0).getLong(), secondsTaken); + long secondsTaken = randomLongBetween(1, Long.MAX_VALUE); + + // first metric + metrics.recordTookTime(secondsTaken, false); + + List measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_TIME_HISTOGRAM); + assertEquals(1, measurements.size()); + assertEquals(secondsTaken, measurements.getFirst().getLong()); + + // second metric + long remoteSecondsTaken = randomLongBetween(1, Long.MAX_VALUE); + metrics.recordTookTime(remoteSecondsTaken, true); + + measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_TIME_HISTOGRAM); + assertEquals(2, measurements.size()); + assertEquals(secondsTaken, measurements.getFirst().getLong()); + assertEquals(remoteSecondsTaken, measurements.getLast().getLong()); + List measurementsRemote = registry.getRecorder() + .getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_TIME_HISTOGRAM_REMOTE); + assertEquals(1, measurementsRemote.size()); + assertEquals(remoteSecondsTaken, measurementsRemote.getFirst().getLong()); + } + + public void testRecordSuccess() { + // first metric + metrics.recordSuccess(false); + + List measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_SUCCESS_HISTOGRAM); + assertEquals(1, measurements.size()); + assertEquals(1, measurements.getFirst().getLong()); + + // second metric + metrics.recordSuccess(true); + + measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_SUCCESS_HISTOGRAM); + assertEquals(2, measurements.size()); + assertEquals(1, measurements.getFirst().getLong()); + assertEquals(1, measurements.getLast().getLong()); + List measurementsRemote = registry.getRecorder() + .getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_SUCCESS_HISTOGRAM_REMOTE); + assertEquals(1, measurementsRemote.size()); + assertEquals(1, measurements.getFirst().getLong()); + } + + public void testRecordFailure() { + // first metric + metrics.recordFailure(false); + + List measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_FAILURE_HISTOGRAM); + assertEquals(1, measurements.size()); + assertEquals(1, measurements.getFirst().getLong()); + + // second metric + metrics.recordFailure(true); + + measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_FAILURE_HISTOGRAM); + assertEquals(2, measurements.size()); + assertEquals(1, measurements.getFirst().getLong()); + assertEquals(1, measurements.getLast().getLong()); + List measurementsRemote = registry.getRecorder() + .getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_FAILURE_HISTOGRAM_REMOTE); + assertEquals(1, measurementsRemote.size()); + assertEquals(1, measurements.getFirst().getLong()); } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java new file mode 100644 index 0000000000000..ceee6f0940314 --- /dev/null +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java @@ -0,0 +1,66 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.reindex; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.BulkByScrollTask; +import org.elasticsearch.test.ESTestCase; + +import static org.elasticsearch.core.TimeValue.timeValueMillis; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +public class ReindexerTests extends ESTestCase { + + public void testWrapWithMetricsSuccess() { + ReindexMetrics metrics = mock(); + ActionListener listener = spy(ActionListener.noop()); + var wrapped = Reindexer.wrapWithMetrics(listener, metrics, randomNonNegativeLong(), true); + + BulkByScrollResponse response = dummyResponse(); + wrapped.onResponse(response); + + verify(listener).onResponse(response); + verify(metrics).recordSuccess(true); + verify(metrics, never()).recordFailure(anyBoolean()); + verify(metrics).recordTookTime(anyLong(), eq(true)); + } + + public void testWrapWithMetricsFailure() { + ReindexMetrics metrics = mock(); + ActionListener listener = spy(ActionListener.noop()); + var wrapped = Reindexer.wrapWithMetrics(listener, metrics, randomNonNegativeLong(), true); + + Exception exception = new Exception("random failure"); + wrapped.onFailure(exception); + + verify(listener).onFailure(exception); + verify(metrics, never()).recordSuccess(anyBoolean()); + verify(metrics).recordFailure(true); + verify(metrics).recordTookTime(anyLong(), eq(true)); + } + + private BulkByScrollResponse dummyResponse() { + return new BulkByScrollResponse( + TimeValue.ZERO, + new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, timeValueMillis(0), 0f, null, timeValueMillis(0)), + null, + null, + false + ); + } +} From 978f34c04f78c6b52bb35834f008df8e8a5afe56 Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Wed, 5 Nov 2025 20:30:19 -0500 Subject: [PATCH 2/3] Use attributes and add more tests --- .../index/reindex/ReindexPluginMetricsIT.java | 131 +++++++++++++----- .../elasticsearch/reindex/ReindexMetrics.java | 90 ++++++------ .../org/elasticsearch/reindex/Reindexer.java | 23 ++- .../reindex/ReindexMetricsTests.java | 54 ++++---- .../elasticsearch/reindex/ReindexerTests.java | 58 +++++++- 5 files changed, 244 insertions(+), 112 deletions(-) diff --git a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java index 6e451486342b8..e8b2a4f0aad7f 100644 --- a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java +++ b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java @@ -9,28 +9,35 @@ package org.elasticsearch.index.reindex; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.reindex.BulkIndexByScrollResponseMatcher; import org.elasticsearch.reindex.ReindexPlugin; +import org.elasticsearch.reindex.TransportReindexAction; +import org.elasticsearch.rest.root.MainRestPlugin; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.telemetry.Measurement; import org.elasticsearch.telemetry.TestTelemetryPlugin; import org.elasticsearch.test.ESIntegTestCase; +import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Map; import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.reindex.DeleteByQueryMetrics.DELETE_BY_QUERY_TIME_HISTOGRAM; -import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_FAILURE_HISTOGRAM; -import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_FAILURE_HISTOGRAM_REMOTE; -import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_SUCCESS_HISTOGRAM; -import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_SUCCESS_HISTOGRAM_REMOTE; +import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_ERROR_TYPE; +import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_SOURCE; +import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_LOCAL; +import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_REMOTE; +import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_COMPLETION_HISTOGRAM; import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM; -import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM_REMOTE; import static org.elasticsearch.reindex.UpdateByQueryMetrics.UPDATE_BY_QUERY_TIME_HISTOGRAM; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; @@ -40,7 +47,20 @@ public class ReindexPluginMetricsIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return Arrays.asList(ReindexPlugin.class, TestTelemetryPlugin.class); + return Arrays.asList(ReindexPlugin.class, TestTelemetryPlugin.class, MainRestPlugin.class); + } + + @Override + protected boolean addMockHttpTransport() { + return false; + } + + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put(TransportReindexAction.REMOTE_CLUSTER_WHITELIST.getKey(), "*:*") + .build(); } protected ReindexRequestBuilder reindex() { @@ -59,6 +79,62 @@ public static BulkIndexByScrollResponseMatcher matcher() { return new BulkIndexByScrollResponseMatcher(); } + public void testReindexFromRemoteMetrics() throws Exception { + final String dataNodeName = internalCluster().startNode(); + + InetSocketAddress remoteAddress = randomFrom(cluster().httpAddresses()); + RemoteInfo remote = new RemoteInfo( + "http", + remoteAddress.getHostString(), + remoteAddress.getPort(), + null, + new BytesArray("{\"match_all\":{}}"), + null, + null, + Map.of(), + RemoteInfo.DEFAULT_SOCKET_TIMEOUT, + RemoteInfo.DEFAULT_CONNECT_TIMEOUT + ); + + final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName) + .filterPlugins(TestTelemetryPlugin.class) + .findFirst() + .orElseThrow(); + + var expectedException = assertThrows( + "Source index not created yet, should throw not found exception", + ElasticsearchStatusException.class, + () -> reindex().source("source").setRemoteInfo(remote).destination("dest").get() + ); + + // assert failure metrics + assertBusy(() -> { + testTelemetryPlugin.collect(); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1)); + List completions = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM); + assertThat(completions.size(), equalTo(1)); + assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE), equalTo(expectedException.status().name())); + assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_REMOTE)); + }); + + // now create the source index + indexRandom(true, prepareIndex("source").setId("1").setSource("foo", "a")); + assertHitCount(prepareSearch("source").setSize(0), 1); + + reindex().source("source").setRemoteInfo(remote).destination("dest").get(); + + // assert success metrics + assertBusy(() -> { + testTelemetryPlugin.collect(); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(2)); + List completions = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM); + assertThat(completions.size(), equalTo(2)); + assertNull(completions.get(1).attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); + assertThat(completions.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_REMOTE)); + }); + + } + public void testReindexMetrics() throws Exception { final String dataNodeName = internalCluster().startNode(); @@ -82,11 +158,7 @@ public void testReindexMetrics() throws Exception { assertBusy(() -> { testTelemetryPlugin.collect(); assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(1)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(0)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(1)); }); // Now none of them @@ -95,11 +167,7 @@ public void testReindexMetrics() throws Exception { assertBusy(() -> { testTelemetryPlugin.collect(); assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(2)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(2)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(0)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(2)); }); // Now half of them @@ -107,11 +175,7 @@ public void testReindexMetrics() throws Exception { assertBusy(() -> { testTelemetryPlugin.collect(); assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(3)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(3)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(0)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(3)); }); // Limit with maxDocs @@ -119,11 +183,13 @@ public void testReindexMetrics() throws Exception { assertBusy(() -> { testTelemetryPlugin.collect(); assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(4)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(4)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(0)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(4)); + + // asset all metric attributes are correct + testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).forEach(m -> { + assertNull(m.attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); + assertThat(m.attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_LOCAL)); + }); }); } @@ -144,11 +210,14 @@ public void testReindexMetricsWithBulkFailure() throws Exception { assertBusy(() -> { testTelemetryPlugin.collect(); assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(0)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(1)); - assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(1)); + assertThat( + testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM) + .getFirst() + .attributes() + .get(ATTRIBUTE_NAME_ERROR_TYPE), + equalTo("org.elasticsearch.index.mapper.DocumentParsingException") + ); }); } diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java index 59ae2fe2f4446..58a2bffb642ab 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java @@ -9,72 +9,74 @@ package org.elasticsearch.reindex; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.telemetry.metric.LongHistogram; import org.elasticsearch.telemetry.metric.MeterRegistry; +import java.util.HashMap; +import java.util.Map; + public class ReindexMetrics { public static final String REINDEX_TIME_HISTOGRAM = "es.reindex.duration.histogram"; - // metrics for remote reindex should be a subset of the all metrics - public static final String REINDEX_TIME_HISTOGRAM_REMOTE = "es.reindex.duration.histogram.remote"; - public static final String REINDEX_SUCCESS_HISTOGRAM = "es.reindex.completion.success"; - public static final String REINDEX_SUCCESS_HISTOGRAM_REMOTE = "es.reindex.completion.success.remote"; - public static final String REINDEX_FAILURE_HISTOGRAM = "es.reindex.completion.failure"; - public static final String REINDEX_FAILURE_HISTOGRAM_REMOTE = "es.reindex.completion.failure.remote"; + public static final String REINDEX_COMPLETION_HISTOGRAM = "es.reindex.completion.histogram"; + + // refers to https://opentelemetry.io/docs/specs/semconv/registry/attributes/error/#error-type + public static final String ATTRIBUTE_NAME_ERROR_TYPE = "error.type"; + + public static final String ATTRIBUTE_NAME_SOURCE = "reindex.source"; + public static final String ATTRIBUTE_VALUE_SOURCE_LOCAL = "local"; + public static final String ATTRIBUTE_VALUE_SOURCE_REMOTE = "remote"; private final LongHistogram reindexTimeSecsHistogram; - private final LongHistogram reindexTimeSecsHistogramRemote; - private final LongHistogram reindexSuccessHistogram; - private final LongHistogram reindexSuccessHistogramRemote; - private final LongHistogram reindexFailureHistogram; - private final LongHistogram reindexFailureHistogramRemote; + private final LongHistogram reindexCompletionHistogram; public ReindexMetrics(MeterRegistry meterRegistry) { this.reindexTimeSecsHistogram = meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "seconds"); - this.reindexTimeSecsHistogramRemote = meterRegistry.registerLongHistogram( - REINDEX_TIME_HISTOGRAM_REMOTE, - "Time to reindex by search from remote cluster", - "seconds" - ); - - this.reindexSuccessHistogram = meterRegistry.registerLongHistogram( - REINDEX_SUCCESS_HISTOGRAM, - "Number of successful reindex", - "unit" - ); - this.reindexSuccessHistogramRemote = meterRegistry.registerLongHistogram( - REINDEX_SUCCESS_HISTOGRAM_REMOTE, - "Number of successful reindex from remote cluster", - "unit" - ); - - this.reindexFailureHistogram = meterRegistry.registerLongHistogram(REINDEX_FAILURE_HISTOGRAM, "Number of failed reindex", "unit"); - this.reindexFailureHistogramRemote = meterRegistry.registerLongHistogram( - REINDEX_FAILURE_HISTOGRAM_REMOTE, - "Number of failed reindex from remote cluster", + this.reindexCompletionHistogram = meterRegistry.registerLongHistogram( + REINDEX_COMPLETION_HISTOGRAM, + "Number of completed reindex operations", "unit" ); } public long recordTookTime(long tookTime, boolean remote) { - reindexTimeSecsHistogram.record(tookTime); - if (remote) { - reindexTimeSecsHistogramRemote.record(tookTime); - } + Map attributes = getAttributes(remote); + + reindexTimeSecsHistogram.record(tookTime, attributes); return tookTime; } public void recordSuccess(boolean remote) { - reindexSuccessHistogram.record(1L); - if (remote) { - reindexSuccessHistogramRemote.record(1L); - } + Map attributes = getAttributes(remote); + // attribute ATTRIBUTE_ERROR_TYPE being absent indicates success + assert attributes.get(ATTRIBUTE_NAME_ERROR_TYPE) == null : "error.type attribute must not be present for successes"; + + reindexCompletionHistogram.record(1L, attributes); } - public void recordFailure(boolean remote) { - reindexFailureHistogram.record(1L); - if (remote) { - reindexFailureHistogramRemote.record(1L); + public void recordFailure(boolean remote, Throwable e) { + Map attributes = getAttributes(remote); + // best effort to extract useful error type if possible + String errorType; + if (e instanceof ElasticsearchStatusException ese) { + errorType = ese.status().name(); + } else { + errorType = e.getClass().getTypeName(); } + attributes.put(ATTRIBUTE_NAME_ERROR_TYPE, errorType); + + // attribute ATTRIBUTE_ERROR_TYPE being present indicates failure + // https://opentelemetry.io/docs/specs/semconv/general/recording-errors/#recording-errors-on-metrics + assert attributes.get(ATTRIBUTE_NAME_ERROR_TYPE) != null : "error.type attribute must be present for failures"; + + reindexCompletionHistogram.record(1L, attributes); + } + + private Map getAttributes(boolean remote) { + Map attributes = new HashMap<>(); + attributes.put(ATTRIBUTE_NAME_SOURCE, remote ? "remote" : "local"); + + return attributes; } } diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java index 03f4f2b611385..e1a20f1e1eace 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java @@ -69,6 +69,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; @@ -152,12 +153,25 @@ static ActionListener wrapWithMetrics( if (metrics == null) { return listener; } + + // add completion metrics var withCompletionMetrics = new ActionListener() { @Override public void onResponse(BulkByScrollResponse bulkByScrollResponse) { - if ((bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().isEmpty() == false) - || (bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().isEmpty() == false)) { - metrics.recordFailure(isRemote); + var searchExceptionSample = Optional.ofNullable(bulkByScrollResponse.getSearchFailures()) + .stream() + .flatMap(List::stream) + .map(ScrollableHitSource.SearchFailure::getReason) + .findFirst(); + var bulkExceptionSample = Optional.ofNullable(bulkByScrollResponse.getBulkFailures()) + .stream() + .flatMap(List::stream) + .map(BulkItemResponse.Failure::getCause) + .findFirst(); + if (searchExceptionSample.isPresent() || bulkExceptionSample.isPresent()) { + // record only the first sample error in metric + Throwable e = searchExceptionSample.orElseGet(bulkExceptionSample::get); + metrics.recordFailure(isRemote, e); listener.onResponse(bulkByScrollResponse); } else { metrics.recordSuccess(isRemote); @@ -167,11 +181,12 @@ public void onResponse(BulkByScrollResponse bulkByScrollResponse) { @Override public void onFailure(Exception e) { - metrics.recordFailure(isRemote); + metrics.recordFailure(isRemote, e); listener.onFailure(e); } }; + // add duration metric return ActionListener.runAfter(withCompletionMetrics, () -> { long elapsedTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime); metrics.recordTookTime(elapsedTime, isRemote); diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java index aa56327ef5328..c0327807b8c3b 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java @@ -9,6 +9,8 @@ package org.elasticsearch.reindex; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.telemetry.InstrumentType; import org.elasticsearch.telemetry.Measurement; import org.elasticsearch.telemetry.RecordingMeterRegistry; @@ -17,12 +19,12 @@ import java.util.List; -import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_FAILURE_HISTOGRAM; -import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_FAILURE_HISTOGRAM_REMOTE; -import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_SUCCESS_HISTOGRAM; -import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_SUCCESS_HISTOGRAM_REMOTE; +import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_ERROR_TYPE; +import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_SOURCE; +import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_LOCAL; +import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_REMOTE; +import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_COMPLETION_HISTOGRAM; import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM; -import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM_REMOTE; public class ReindexMetricsTests extends ESTestCase { @@ -44,6 +46,7 @@ public void testRecordTookTime() { List measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_TIME_HISTOGRAM); assertEquals(1, measurements.size()); assertEquals(secondsTaken, measurements.getFirst().getLong()); + assertEquals(ATTRIBUTE_VALUE_SOURCE_LOCAL, measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE)); // second metric long remoteSecondsTaken = randomLongBetween(1, Long.MAX_VALUE); @@ -52,52 +55,49 @@ public void testRecordTookTime() { measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_TIME_HISTOGRAM); assertEquals(2, measurements.size()); assertEquals(secondsTaken, measurements.getFirst().getLong()); - assertEquals(remoteSecondsTaken, measurements.getLast().getLong()); - List measurementsRemote = registry.getRecorder() - .getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_TIME_HISTOGRAM_REMOTE); - assertEquals(1, measurementsRemote.size()); - assertEquals(remoteSecondsTaken, measurementsRemote.getFirst().getLong()); + assertEquals(ATTRIBUTE_VALUE_SOURCE_LOCAL, measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE)); + assertEquals(remoteSecondsTaken, measurements.get(1).getLong()); + assertEquals(ATTRIBUTE_VALUE_SOURCE_REMOTE, measurements.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE)); } public void testRecordSuccess() { // first metric metrics.recordSuccess(false); - List measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_SUCCESS_HISTOGRAM); + List measurements = registry.getRecorder() + .getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_COMPLETION_HISTOGRAM); assertEquals(1, measurements.size()); assertEquals(1, measurements.getFirst().getLong()); + assertEquals(ATTRIBUTE_VALUE_SOURCE_LOCAL, measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE)); + assertNull(measurements.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); // second metric metrics.recordSuccess(true); - measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_SUCCESS_HISTOGRAM); + measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_COMPLETION_HISTOGRAM); assertEquals(2, measurements.size()); - assertEquals(1, measurements.getFirst().getLong()); - assertEquals(1, measurements.getLast().getLong()); - List measurementsRemote = registry.getRecorder() - .getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_SUCCESS_HISTOGRAM_REMOTE); - assertEquals(1, measurementsRemote.size()); - assertEquals(1, measurements.getFirst().getLong()); + assertEquals(1, measurements.get(1).getLong()); + assertEquals(ATTRIBUTE_VALUE_SOURCE_REMOTE, measurements.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE)); + assertNull(measurements.get(1).attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); } public void testRecordFailure() { // first metric - metrics.recordFailure(false); + metrics.recordFailure(false, new IllegalArgumentException("random failure")); - List measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_FAILURE_HISTOGRAM); + List measurements = registry.getRecorder() + .getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_COMPLETION_HISTOGRAM); assertEquals(1, measurements.size()); assertEquals(1, measurements.getFirst().getLong()); + assertEquals("java.lang.IllegalArgumentException", measurements.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); // second metric - metrics.recordFailure(true); + metrics.recordFailure(true, new ElasticsearchStatusException("another failure", RestStatus.BAD_REQUEST)); - measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_FAILURE_HISTOGRAM); + measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_COMPLETION_HISTOGRAM); assertEquals(2, measurements.size()); assertEquals(1, measurements.getFirst().getLong()); - assertEquals(1, measurements.getLast().getLong()); - List measurementsRemote = registry.getRecorder() - .getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_FAILURE_HISTOGRAM_REMOTE); - assertEquals(1, measurementsRemote.size()); - assertEquals(1, measurements.getFirst().getLong()); + assertEquals(1, measurements.get(1).getLong()); + assertEquals(RestStatus.BAD_REQUEST.name(), measurements.get(1).attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java index ceee6f0940314..8e8bd85a52768 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java @@ -10,12 +10,17 @@ package org.elasticsearch.reindex; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.BulkByScrollTask; +import org.elasticsearch.index.reindex.ScrollableHitSource; import org.elasticsearch.test.ESTestCase; +import java.util.List; + import static org.elasticsearch.core.TimeValue.timeValueMillis; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; @@ -31,12 +36,12 @@ public void testWrapWithMetricsSuccess() { ActionListener listener = spy(ActionListener.noop()); var wrapped = Reindexer.wrapWithMetrics(listener, metrics, randomNonNegativeLong(), true); - BulkByScrollResponse response = dummyResponse(); + BulkByScrollResponse response = reindexResponse(null, null); wrapped.onResponse(response); verify(listener).onResponse(response); verify(metrics).recordSuccess(true); - verify(metrics, never()).recordFailure(anyBoolean()); + verify(metrics, never()).recordFailure(anyBoolean(), any()); verify(metrics).recordTookTime(anyLong(), eq(true)); } @@ -50,16 +55,57 @@ public void testWrapWithMetricsFailure() { verify(listener).onFailure(exception); verify(metrics, never()).recordSuccess(anyBoolean()); - verify(metrics).recordFailure(true); + verify(metrics).recordFailure(true, exception); + verify(metrics).recordTookTime(anyLong(), eq(true)); + } + + public void testWrapWithMetricsBulkFailure() { + ReindexMetrics metrics = mock(); + ActionListener listener = spy(ActionListener.noop()); + var wrapped = Reindexer.wrapWithMetrics(listener, metrics, randomNonNegativeLong(), false); + + Exception exception = new Exception("random failure"); + Exception anotherException = new Exception("another failure"); + BulkByScrollResponse response = reindexResponse( + List.of(new BulkItemResponse.Failure("0", "0", exception), new BulkItemResponse.Failure("1", "1", anotherException)), + null + ); + wrapped.onResponse(response); + + verify(listener).onResponse(response); + verify(metrics, never()).recordSuccess(anyBoolean()); + verify(metrics).recordFailure(false, exception); + verify(metrics).recordTookTime(anyLong(), eq(false)); + } + + public void testWrapWithMetricsSearchFailure() { + ReindexMetrics metrics = mock(); + ActionListener listener = spy(ActionListener.noop()); + var wrapped = Reindexer.wrapWithMetrics(listener, metrics, randomNonNegativeLong(), true); + + Exception exception = new Exception("random failure"); + Exception anotherException = new Exception("another failure"); + BulkByScrollResponse response = reindexResponse( + null, + List.of(new ScrollableHitSource.SearchFailure(exception), new ScrollableHitSource.SearchFailure(anotherException)) + ); + wrapped.onResponse(response); + + verify(listener).onResponse(response); + verify(metrics, never()).recordSuccess(anyBoolean()); + verify(metrics).recordFailure(true, exception); verify(metrics).recordTookTime(anyLong(), eq(true)); } - private BulkByScrollResponse dummyResponse() { + private BulkByScrollResponse reindexResponse( + List bulkFailures, + List searchFailures + ) { return new BulkByScrollResponse( TimeValue.ZERO, new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, timeValueMillis(0), 0f, null, timeValueMillis(0)), - null, - null, + bulkFailures, + searchFailures, false ); } From 84afac99d2e7f861d8887d9b040eb6f83434631d Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Thu, 6 Nov 2025 12:40:03 -0500 Subject: [PATCH 3/3] fix constant --- .../src/main/java/org/elasticsearch/reindex/ReindexMetrics.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java index 58a2bffb642ab..8b0ef3ed68ac8 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java @@ -75,7 +75,7 @@ public void recordFailure(boolean remote, Throwable e) { private Map getAttributes(boolean remote) { Map attributes = new HashMap<>(); - attributes.put(ATTRIBUTE_NAME_SOURCE, remote ? "remote" : "local"); + attributes.put(ATTRIBUTE_NAME_SOURCE, remote ? ATTRIBUTE_VALUE_SOURCE_REMOTE : ATTRIBUTE_VALUE_SOURCE_LOCAL); return attributes; }