Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,58 @@

package org.elasticsearch.index.reindex;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.reindex.BulkIndexByScrollResponseMatcher;
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.reindex.TransportReindexAction;
import org.elasticsearch.rest.root.MainRestPlugin;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.test.ESIntegTestCase;

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.reindex.DeleteByQueryMetrics.DELETE_BY_QUERY_TIME_HISTOGRAM;
import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_ERROR_TYPE;
import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_SOURCE;
import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_LOCAL;
import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_REMOTE;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_COMPLETION_HISTOGRAM;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM;
import static org.elasticsearch.reindex.UpdateByQueryMetrics.UPDATE_BY_QUERY_TIME_HISTOGRAM;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST)
public class ReindexPluginMetricsIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    // MainRestPlugin is required so the node exposes a real HTTP endpoint that the
    // reindex-from-remote test can target as its "remote" cluster.
    // (The diff extraction left two return statements here; only the new one is kept.)
    return Arrays.asList(ReindexPlugin.class, TestTelemetryPlugin.class, MainRestPlugin.class);
}

// Disable the mock HTTP transport so the node binds a real HTTP address;
// reindex-from-remote connects to that address via cluster().httpAddresses().
@Override
protected boolean addMockHttpTransport() {
return false;
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
    // Whitelist every host/port for reindex-from-remote so tests may point the
    // "remote" source at this cluster's own HTTP endpoint.
    Settings.Builder builder = Settings.builder();
    builder.put(super.nodeSettings(nodeOrdinal, otherSettings));
    builder.put(TransportReindexAction.REMOTE_CLUSTER_WHITELIST.getKey(), "*:*");
    return builder.build();
}

protected ReindexRequestBuilder reindex() {
Expand All @@ -53,6 +79,62 @@ public static BulkIndexByScrollResponseMatcher matcher() {
return new BulkIndexByScrollResponseMatcher();
}

/**
 * Verifies reindex-from-remote telemetry: a failed run records a completion data point
 * carrying the error type and a "remote" source attribute, and a subsequent successful
 * run records a completion data point without an error attribute.
 */
public void testReindexFromRemoteMetrics() throws Exception {
final String dataNodeName = internalCluster().startNode();

// Treat this cluster's own HTTP endpoint as the "remote" source for the reindex.
InetSocketAddress remoteAddress = randomFrom(cluster().httpAddresses());
RemoteInfo remote = new RemoteInfo(
"http",
remoteAddress.getHostString(),
remoteAddress.getPort(),
null,
new BytesArray("{\"match_all\":{}}"),
null,
null,
Map.of(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT,
RemoteInfo.DEFAULT_CONNECT_TIMEOUT
);

// Grab the test telemetry plugin instance on the data node to inspect recorded measurements.
final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName)
.filterPlugins(TestTelemetryPlugin.class)
.findFirst()
.orElseThrow();

// The source index does not exist yet, so this reindex must fail.
var expectedException = assertThrows(
"Source index not created yet, should throw not found exception",
ElasticsearchStatusException.class,
() -> reindex().source("source").setRemoteInfo(remote).destination("dest").get()
);

// assert failure metrics: one duration sample and one completion sample whose
// error.type matches the thrown exception's REST status and whose source is "remote"
assertBusy(() -> {
testTelemetryPlugin.collect();
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1));
List<Measurement> completions = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM);
assertThat(completions.size(), equalTo(1));
assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE), equalTo(expectedException.status().name()));
assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_REMOTE));
});

// now create the source index
indexRandom(true, prepareIndex("source").setId("1").setSource("foo", "a"));
assertHitCount(prepareSearch("source").setSize(0), 1);

reindex().source("source").setRemoteInfo(remote).destination("dest").get();

// assert success metrics: a second completion sample with no error attribute,
// still tagged with the "remote" source attribute
assertBusy(() -> {
testTelemetryPlugin.collect();
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(2));
List<Measurement> completions = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM);
assertThat(completions.size(), equalTo(2));
assertNull(completions.get(1).attributes().get(ATTRIBUTE_NAME_ERROR_TYPE));
assertThat(completions.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_REMOTE));
});

}

public void testReindexMetrics() throws Exception {
final String dataNodeName = internalCluster().startNode();

Expand All @@ -75,33 +157,67 @@ public void testReindexMetrics() throws Exception {
// Use assertBusy to wait for all threads to complete so we get deterministic results
assertBusy(() -> {
testTelemetryPlugin.collect();
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
assertThat(measurements.size(), equalTo(1));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(1));
});

// Now none of them
createIndex("none");
reindex().source("source").destination("none").filter(termQuery("foo", "no_match")).get();
assertBusy(() -> {
testTelemetryPlugin.collect();
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
assertThat(measurements.size(), equalTo(2));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(2));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(2));
});

