diff --git a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java index d9580c13eada6..e8b2a4f0aad7f 100644 --- a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java +++ b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java @@ -9,32 +9,58 @@ package org.elasticsearch.index.reindex; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.reindex.BulkIndexByScrollResponseMatcher; import org.elasticsearch.reindex.ReindexPlugin; +import org.elasticsearch.reindex.TransportReindexAction; +import org.elasticsearch.rest.root.MainRestPlugin; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.telemetry.Measurement; import org.elasticsearch.telemetry.TestTelemetryPlugin; import org.elasticsearch.test.ESIntegTestCase; +import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Map; import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.reindex.DeleteByQueryMetrics.DELETE_BY_QUERY_TIME_HISTOGRAM; +import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_ERROR_TYPE; +import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_SOURCE; +import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_LOCAL; +import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_REMOTE; +import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_COMPLETION_HISTOGRAM; import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM; import static org.elasticsearch.reindex.UpdateByQueryMetrics.UPDATE_BY_QUERY_TIME_HISTOGRAM; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; @ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST) public class ReindexPluginMetricsIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return Arrays.asList(ReindexPlugin.class, TestTelemetryPlugin.class); + return Arrays.asList(ReindexPlugin.class, TestTelemetryPlugin.class, MainRestPlugin.class); + } + + @Override + protected boolean addMockHttpTransport() { + return false; + } + + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put(TransportReindexAction.REMOTE_CLUSTER_WHITELIST.getKey(), "*:*") + .build(); } protected ReindexRequestBuilder reindex() { @@ -53,6 +79,62 @@ public static BulkIndexByScrollResponseMatcher matcher() { return new BulkIndexByScrollResponseMatcher(); } + public void testReindexFromRemoteMetrics() throws Exception { + final String dataNodeName = internalCluster().startNode(); + + InetSocketAddress remoteAddress = randomFrom(cluster().httpAddresses()); + RemoteInfo remote = new RemoteInfo( + "http", + remoteAddress.getHostString(), + remoteAddress.getPort(), + null, + new BytesArray("{\"match_all\":{}}"), + null, + null, + Map.of(), + RemoteInfo.DEFAULT_SOCKET_TIMEOUT, + RemoteInfo.DEFAULT_CONNECT_TIMEOUT + ); + + final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName) + .filterPlugins(TestTelemetryPlugin.class) + .findFirst() + .orElseThrow(); + + var expectedException = assertThrows( + "Source index not created yet, should throw not found exception", + ElasticsearchStatusException.class, + () -> reindex().source("source").setRemoteInfo(remote).destination("dest").get() + ); + + // assert failure metrics + assertBusy(() -> { + testTelemetryPlugin.collect(); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1)); + List completions = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM); + assertThat(completions.size(), equalTo(1)); + assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE), equalTo(expectedException.status().name())); + assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_REMOTE)); + }); + + // now create the source index + indexRandom(true, prepareIndex("source").setId("1").setSource("foo", "a")); + assertHitCount(prepareSearch("source").setSize(0), 1); + + reindex().source("source").setRemoteInfo(remote).destination("dest").get(); + + // assert success metrics + assertBusy(() -> { + testTelemetryPlugin.collect(); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(2)); + List completions = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM); + assertThat(completions.size(), equalTo(2)); + assertNull(completions.get(1).attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); + assertThat(completions.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_REMOTE)); + }); + + } + public void testReindexMetrics() throws Exception { final String dataNodeName = internalCluster().startNode(); @@ -75,8 +157,8 @@ public void testReindexMetrics() throws Exception { // Use assertBusy to wait for all threads to complete so we get deterministic results assertBusy(() -> { testTelemetryPlugin.collect(); - List measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM); - assertThat(measurements.size(), equalTo(1)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(1)); }); // Now none of them @@ -84,24 +166,58 @@ public void testReindexMetrics() throws Exception { reindex().source("source").destination("none").filter(termQuery("foo", "no_match")).get(); assertBusy(() -> { testTelemetryPlugin.collect(); - List measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM); - assertThat(measurements.size(), equalTo(2)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(2)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(2)); }); // Now half of them reindex().source("source").destination("dest_half").filter(termQuery("foo", "a")).get(); assertBusy(() -> { testTelemetryPlugin.collect(); - List measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM); - assertThat(measurements.size(), equalTo(3)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(3)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(3)); }); // Limit with maxDocs reindex().source("source").destination("dest_size_one").maxDocs(1).get(); assertBusy(() -> { testTelemetryPlugin.collect(); - List measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM); - assertThat(measurements.size(), equalTo(4)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(4)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(4)); + + // asset all metric attributes are correct + testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).forEach(m -> { + assertNull(m.attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); + assertThat(m.attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_LOCAL)); + }); + }); + } + + public void testReindexMetricsWithBulkFailure() throws Exception { + final String dataNodeName = internalCluster().startNode(); + final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName) + .filterPlugins(TestTelemetryPlugin.class) + .findFirst() + .orElseThrow(); + + // source and destination have conflicting mappings to cause bulk failures + indexRandom(true, prepareIndex("source").setId("2").setSource("test", "words words")); + indexRandom(true, prepareIndex("dest").setId("1").setSource("test", 10)); + + var response = reindex().source("source").destination("dest").get(); + assertThat(response.getBulkFailures().size(), greaterThanOrEqualTo(1)); + + assertBusy(() -> { + testTelemetryPlugin.collect(); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1)); + assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(1)); + assertThat( + testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM) + .getFirst() + .attributes() + .get(ATTRIBUTE_NAME_ERROR_TYPE), + equalTo("org.elasticsearch.index.mapper.DocumentParsingException") + ); }); } diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java index f7975120d9fc8..8b0ef3ed68ac8 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java @@ -9,25 +9,74 @@ package org.elasticsearch.reindex; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.telemetry.metric.LongHistogram; import org.elasticsearch.telemetry.metric.MeterRegistry; +import java.util.HashMap; +import java.util.Map; + public class ReindexMetrics { public static final String REINDEX_TIME_HISTOGRAM = "es.reindex.duration.histogram"; + public static final String REINDEX_COMPLETION_HISTOGRAM = "es.reindex.completion.histogram"; + + // refers to https://opentelemetry.io/docs/specs/semconv/registry/attributes/error/#error-type + public static final String ATTRIBUTE_NAME_ERROR_TYPE = "error.type"; + + public static final String ATTRIBUTE_NAME_SOURCE = "reindex.source"; + public static final String ATTRIBUTE_VALUE_SOURCE_LOCAL = "local"; + public static final String ATTRIBUTE_VALUE_SOURCE_REMOTE = "remote"; private final LongHistogram reindexTimeSecsHistogram; + private final LongHistogram reindexCompletionHistogram; public ReindexMetrics(MeterRegistry meterRegistry) { - this(meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "seconds")); + this.reindexTimeSecsHistogram = meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "seconds"); + this.reindexCompletionHistogram = meterRegistry.registerLongHistogram( + REINDEX_COMPLETION_HISTOGRAM, + "Number of completed reindex operations", + "unit" + ); } - private ReindexMetrics(LongHistogram reindexTimeSecsHistogram) { - this.reindexTimeSecsHistogram = reindexTimeSecsHistogram; - } + public long recordTookTime(long tookTime, boolean remote) { + Map attributes = getAttributes(remote); - public long recordTookTime(long tookTime) { - reindexTimeSecsHistogram.record(tookTime); + reindexTimeSecsHistogram.record(tookTime, attributes); return tookTime; } + + public void recordSuccess(boolean remote) { + Map attributes = getAttributes(remote); + // attribute ATTRIBUTE_ERROR_TYPE being absent indicates success + assert attributes.get(ATTRIBUTE_NAME_ERROR_TYPE) == null : "error.type attribute must not be present for successes"; + + reindexCompletionHistogram.record(1L, attributes); + } + + public void recordFailure(boolean remote, Throwable e) { + Map attributes = getAttributes(remote); + // best effort to extract useful error type if possible + String errorType; + if (e instanceof ElasticsearchStatusException ese) { + errorType = ese.status().name(); + } else { + errorType = e.getClass().getTypeName(); + } + attributes.put(ATTRIBUTE_NAME_ERROR_TYPE, errorType); + + // attribute ATTRIBUTE_ERROR_TYPE being present indicates failure + // https://opentelemetry.io/docs/specs/semconv/general/recording-errors/#recording-errors-on-metrics + assert attributes.get(ATTRIBUTE_NAME_ERROR_TYPE) != null : "error.type attribute must be present for failures"; + + reindexCompletionHistogram.record(1L, attributes); + } + + private Map getAttributes(boolean remote) { + Map attributes = new HashMap<>(); + attributes.put(ATTRIBUTE_NAME_SOURCE, remote ? ATTRIBUTE_VALUE_SOURCE_REMOTE : ATTRIBUTE_VALUE_SOURCE_LOCAL); + + return attributes; + } } diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java index c073e7c1a83ad..e1a20f1e1eace 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java @@ -69,6 +69,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; @@ -135,18 +136,63 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl projectResolver.getProjectState(clusterService.state()), reindexSslConfig, request, - ActionListener.runAfter(listener, () -> { - long elapsedTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime); - if (reindexMetrics != null) { - reindexMetrics.recordTookTime(elapsedTime); - } - }) + wrapWithMetrics(listener, reindexMetrics, startTime, request.getRemoteInfo() != null) ); searchAction.start(); } ); } + // Visible for testing + static ActionListener wrapWithMetrics( + ActionListener listener, + @Nullable ReindexMetrics metrics, + long startTime, + boolean isRemote + ) { + if (metrics == null) { + return listener; + } + + // add completion metrics + var withCompletionMetrics = new ActionListener() { + @Override + public void onResponse(BulkByScrollResponse bulkByScrollResponse) { + var searchExceptionSample = Optional.ofNullable(bulkByScrollResponse.getSearchFailures()) + .stream() + .flatMap(List::stream) + .map(ScrollableHitSource.SearchFailure::getReason) + .findFirst(); + var bulkExceptionSample = Optional.ofNullable(bulkByScrollResponse.getBulkFailures()) + .stream() + .flatMap(List::stream) + .map(BulkItemResponse.Failure::getCause) + .findFirst(); + if (searchExceptionSample.isPresent() || bulkExceptionSample.isPresent()) { + // record only the first sample error in metric + Throwable e = searchExceptionSample.orElseGet(bulkExceptionSample::get); + metrics.recordFailure(isRemote, e); + listener.onResponse(bulkByScrollResponse); + } else { + metrics.recordSuccess(isRemote); + listener.onResponse(bulkByScrollResponse); + } + } + + @Override + public void onFailure(Exception e) { + metrics.recordFailure(isRemote, e); + listener.onFailure(e); + } + }; + + // add duration metric + return ActionListener.runAfter(withCompletionMetrics, () -> { + long elapsedTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime); + metrics.recordTookTime(elapsedTime, isRemote); + }); + } + /** * Build the {@link RestClient} used for reindexing from remote clusters. * @param remoteInfo connection information for the remote cluster diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java index 1356dc50dadb0..c0327807b8c3b 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexMetricsTests.java @@ -9,6 +9,8 @@ package org.elasticsearch.reindex; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.telemetry.InstrumentType; import org.elasticsearch.telemetry.Measurement; import org.elasticsearch.telemetry.RecordingMeterRegistry; @@ -17,25 +19,85 @@ import java.util.List; +import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_ERROR_TYPE; +import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_SOURCE; +import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_LOCAL; +import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_REMOTE; +import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_COMPLETION_HISTOGRAM; import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM; public class ReindexMetricsTests extends ESTestCase { - private RecordingMeterRegistry recordingMeterRegistry; + private RecordingMeterRegistry registry; private ReindexMetrics metrics; @Before public void createMetrics() { - recordingMeterRegistry = new RecordingMeterRegistry(); - metrics = new ReindexMetrics(recordingMeterRegistry); + registry = new RecordingMeterRegistry(); + metrics = new ReindexMetrics(registry); } public void testRecordTookTime() { - int secondsTaken = randomIntBetween(1, 50); - metrics.recordTookTime(secondsTaken); - List measurements = recordingMeterRegistry.getRecorder() - .getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_TIME_HISTOGRAM); - assertEquals(measurements.size(), 1); - assertEquals(measurements.get(0).getLong(), secondsTaken); + long secondsTaken = randomLongBetween(1, Long.MAX_VALUE); + + // first metric + metrics.recordTookTime(secondsTaken, false); + + List measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_TIME_HISTOGRAM); + assertEquals(1, measurements.size()); + assertEquals(secondsTaken, measurements.getFirst().getLong()); + assertEquals(ATTRIBUTE_VALUE_SOURCE_LOCAL, measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE)); + + // second metric + long remoteSecondsTaken = randomLongBetween(1, Long.MAX_VALUE); + metrics.recordTookTime(remoteSecondsTaken, true); + + measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_TIME_HISTOGRAM); + assertEquals(2, measurements.size()); + assertEquals(secondsTaken, measurements.getFirst().getLong()); + assertEquals(ATTRIBUTE_VALUE_SOURCE_LOCAL, measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE)); + assertEquals(remoteSecondsTaken, measurements.get(1).getLong()); + assertEquals(ATTRIBUTE_VALUE_SOURCE_REMOTE, measurements.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE)); + } + + public void testRecordSuccess() { + // first metric + metrics.recordSuccess(false); + + List measurements = registry.getRecorder() + .getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_COMPLETION_HISTOGRAM); + assertEquals(1, measurements.size()); + assertEquals(1, measurements.getFirst().getLong()); + assertEquals(ATTRIBUTE_VALUE_SOURCE_LOCAL, measurements.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE)); + assertNull(measurements.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); + + // second metric + metrics.recordSuccess(true); + + measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_COMPLETION_HISTOGRAM); + assertEquals(2, measurements.size()); + assertEquals(1, measurements.get(1).getLong()); + assertEquals(ATTRIBUTE_VALUE_SOURCE_REMOTE, measurements.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE)); + assertNull(measurements.get(1).attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); + } + + public void testRecordFailure() { + // first metric + metrics.recordFailure(false, new IllegalArgumentException("random failure")); + + List measurements = registry.getRecorder() + .getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_COMPLETION_HISTOGRAM); + assertEquals(1, measurements.size()); + assertEquals(1, measurements.getFirst().getLong()); + assertEquals("java.lang.IllegalArgumentException", measurements.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); + + // second metric + metrics.recordFailure(true, new ElasticsearchStatusException("another failure", RestStatus.BAD_REQUEST)); + + measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_COMPLETION_HISTOGRAM); + assertEquals(2, measurements.size()); + assertEquals(1, measurements.getFirst().getLong()); + assertEquals(1, measurements.get(1).getLong()); + assertEquals(RestStatus.BAD_REQUEST.name(), measurements.get(1).attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java new file mode 100644 index 0000000000000..8e8bd85a52768 --- /dev/null +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java @@ -0,0 +1,112 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.reindex; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.BulkByScrollTask; +import org.elasticsearch.index.reindex.ScrollableHitSource; +import org.elasticsearch.test.ESTestCase; + +import java.util.List; + +import static org.elasticsearch.core.TimeValue.timeValueMillis; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +public class ReindexerTests extends ESTestCase { + + public void testWrapWithMetricsSuccess() { + ReindexMetrics metrics = mock(); + ActionListener listener = spy(ActionListener.noop()); + var wrapped = Reindexer.wrapWithMetrics(listener, metrics, randomNonNegativeLong(), true); + + BulkByScrollResponse response = reindexResponse(null, null); + wrapped.onResponse(response); + + verify(listener).onResponse(response); + verify(metrics).recordSuccess(true); + verify(metrics, never()).recordFailure(anyBoolean(), any()); + verify(metrics).recordTookTime(anyLong(), eq(true)); + } + + public void testWrapWithMetricsFailure() { + ReindexMetrics metrics = mock(); + ActionListener listener = spy(ActionListener.noop()); + var wrapped = Reindexer.wrapWithMetrics(listener, metrics, randomNonNegativeLong(), true); + + Exception exception = new Exception("random failure"); + wrapped.onFailure(exception); + + verify(listener).onFailure(exception); + verify(metrics, never()).recordSuccess(anyBoolean()); + verify(metrics).recordFailure(true, exception); + verify(metrics).recordTookTime(anyLong(), eq(true)); + } + + public void testWrapWithMetricsBulkFailure() { + ReindexMetrics metrics = mock(); + ActionListener listener = spy(ActionListener.noop()); + var wrapped = Reindexer.wrapWithMetrics(listener, metrics, randomNonNegativeLong(), false); + + Exception exception = new Exception("random failure"); + Exception anotherException = new Exception("another failure"); + BulkByScrollResponse response = reindexResponse( + List.of(new BulkItemResponse.Failure("0", "0", exception), new BulkItemResponse.Failure("1", "1", anotherException)), + null + ); + wrapped.onResponse(response); + + verify(listener).onResponse(response); + verify(metrics, never()).recordSuccess(anyBoolean()); + verify(metrics).recordFailure(false, exception); + verify(metrics).recordTookTime(anyLong(), eq(false)); + } + + public void testWrapWithMetricsSearchFailure() { + ReindexMetrics metrics = mock(); + ActionListener listener = spy(ActionListener.noop()); + var wrapped = Reindexer.wrapWithMetrics(listener, metrics, randomNonNegativeLong(), true); + + Exception exception = new Exception("random failure"); + Exception anotherException = new Exception("another failure"); + BulkByScrollResponse response = reindexResponse( + null, + List.of(new ScrollableHitSource.SearchFailure(exception), new ScrollableHitSource.SearchFailure(anotherException)) + ); + wrapped.onResponse(response); + + verify(listener).onResponse(response); + verify(metrics, never()).recordSuccess(anyBoolean()); + verify(metrics).recordFailure(true, exception); + verify(metrics).recordTookTime(anyLong(), eq(true)); + } + + private BulkByScrollResponse reindexResponse( + List bulkFailures, + List searchFailures + ) { + return new BulkByScrollResponse( + TimeValue.ZERO, + new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, timeValueMillis(0), 0f, null, timeValueMillis(0)), + bulkFailures, + searchFailures, + false + ); + } +}