// Now half of them
reindex().source("source").destination("dest_half").filter(termQuery("foo", "a")).get();
assertBusy(() -> {
testTelemetryPlugin.collect();
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
assertThat(measurements.size(), equalTo(3));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(3));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(3));
});

// Limit with maxDocs
reindex().source("source").destination("dest_size_one").maxDocs(1).get();
assertBusy(() -> {
testTelemetryPlugin.collect();
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
assertThat(measurements.size(), equalTo(4));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(4));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(4));

// assert all metric attributes are correct
testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).forEach(m -> {
assertNull(m.attributes().get(ATTRIBUTE_NAME_ERROR_TYPE));
assertThat(m.attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_LOCAL));
});
});
}

/**
 * Verifies that a reindex whose bulk writes fail (mapping conflict on the destination)
 * still records a completion data point, tagged with the failure's exception class name.
 */
public void testReindexMetricsWithBulkFailure() throws Exception {
final String dataNodeName = internalCluster().startNode();
final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName)
.filterPlugins(TestTelemetryPlugin.class)
.findFirst()
.orElseThrow();

// source and destination have conflicting mappings to cause bulk failures:
// "test" is a string in source but dynamically mapped as a number in dest
indexRandom(true, prepareIndex("source").setId("2").setSource("test", "words words"));
indexRandom(true, prepareIndex("dest").setId("1").setSource("test", 10));

// The request itself succeeds but the response carries per-document bulk failures.
var response = reindex().source("source").destination("dest").get();
assertThat(response.getBulkFailures().size(), greaterThanOrEqualTo(1));

// One duration sample and one completion sample; error.type is the fully qualified
// class name of the first bulk failure's cause.
assertBusy(() -> {
testTelemetryPlugin.collect();
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(1));
assertThat(
testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM)
.getFirst()
.attributes()
.get(ATTRIBUTE_NAME_ERROR_TYPE),
equalTo("org.elasticsearch.index.mapper.DocumentParsingException")
);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,74 @@

package org.elasticsearch.reindex;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.telemetry.metric.LongHistogram;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.HashMap;
import java.util.Map;

public class ReindexMetrics {

public static final String REINDEX_TIME_HISTOGRAM = "es.reindex.duration.histogram";
public static final String REINDEX_COMPLETION_HISTOGRAM = "es.reindex.completion.histogram";

// refers to https://opentelemetry.io/docs/specs/semconv/registry/attributes/error/#error-type
public static final String ATTRIBUTE_NAME_ERROR_TYPE = "error.type";

public static final String ATTRIBUTE_NAME_SOURCE = "reindex.source";
public static final String ATTRIBUTE_VALUE_SOURCE_LOCAL = "local";
public static final String ATTRIBUTE_VALUE_SOURCE_REMOTE = "remote";

private final LongHistogram reindexTimeSecsHistogram;
private final LongHistogram reindexCompletionHistogram;

public ReindexMetrics(MeterRegistry meterRegistry) {
this(meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "seconds"));
this.reindexTimeSecsHistogram = meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "seconds");
this.reindexCompletionHistogram = meterRegistry.registerLongHistogram(
REINDEX_COMPLETION_HISTOGRAM,
"Number of completed reindex operations",
"unit"
);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be a LongCounter? I don't think it makes sense to use a histogram when every value you're recording is 1L, does it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought of using Counter; my understanding is that a Counter accumulates over time, which is good if we only want to track the total amount at any given time, whereas using a histogram has the advantage of aggregating over an arbitrary period of time.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, a counter accumulates over time. In my experience, you then use it by calculating a rate over some period of time e.g. 15mins. That's considered best practice when using Prometheus, for example.

I don't know whether the same thing works in Kibana. It relies on it being cheap and easy to compute rates. We should maybe ask around for advice — either within the team or with some subject expert.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the other option is to ditch this metric and just have a single histogram metric for the durations with the error attribute as well as the source attribute? That would mean combining the listeners, I haven't checked whether that's doable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to find some existing Counter metrics in Kibana, and found the emitted values are not accumulating, which makes me wonder if my understanding on Counter is actually correct. I have put the details in this thread, hopefully someone can help answer there.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @samxbr.

}

private ReindexMetrics(LongHistogram reindexTimeSecsHistogram) {
this.reindexTimeSecsHistogram = reindexTimeSecsHistogram;
}
public long recordTookTime(long tookTime, boolean remote) {
Map<String, Object> attributes = getAttributes(remote);

public long recordTookTime(long tookTime) {
reindexTimeSecsHistogram.record(tookTime);
reindexTimeSecsHistogram.record(tookTime, attributes);
return tookTime;
}

/**
 * Records one successful reindex completion, tagged only with the source attribute.
 * A data point without the error.type attribute denotes success.
 *
 * @param remote whether the reindex source was a remote cluster
 */
public void recordSuccess(boolean remote) {
    final Map<String, Object> attrs = getAttributes(remote);
    // Success is encoded by the ABSENCE of the error.type attribute.
    assert attrs.get(ATTRIBUTE_NAME_ERROR_TYPE) == null : "error.type attribute must not be present for successes";
    reindexCompletionHistogram.record(1L, attrs);
}

/**
 * Records one failed reindex completion, tagged with the source attribute and an
 * error.type attribute derived from {@code e}.
 *
 * @param remote whether the reindex source was a remote cluster
 * @param e      the failure; its REST status name is used when it is an
 *               {@link ElasticsearchStatusException}, otherwise its class name
 */
public void recordFailure(boolean remote, Throwable e) {
    final Map<String, Object> attrs = getAttributes(remote);
    // Best effort: prefer the REST status name when available, otherwise
    // fall back to the exception's fully qualified type name.
    final String errorType = e instanceof ElasticsearchStatusException ese
        ? ese.status().name()
        : e.getClass().getTypeName();
    attrs.put(ATTRIBUTE_NAME_ERROR_TYPE, errorType);

    // Failure is encoded by the PRESENCE of the error.type attribute, per
    // https://opentelemetry.io/docs/specs/semconv/general/recording-errors/#recording-errors-on-metrics
    assert attrs.get(ATTRIBUTE_NAME_ERROR_TYPE) != null : "error.type attribute must be present for failures";

    reindexCompletionHistogram.record(1L, attrs);
}

/**
 * Builds a fresh, mutable attribute map containing only the source attribute
 * ("remote" or "local"); callers may add further attributes before recording.
 */
private Map<String, Object> getAttributes(boolean remote) {
    final String source = remote ? ATTRIBUTE_VALUE_SOURCE_REMOTE : ATTRIBUTE_VALUE_SOURCE_LOCAL;
    final Map<String, Object> attrs = new HashMap<>();
    attrs.put(ATTRIBUTE_NAME_SOURCE, source);
    return attrs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -135,18 +136,63 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl
projectResolver.getProjectState(clusterService.state()),
reindexSslConfig,
request,
ActionListener.runAfter(listener, () -> {
long elapsedTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime);
if (reindexMetrics != null) {
reindexMetrics.recordTookTime(elapsedTime);
}
})
wrapWithMetrics(listener, reindexMetrics, startTime, request.getRemoteInfo() != null)
);
searchAction.start();
}
);
}

// Visible for testing
/**
 * Wraps {@code listener} so that reindex metrics are recorded when the operation completes:
 * a completion data point (success or failure with an error.type attribute) plus a duration
 * sample. Returns {@code listener} unchanged when {@code metrics} is null (metrics disabled).
 * A response that carries search or bulk failures is still delivered via onResponse, but is
 * counted as a failure using the first failure as the sample error.
 */
static ActionListener<BulkByScrollResponse> wrapWithMetrics(
ActionListener<BulkByScrollResponse> listener,
@Nullable ReindexMetrics metrics,
long startTime,
boolean isRemote
) {
if (metrics == null) {
return listener;
}

// add completion metrics
var withCompletionMetrics = new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
// Optional.ofNullable guards against a null failures list — NOTE(review): confirm
// whether getSearchFailures()/getBulkFailures() can actually return null.
var searchExceptionSample = Optional.ofNullable(bulkByScrollResponse.getSearchFailures())
.stream()
.flatMap(List::stream)
.map(ScrollableHitSource.SearchFailure::getReason)
.findFirst();
var bulkExceptionSample = Optional.ofNullable(bulkByScrollResponse.getBulkFailures())
.stream()
.flatMap(List::stream)
.map(BulkItemResponse.Failure::getCause)
.findFirst();
if (searchExceptionSample.isPresent() || bulkExceptionSample.isPresent()) {
// record only the first sample error in metric; search failures take precedence
Throwable e = searchExceptionSample.orElseGet(bulkExceptionSample::get);
metrics.recordFailure(isRemote, e);
listener.onResponse(bulkByScrollResponse);
} else {
metrics.recordSuccess(isRemote);
listener.onResponse(bulkByScrollResponse);
}
}

@Override
public void onFailure(Exception e) {
// Outright request failure: record and propagate.
metrics.recordFailure(isRemote, e);
listener.onFailure(e);
}
};

// add duration metric; runAfter records the elapsed time on both success and failure paths
return ActionListener.runAfter(withCompletionMetrics, () -> {
long elapsedTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime);
metrics.recordTookTime(elapsedTime, isRemote);
});
}

/**
* Build the {@link RestClient} used for reindexing from remote clusters.
* @param remoteInfo connection information for the remote cluster
Expand Down
Loading